Coverage for slidge/core/mixins/message.py: 82%

100 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-04 08:17 +0000

1import logging 

2import uuid 

3import warnings 

4from typing import TYPE_CHECKING, Optional 

5 

6from slixmpp import Iq, Message 

7from slixmpp.plugins.xep_0004 import Form 

8 

9from ...util.types import ChatState, LegacyMessageType, Marker 

10from .attachment import AttachmentMixin 

11from .message_maker import MessageMaker 

12from .message_text import TextMessageMixin 

13 

14if TYPE_CHECKING: 

15 from ...group import LegacyMUC 

16 

# Publish options form used for MDS: it is attached to the pubsub publish
# request built in MarkerMixin.__send_mds().
PUBLISH_OPTIONS = Form()
PUBLISH_OPTIONS["type"] = "submit"
PUBLISH_OPTIONS.add_field(
    "FORM_TYPE", "hidden", value="http://jabber.org/protocol/pubsub#publish-options"
)
# The remaining fields only differ by name and value; they are added in this
# exact order.
for _field_name, _field_value in (
    ("pubsub#persist_items", "true"),
    ("pubsub#max_items", "max"),
    ("pubsub#send_last_published_item", "never"),
    ("pubsub#access_model", "whitelist"),
):
    PUBLISH_OPTIONS.add_field(_field_name, value=_field_value)
# Do not leave the loop variables behind in the module namespace.
del _field_name, _field_value

27 

28 

class ChatStateMixin(MessageMaker):
    """
    Mixin providing chat state notifications (:xep:`0085`).

    The last chat state that was sent is remembered so that an identical
    state is not sent twice in a row, unless sending is forced.
    """

    def __init__(self) -> None:
        super().__init__()
        # Last chat state actually sent; None until one has been sent.
        self.__last_chat_state: Optional[ChatState] = None

    def _chat_state(self, state: ChatState, forced: bool = False, **kwargs) -> None:
        """
        Build and send a chat state message.

        Nothing is sent for carbons, nor when ``state`` equals the previously
        sent state and ``forced`` is false.

        :param state: The chat state to send
        :param forced: Send even if ``state`` was the last state sent
        :param kwargs: Passed to ``_send()``; ``carbon`` is also read here
        """
        if kwargs.get("carbon", False):
            return
        if state == self.__last_chat_state and not forced:
            return
        self.__last_chat_state = state
        stanza = self._make_message(state=state, hints={"no-store"})
        self._send(stanza, **kwargs)

    def active(self, **kwargs) -> None:
        """
        Emit the "active" chat state (:xep:`0085`) on behalf of this
        :term:`XMPP Entity`.
        """
        self._chat_state("active", **kwargs)

    def composing(self, **kwargs) -> None:
        """
        Emit the "composing" chat state (:xep:`0085`), ie a "typing
        notification", on behalf of this :term:`XMPP Entity`.

        This state is always sent, even if it was the last one sent.
        """
        self._chat_state("composing", forced=True, **kwargs)

    def paused(self, **kwargs) -> None:
        """
        Emit the "paused" chat state (:xep:`0085`), ie a "typing paused
        notification", on behalf of this :term:`XMPP Entity`.
        """
        self._chat_state("paused", **kwargs)

    def inactive(self, **kwargs) -> None:
        """
        Emit the "inactive" chat state (:xep:`0085`) on behalf of this
        :term:`XMPP Entity`, ie "contact has not interacted with the chat
        session interface for an intermediate period of time".
        """
        self._chat_state("inactive", **kwargs)

    def gone(self, **kwargs) -> None:
        """
        Emit the "gone" chat state (:xep:`0085`) on behalf of this
        :term:`XMPP Entity`, ie "contact has not interacted with the chat
        session interface, system, or device for a relatively long period of
        time".
        """
        self._chat_state("gone", **kwargs)

78 

79 

