Coverage for slidge/core/mixins/message.py: 84%

100 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-26 19:34 +0000

1import logging 

2import uuid 

3import warnings 

4from typing import TYPE_CHECKING, Optional 

5 

6from slixmpp import Iq, Message 

7from slixmpp.plugins.xep_0004 import Form 

8 

9from ...util.types import ChatState, LegacyMessageType, Marker 

10from .attachment import AttachmentMixin 

11from .message_maker import MessageMaker 

12from .message_text import TextMessageMixin 

13 

14if TYPE_CHECKING: 

15 from ...group import LegacyMUC 

16 

# Publish options form for MDS: it is attached to the pubsub publish IQ
# that MarkerMixin.__send_mds builds on behalf of the user.
PUBLISH_OPTIONS = Form()
PUBLISH_OPTIONS["type"] = "submit"
PUBLISH_OPTIONS.add_field(
    "FORM_TYPE", "hidden", value="http://jabber.org/protocol/pubsub#publish-options"
)
# The remaining fields only differ by their name and value, so they are
# added from a table, in the order they must appear in the form.
for _option, _setting in (
    ("pubsub#persist_items", "true"),
    ("pubsub#max_items", "max"),
    ("pubsub#send_last_published_item", "never"),
    ("pubsub#access_model", "whitelist"),
):
    PUBLISH_OPTIONS.add_field(_option, value=_setting)
# Do not leave the loop variables behind in the module namespace.
del _option, _setting

27 

28 

class ChatStateMixin(MessageMaker):
    """
    Mixin providing chat state notifications (:xep:`0085`).

    The last chat state that was sent is remembered, so that the same state
    is not sent twice in a row unless the caller forces it.
    """

    def __init__(self) -> None:
        super().__init__()
        # Last state passed to _send(); None until a first state is sent.
        self.__last_chat_state: Optional[ChatState] = None

    def _chat_state(self, state: ChatState, forced: bool = False, **kwargs) -> None:
        """
        Build and send a chat state message, unless it should be skipped.

        :param state: The chat state to send
        :param forced: If True, send the state even if it is the same as the
            last one that was sent
        :param kwargs: Passed to _send(). If it contains a truthy ``carbon``
            entry, nothing is sent at all.
        """
        # Chat states are never sent as carbons.
        if kwargs.get("carbon", False):
            return
        # Skip repeats of the last state unless explicitly forced.
        if state == self.__last_chat_state and not forced:
            return
        self.__last_chat_state = state
        # The "no-store" hint is passed along with the state.
        stanza = self._make_message(state=state, hints={"no-store"})
        self._send(stanza, **kwargs)

    def active(self, **kwargs) -> None:
        """
        Send an "active" chat state (:xep:`0085`) from this
        :term:`XMPP Entity`.

        :param kwargs: additional kwargs to be passed to _send()
        """
        self._chat_state("active", **kwargs)

    def composing(self, **kwargs) -> None:
        """
        Send a "composing" (ie "typing notification") chat state (:xep:`0085`)
        from this :term:`XMPP Entity`.

        Unlike the other chat states, this one is always forced, ie, it is
        sent even if the last chat state sent was already "composing".

        :param kwargs: additional kwargs to be passed to _send()
        """
        self._chat_state("composing", forced=True, **kwargs)

    def paused(self, **kwargs) -> None:
        """
        Send a "paused" (ie "typing paused notification") chat state
        (:xep:`0085`) from this :term:`XMPP Entity`.

        :param kwargs: additional kwargs to be passed to _send()
        """
        self._chat_state("paused", **kwargs)

    def inactive(self, **kwargs) -> None:
        """
        Send an "inactive" (ie "contact has not interacted with the chat session
        interface for an intermediate period of time") chat state (:xep:`0085`)
        from this :term:`XMPP Entity`.

        :param kwargs: additional kwargs to be passed to _send()
        """
        self._chat_state("inactive", **kwargs)

    def gone(self, **kwargs) -> None:
        """
        Send a "gone" (ie "contact has not interacted with the chat session interface,
        system, or device for a relatively long period of time") chat state
        (:xep:`0085`) from this :term:`XMPP Entity`.

        :param kwargs: additional kwargs to be passed to _send()
        """
        self._chat_state("gone", **kwargs)

