Coverage for slidge/core/mixins/message.py: 83%

103 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-10 09:11 +0000

1import logging 

2import uuid 

3import warnings 

4from typing import TYPE_CHECKING, Optional 

5 

6from slixmpp import Iq, Message 

7from slixmpp.plugins.xep_0004 import Form 

8 

9from ...util.types import ChatState, LegacyMessageType, Marker 

10from .attachment import AttachmentMixin 

11from .message_maker import MessageMaker 

12from .message_text import TextMessageMixin 

13 

14if TYPE_CHECKING: 

15 from ...group import LegacyMUC 

16 

# Pubsub publish-options form attached to the MDS publish request built in
# MarkerMixin.__send_mds.
PUBLISH_OPTIONS = Form()
PUBLISH_OPTIONS["type"] = "submit"
PUBLISH_OPTIONS.add_field(
    "FORM_TYPE",
    "hidden",
    value="http://jabber.org/protocol/pubsub#publish-options",
)
# The remaining fields are plain option/value pairs, added in this order.
for _option, _setting in (
    ("pubsub#persist_items", "true"),
    ("pubsub#max_items", "max"),
    ("pubsub#send_last_published_item", "never"),
    ("pubsub#access_model", "whitelist"),
):
    PUBLISH_OPTIONS.add_field(_option, value=_setting)
# Do not leave the loop variables behind in the module namespace.
del _option, _setting

27 

28 

class ChatStateMixin(MessageMaker):
    """
    Mixin for sending chat states (:xep:`0085`) from this :term:`XMPP Entity`.

    The last state that was sent is remembered, so that the same state is not
    sent twice in a row unless the send is forced.
    """

    def __init__(self):
        super().__init__()
        # Nothing has been sent yet.
        self.__last_chat_state: Optional[ChatState] = None

    def _chat_state(self, state: ChatState, forced=False, **kwargs):
        """
        Build a chat state message and send it, unless it should be skipped.

        Nothing is sent when the ``carbon`` kwarg is truthy, or when ``state``
        equals the last state sent and ``forced`` is false.

        :param state: The chat state to send
        :param forced: Send even if ``state`` equals the last state sent
        :param kwargs: Forwarded to ``_send()``; ``carbon`` is also read here
        """
        if kwargs.get("carbon", False):
            return
        if not forced and state == self.__last_chat_state:
            return
        self.__last_chat_state = state
        self._send(self._make_message(state=state, hints={"no-store"}), **kwargs)

    def active(self, **kwargs):
        """
        Send an "active" chat state (:xep:`0085`) from this
        :term:`XMPP Entity`.
        """
        self._chat_state("active", **kwargs)

    def composing(self, **kwargs):
        """
        Send a "composing" chat state (:xep:`0085`), ie a "typing
        notification", from this :term:`XMPP Entity`.

        This state is always forced: it is sent even when it was already the
        last chat state sent.
        """
        self._chat_state("composing", forced=True, **kwargs)

    def paused(self, **kwargs):
        """
        Send a "paused" chat state (:xep:`0085`), ie a "typing paused
        notification", from this :term:`XMPP Entity`.
        """
        self._chat_state("paused", **kwargs)

    def inactive(self, **kwargs):
        """
        Send an "inactive" chat state (:xep:`0085`) from this
        :term:`XMPP Entity`, ie "contact has not interacted with the chat
        session interface for an intermediate period of time".
        """
        self._chat_state("inactive", **kwargs)

    def gone(self, **kwargs):
        """
        Send a "gone" chat state (:xep:`0085`) from this :term:`XMPP Entity`,
        ie "contact has not interacted with the chat session interface,
        system, or device for a relatively long period of time".
        """
        self._chat_state("gone", **kwargs)

78 

79 