class MarkerMixin(MessageMaker):
    """
    Mixin providing message markers (:xep:`0333`): acknowledged, received
    and displayed.
    """

    is_group: bool = NotImplemented

    def _make_marker(
        self, legacy_msg_id: LegacyMessageType, marker: Marker, carbon: bool = False
    ):
        """
        Build a message carrying ``marker`` for the given legacy message.

        :param legacy_msg_id: The message this marker refers to
        :param marker: Name of the marker element to fill in
        :param carbon: Forwarded to ``_make_message()``
        :return: The message stanza, not sent yet
        """
        stanza = self._make_message(carbon=carbon)
        stanza[marker]["id"] = self._legacy_to_xmpp(legacy_msg_id)
        return stanza

    def ack(self, legacy_msg_id: LegacyMessageType, **kwargs) -> None:
        """
        Emit an "acknowledged" message marker (:xep:`0333`) on behalf of this
        :term:`XMPP Entity`.

        :param legacy_msg_id: The message this marker refers to
        """
        is_carbon = bool(kwargs.get("carbon"))
        marker = self._make_marker(legacy_msg_id, "acknowledged", carbon=is_carbon)
        self._send(marker, **kwargs)

    def received(self, legacy_msg_id: LegacyMessageType, **kwargs) -> None:
        """
        Emit a "received" message marker (:xep:`0333`) on behalf of this
        :term:`XMPP Entity`.
        When called on a :class:`LegacyContact`, a delivery receipt
        (:xep:`0184`) is sent as well.

        :param legacy_msg_id: The message this marker refers to
        """
        is_carbon = bool(kwargs.get("carbon"))
        if self.mtype == "chat":
            # The delivery receipt goes out first, without the extra kwargs.
            receipt = self.xmpp.delivery_receipt.make_ack(
                self._legacy_to_xmpp(legacy_msg_id),
                mfrom=self.jid,
                mto=self.user_jid,
            )
            self._send(receipt)
        marker = self._make_marker(legacy_msg_id, "received", carbon=is_carbon)
        self._send(marker, **kwargs)

    def displayed(self, legacy_msg_id: LegacyMessageType, **kwargs) -> None:
        """
        Emit a "displayed" message marker (:xep:`0333`) on behalf of this
        :term:`XMPP Entity`.

        If this object has a truthy ``is_user`` attribute, a task publishing
        the matching MDS marker is also scheduled on the session.

        :param legacy_msg_id: The message this marker refers to
        """
        is_carbon = bool(kwargs.get("carbon"))
        marker = self._make_marker(legacy_msg_id, "displayed", carbon=is_carbon)
        self._send(marker, **kwargs)
        if getattr(self, "is_user", False):
            self.session.create_task(self.__send_mds(legacy_msg_id))

    async def __send_mds(self, legacy_msg_id: LegacyMessageType) -> None:
        """
        Publish a MDS displayed marker on behalf of the user for a group chat.

        Failures are only logged at debug level.

        :param legacy_msg_id: The message that was displayed
        """
        muc = getattr(self, "muc", None)
        if not muc:
            # This is not implemented for 1:1 chat because it would rely on
            # storing the XMPP-server injected stanza-id, which we don't track
            # ATM.
            # In practice, MDS should mostly be useful for public group chats,
            # so it should not be an issue.
            # We'll see if we need to implement that later
            return
        muc_jid = muc.jid.bare
        xmpp_msg_id = self._legacy_to_xmpp(legacy_msg_id)

        # The request is addressed from and to the user's own bare JID.
        iq = Iq(sto=self.user_jid.bare, sfrom=self.user_jid.bare, stype="set")
        mds_stanza = self.xmpp["xep_0490"].stanza
        publish = iq["pubsub"]["publish"]
        publish["node"] = mds_stanza.NS
        publish["item"]["id"] = muc_jid

        displayed = mds_stanza.Displayed()
        stanza_id = displayed["stanza_id"]
        stanza_id["id"] = xmpp_msg_id
        stanza_id["by"] = muc_jid
        publish["item"]["payload"] = displayed

        iq["pubsub"]["publish_options"] = PUBLISH_OPTIONS
        try:
            await self.xmpp["xep_0356"].send_privileged_iq(iq)
        except Exception as e:
            self.session.log.debug("Could not MDS mark", exc_info=e)

164 

165 

class ContentMessageMixin(AttachmentMixin, TextMessageMixin):
    """Combination of :class:`AttachmentMixin` and :class:`TextMessageMixin`."""

168 

169 

class CarbonMessageMixin(ContentMessageMixin, MarkerMixin):
    """
    Content and marker mixin that can also send messages on behalf of the
    user through :xep:`0356` (privileged entity).
    """

    def _privileged_send(self, msg: Message) -> None:
        """
        Send ``msg`` on behalf of the user using the :xep:`0356` plugin.

        If the message already has an ID, that ID is added to the session's
        ``ignore_messages``; otherwise a fresh ``slidge-carbon-<uuid4>`` ID is
        set on the message. The origin ID is removed before sending.

        A :class:`PermissionError` raised by the plugin is not propagated;
        a warning pointing to the privilege documentation is emitted instead.

        :param msg: The message stanza to send
        """
        i = msg.get_id()
        if i:
            self.session.ignore_messages.add(i)
        else:
            i = "slidge-carbon-" + str(uuid.uuid4())
            msg.set_id(i)
        msg.del_origin_id()
        try:
            self.xmpp["xep_0356"].send_privileged_message(msg)
        except PermissionError:
            # The adjacent literals are concatenated into a single sentence
            # pair, so the space after "user." must be part of the text.
            warnings.warn(
                "Slidge does not have privileges to send message on behalf of"
                " user. Refer to"
                " https://slidge.im/docs/slidge/main/admin/privilege.html"
                " for more info."
            )

188 

189 

class InviteMixin(MessageMaker):
    """Mixin providing direct group chat invitations (:xep:`0249`)."""

    def invite_to(
        self,
        muc: "LegacyMUC",
        reason: Optional[str] = None,
        password: Optional[str] = None,
        **send_kwargs,
    ) -> None:
        """
        Invite the user to join a group (:xep:`0249`) on behalf of this
        :term:`XMPP Entity`.

        :param muc: the muc the user is invited to
        :param reason: a text explaining why the user should join this muc
        :param password: maybe this will make sense later? not sure
        :param send_kwargs: additional kwargs to be passed to _send()
            (internal use by slidge)
        """
        invitation = self._make_message(mtype="normal")
        invite = invitation["groupchat_invite"]
        invite["jid"] = muc.jid
        # Optional parts are only set when a non-empty value was given.
        if reason:
            invite["reason"] = reason
        if password:
            invite["password"] = password
        self._send(invitation, **send_kwargs)

214 

215 

class MessageMixin(InviteMixin, ChatStateMixin, MarkerMixin, ContentMessageMixin):
    """Invitations, chat states, markers and content messages in one mixin."""

218 

219 

class MessageCarbonMixin(InviteMixin, ChatStateMixin, CarbonMessageMixin):
    """Invitations, chat states and carbon-capable messages in one mixin."""

222 

223 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)