78 

79 

class MarkerMixin(MessageMaker):
    """
    Mixin providing message markers (:xep:`0333`): acknowledged, received
    and displayed.
    """

    def _make_marker(
        self, legacy_msg_id: LegacyMessageType, marker: Marker, carbon: bool = False
    ):
        """
        Build a message carrying a marker for a legacy message.

        :param legacy_msg_id: The message this marker refers to
        :param marker: Name of the marker element to fill in
        :param carbon: Passed to _make_message()
        :return: The message, with the marker's id set to the last XMPP ID
            associated with ``legacy_msg_id``
        """
        stanza = self._make_message(carbon=carbon)
        # A legacy ID may map to several XMPP IDs; the marker uses the last one.
        last_xmpp_id = self._legacy_to_xmpp(legacy_msg_id)[-1]
        stanza[marker]["id"] = last_xmpp_id
        return stanza

    def ack(self, legacy_msg_id: LegacyMessageType, **kwargs) -> None:
        """
        Send an "acknowledged" message marker (:xep:`0333`) from this :term:`XMPP Entity`.

        :param legacy_msg_id: The message this marker refers to
        :param kwargs: additional kwargs to be passed to _send()
        """
        is_carbon = bool(kwargs.get("carbon"))
        marker = self._make_marker(legacy_msg_id, "acknowledged", carbon=is_carbon)
        self._send(marker, **kwargs)

    def received(self, legacy_msg_id: LegacyMessageType, **kwargs) -> None:
        """
        Send a "received" message marker (:xep:`0333`) from this :term:`XMPP Entity`.
        If the message type of this entity is "chat", also send a delivery
        receipt (:xep:`0184`) for each XMPP ID associated with the legacy
        message.

        :param legacy_msg_id: The message this marker refers to
        :param kwargs: additional kwargs to be passed to _send() along with
            the marker
        """
        is_carbon = bool(kwargs.get("carbon"))
        if self.mtype == "chat":
            make_receipt = self.xmpp.delivery_receipt.make_ack
            for xmpp_id in self._legacy_to_xmpp(legacy_msg_id):
                receipt = make_receipt(xmpp_id, mfrom=self.jid, mto=self.user_jid)
                # Receipts are sent without the extra kwargs.
                self._send(receipt)
        marker = self._make_marker(legacy_msg_id, "received", carbon=is_carbon)
        self._send(marker, **kwargs)

    def displayed(self, legacy_msg_id: LegacyMessageType, **kwargs) -> None:
        """
        Send a "displayed" message marker (:xep:`0333`) from this :term:`XMPP Entity`.

        If this entity has a truthy ``is_user`` attribute, a task publishing
        an MDS displayed marker on behalf of the user is also scheduled on
        the session.

        :param legacy_msg_id: The message this marker refers to
        :param kwargs: additional kwargs to be passed to _send()
        """
        is_carbon = bool(kwargs.get("carbon"))
        marker = self._make_marker(legacy_msg_id, "displayed", carbon=is_carbon)
        self._send(marker, **kwargs)
        if not getattr(self, "is_user", False):
            return
        self.session.create_task(self.__send_mds(legacy_msg_id))

    async def __send_mds(self, legacy_msg_id: LegacyMessageType) -> None:
        """
        Send a MDS displayed marker on behalf of the user for a group chat.

        Does nothing if this entity has no (truthy) ``muc`` attribute.
        Failures of the privileged IQ are logged at debug level, not raised.

        :param legacy_msg_id: The message that was displayed
        """
        room = getattr(self, "muc", None)
        if not room:
            # This is not implemented for 1:1 chat because it would rely on
            # storing the XMPP-server injected stanza-id, which we don't track
            # ATM.
            # In practice, MDS should mostly be useful for public group chats,
            # so it should not be an issue.
            # We'll see if we need to implement that later
            return
        muc_jid = room.jid.bare
        xmpp_msg_id = self._legacy_to_xmpp(legacy_msg_id)[-1]
        mds = self.xmpp["xep_0490"]

        # The IQ is addressed from the user's bare JID to itself.
        user_bare_jid = self.user_jid.bare
        iq = Iq(sto=user_bare_jid, sfrom=user_bare_jid, stype="set")
        publish = iq["pubsub"]["publish"]
        publish["node"] = mds.stanza.NS
        item = publish["item"]
        # The item ID is the bare JID of the group chat.
        item["id"] = muc_jid

        displayed = mds.stanza.Displayed()
        stanza_id = displayed["stanza_id"]
        stanza_id["id"] = xmpp_msg_id
        stanza_id["by"] = muc_jid
        item["payload"] = displayed
        iq["pubsub"]["publish_options"] = PUBLISH_OPTIONS

        try:
            await self.xmpp["xep_0356"].send_privileged_iq(iq)
        except Exception as e:
            # Best effort: not being able to publish is only worth a debug log.
            self.session.log.debug("Could not MDS mark", exc_info=e)