class MarkerMixin(MessageMaker):
    """
    Mixin for sending message markers (:xep:`0333`) from this
    :term:`XMPP Entity`.
    """

    # NOTE(review): placeholder value, presumably overridden by concrete
    # classes -- confirm.
    is_group: bool = NotImplemented

    def _make_marker(
        self, legacy_msg_id: LegacyMessageType, marker: Marker, carbon=False
    ):
        """
        Build (without sending) a marker message about a legacy message.

        :param legacy_msg_id: The message this marker refers to
        :param marker: Name of the marker element to fill in
        :param carbon: Passed as-is to ``_make_message()``
        :return: The message, with the marker id set to the XMPP id
            corresponding to ``legacy_msg_id``
        """
        stanza = self._make_message(carbon=carbon)
        stanza[marker]["id"] = self._legacy_to_xmpp(legacy_msg_id)
        return stanza

    def ack(self, legacy_msg_id: LegacyMessageType, **kwargs):
        """
        Send an "acknowledged" message marker (:xep:`0333`) from this
        :term:`XMPP Entity`.

        :param legacy_msg_id: The message this marker refers to
        """
        marker = self._make_marker(
            legacy_msg_id, "acknowledged", carbon=kwargs.get("carbon")
        )
        self._send(marker, **kwargs)

    def received(self, legacy_msg_id: LegacyMessageType, **kwargs):
        """
        Send a "received" message marker (:xep:`0333`) from this
        :term:`XMPP Entity`.

        When the message type of this entity is "chat", a delivery receipt
        (:xep:`0184`) is sent before the marker.

        :param legacy_msg_id: The message this marker refers to
        """
        is_carbon = kwargs.get("carbon")
        if self.mtype == "chat":
            receipt = self.xmpp.delivery_receipt.make_ack(
                self._legacy_to_xmpp(legacy_msg_id),
                mfrom=self.jid,
                mto=self.user_jid,
            )
            # The receipt is sent without the extra kwargs.
            self._send(receipt)
        marker = self._make_marker(legacy_msg_id, "received", carbon=is_carbon)
        self._send(marker, **kwargs)

    def displayed(self, legacy_msg_id: LegacyMessageType, **kwargs):
        """
        Send a "displayed" message marker (:xep:`0333`) from this
        :term:`XMPP Entity`.

        If this entity has a truthy ``is_user`` attribute, a task publishing
        the matching MDS marker is also scheduled on the session.

        :param legacy_msg_id: The message this marker refers to
        """
        marker = self._make_marker(
            legacy_msg_id, "displayed", carbon=kwargs.get("carbon")
        )
        self._send(marker, **kwargs)
        if not getattr(self, "is_user", False):
            return
        self.session.create_task(self.__send_mds(legacy_msg_id))

    async def __send_mds(self, legacy_msg_id: LegacyMessageType):
        """
        Publish a MDS displayed marker on behalf of the user for a group chat.

        Failures are only logged at debug level, never raised.

        :param legacy_msg_id: The message that was displayed
        """
        muc = getattr(self, "muc", None)
        if not muc:
            # Not implemented for 1:1 chats: it would rely on storing the
            # stanza-id injected by the XMPP server, which is not tracked at
            # the moment. MDS should mostly be useful for public group chats,
            # so this should not be an issue in practice. To be revisited if
            # the need arises.
            return
        muc_jid = muc.jid.bare
        xmpp_msg_id = self._legacy_to_xmpp(legacy_msg_id)
        user_bare_jid = self.user_jid.bare
        mds = self.xmpp["xep_0490"].stanza

        # The request goes from the user's bare JID to the user's bare JID.
        iq = Iq(sto=user_bare_jid, sfrom=user_bare_jid, stype="set")
        publish = iq["pubsub"]["publish"]
        publish["node"] = mds.NS
        publish["item"]["id"] = muc_jid

        displayed = mds.Displayed()
        displayed["stanza_id"]["id"] = xmpp_msg_id
        displayed["stanza_id"]["by"] = muc_jid
        publish["item"]["payload"] = displayed

        iq["pubsub"]["publish_options"] = PUBLISH_OPTIONS
        try:
            await self.xmpp["xep_0356"].send_privileged_iq(iq)
        except Exception as e:
            # Best effort only: any failure is logged and otherwise ignored.
            self.session.log.debug("Could not MDS mark", exc_info=e)

162 

163 

class ContentMessageMixin(AttachmentMixin, TextMessageMixin):
    """
    Combination of :class:`AttachmentMixin` and :class:`TextMessageMixin`;
    adds nothing of its own.
    """

166 

167 

class CarbonMessageMixin(ContentMessageMixin, MarkerMixin):
    """
    Content and marker mixin able to send messages on behalf of the user
    through the privileged entity plugins.
    """

    def _privileged_send(self, msg: Message):
        """
        Send ``msg`` as a privileged message.

        The ``xep_0356`` plugin is tried first, then ``xep_0356_old`` if the
        first attempt raises :class:`PermissionError`. If both are refused, a
        warning is emitted and the message is not sent.

        :param msg: The message to send. It is modified in place: an id is
            set if it has none, and its origin id is removed.
        """
        msg_id = msg.get_id()
        if msg_id:
            # NOTE(review): presumably lets the session ignore this message
            # when it comes back -- confirm against the session code.
            self.session.ignore_messages.add(msg_id)
        else:
            # Generated ids are not added to session.ignore_messages.
            msg.set_id("slidge-carbon-" + str(uuid.uuid4()))
        msg.del_origin_id()

        for plugin_name in ("xep_0356", "xep_0356_old"):
            try:
                self.xmpp[plugin_name].send_privileged_message(msg)
            except PermissionError:
                # Not allowed through this plugin: try the next one, if any.
                continue
            return

        warnings.warn(
            "Slidge does not have privileges to send message on behalf of"
            " user. Refer to"
            " https://slidge.codeberg.page/docs/main/admin/privilege.html"
            " for more info."
        )

189 

190 

class InviteMixin(MessageMaker):
    """
    Mixin for sending group chat invitations (:xep:`0249`) from this
    :term:`XMPP Entity`.
    """

    def invite_to(
        self,
        muc: "LegacyMUC",
        reason: Optional[str] = None,
        password: Optional[str] = None,
        **send_kwargs,
    ):
        """
        Send an invitation to join a group (:xep:`0249`) from this
        :term:`XMPP Entity`.

        :param muc: the muc the user is invited to
        :param reason: a text explaining why the user should join this muc;
            left out of the invitation when empty or ``None``
        :param password: a password to include in the invitation; left out
            when empty or ``None``
        :param send_kwargs: additional kwargs to be passed to _send()
            (internal use by slidge)
        """
        invitation = self._make_message(mtype="normal")
        invite = invitation["groupchat_invite"]
        invite["jid"] = muc.jid
        # Optional fields are only set when a truthy value was given.
        for field, content in (("reason", reason), ("password", password)):
            if content:
                invite[field] = content
        self._send(invitation, **send_kwargs)

215 

216 

class MessageMixin(InviteMixin, ChatStateMixin, MarkerMixin, ContentMessageMixin):
    """
    Combination of the invitation, chat state, marker and content message
    mixins; adds nothing of its own.
    """

219 

220 

class MessageCarbonMixin(InviteMixin, ChatStateMixin, CarbonMessageMixin):
    """
    Combination of the invitation, chat state and carbon message mixins;
    adds nothing of its own.
    """

223 

224 

# Module-level logger, named after this module.
log: logging.Logger = logging.getLogger(__name__)