163 

164 

class ContentMessageMixin(AttachmentMixin, TextMessageMixin):
    """
    Combine :class:`AttachmentMixin` and :class:`TextMessageMixin` in a
    single mixin; adds nothing of its own.
    """

167 

168 

class CarbonMessageMixin(ContentMessageMixin, MarkerMixin):
    """
    Content and marker mixins, plus a helper to send messages on behalf of
    the user using the privileged entity plugin (XEP-0356).
    """

    def _privileged_send(self, msg: Message) -> None:
        """
        Send a message through the ``xep_0356`` plugin.

        If the message already has an ID, that ID is added to the session's
        ``ignore_messages``; otherwise a "slidge-carbon-"-prefixed random ID
        is set on the message. The origin ID is removed in both cases.

        If the plugin raises :class:`PermissionError`, a warning is emitted
        instead of propagating the exception.

        :param msg: The message to send
        """
        existing_id = msg.get_id()
        if not existing_id:
            msg.set_id(f"slidge-carbon-{uuid.uuid4()}")
        else:
            self.session.ignore_messages.add(existing_id)
        msg.del_origin_id()

        privilege = self.xmpp["xep_0356"]
        try:
            privilege.send_privileged_message(msg)
        except PermissionError:
            warnings.warn(
                "Slidge does not have the privilege (XEP-0356) to send messages on behalf of users. "
                "Consider configuring your XMPP server for that."
            )

185 

186 

class InviteMixin(MessageMaker):
    """
    Mixin providing direct group chat invitations (:xep:`0249`).
    """

    def invite_to(
        self,
        muc: "LegacyMUC",
        reason: Optional[str] = None,
        password: Optional[str] = None,
        **send_kwargs,
    ) -> None:
        """
        Send an invitation to join a group (:xep:`0249`) from this :term:`XMPP Entity`.

        :param muc: the muc the user is invited to
        :param reason: a text explaining why the user should join this muc
        :param password: maybe this will make sense later? not sure
        :param send_kwargs: additional kwargs to be passed to _send()
            (internal use by slidge)
        """
        msg = self._make_message(mtype="normal")
        invite = msg["groupchat_invite"]
        invite["jid"] = muc.jid
        # Optional fields are only set when they have a non-empty value.
        for field, content in (("reason", reason), ("password", password)):
            if content:
                invite[field] = content
        self._send(msg, **send_kwargs)

211 

212 

class MessageMixin(InviteMixin, ChatStateMixin, MarkerMixin, ContentMessageMixin):
    """
    Aggregate of the invitation, chat state, marker and content mixins;
    adds nothing of its own.
    """

215 

216 

class MessageCarbonMixin(InviteMixin, ChatStateMixin, CarbonMessageMixin):
    """
    Aggregate of the invitation, chat state and carbon message mixins;
    adds nothing of its own.
    """

219 

220 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)