Coverage for slidge / core / mixins / message.py: 87%

107 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 05:07 +0000

1import logging 

2import uuid 

3import warnings 

4from typing import TYPE_CHECKING 

5 

6from slixmpp import Iq, Message 

7from slixmpp.plugins.xep_0004.stanza.form import Form 

8 

9from ...util.types import AnyMUC, ChatState, LegacyMessageType, Marker 

10from .attachment import AttachmentMixin 

11from .message_maker import MessageMaker 

12from .message_text import TextMessageMixin 

13 

14if TYPE_CHECKING: 

15 pass 

16 

17# this is for MDS 

18PUBLISH_OPTIONS = Form() 

19PUBLISH_OPTIONS["type"] = "submit" 

20PUBLISH_OPTIONS.add_field( 

21 "FORM_TYPE", "hidden", value="http://jabber.org/protocol/pubsub#publish-options" 

22) 

23PUBLISH_OPTIONS.add_field("pubsub#persist_items", value="true") 

24PUBLISH_OPTIONS.add_field("pubsub#max_items", value="max") 

25PUBLISH_OPTIONS.add_field("pubsub#send_last_published_item", value="never") 

26PUBLISH_OPTIONS.add_field("pubsub#access_model", value="whitelist") 

27 

28 

29class ChatStateMixin(MessageMaker): 

30 def __init__(self) -> None: 

31 super().__init__() 

32 self.__last_chat_state: ChatState | None = None 

33 

34 def _chat_state( 

35 self, state: ChatState, forced: bool = False, **kwargs: object 

36 ) -> None: 

37 carbon = kwargs.get("carbon", False) 

38 if carbon or (state == self.__last_chat_state and not forced): 

39 return 

40 self.__last_chat_state = state 

41 msg = self._make_message(state=state, hints={"no-store"}) 

42 self._send(msg, **kwargs) 

43 

44 def active(self, **kwargs: object) -> None: 

45 """ 

46 Send an "active" chat state (:xep:`0085`) from this 

47 :term:`XMPP Entity`. 

48 """ 

49 self._chat_state("active", forced=False, **kwargs) 

50 

51 def composing(self, **kwargs: object) -> None: 

52 """ 

53 Send a "composing" (ie "typing notification") chat state (:xep:`0085`) 

54 from this :term:`XMPP Entity`. 

55 """ 

56 self._chat_state("composing", forced=True, **kwargs) 

57 

58 def paused(self, **kwargs: object) -> None: 

59 """ 

60 Send a "paused" (ie "typing paused notification") chat state 

61 (:xep:`0085`) from this :term:`XMPP Entity`. 

62 """ 

63 self._chat_state("paused", forced=False, **kwargs) 

64 

65 def inactive(self, **kwargs: object) -> None: 

66 """ 

67 Send an "inactive" (ie "contact has not interacted with the chat session 

68 interface for an intermediate period of time") chat state (:xep:`0085`) 

69 from this :term:`XMPP Entity`. 

70 """ 

71 self._chat_state("inactive", forced=False, **kwargs) 

72 

73 def gone(self, **kwargs: object) -> None: 

74 """ 

75 Send a "gone" (ie "contact has not interacted with the chat session interface, 

76 system, or device for a relatively long period of time") chat state 

77 (:xep:`0085`) from this :term:`XMPP Entity`. 

78 """ 

79 self._chat_state("gone", forced=False, **kwargs) 

80 

81 

82class MarkerMixin(MessageMaker): 

83 def _make_marker( 

84 self, legacy_msg_id: LegacyMessageType, marker: Marker, carbon: bool = False 

85 ) -> Message: 

86 msg = self._make_message(carbon=carbon) 

87 msg[marker]["id"] = self._legacy_to_xmpp(legacy_msg_id)[-1] 

88 return msg 

89 

90 def ack(self, legacy_msg_id: LegacyMessageType, **kwargs: object) -> None: 

91 """ 

92 Send an "acknowledged" message marker (:xep:`0333`) from this :term:`XMPP Entity`. 

93 

94 :param legacy_msg_id: The message this marker refers to 

95 """ 

96 self._send( 

97 self._make_marker( 

98 legacy_msg_id, "acknowledged", carbon=bool(kwargs.get("carbon")) 

99 ), 

100 **kwargs, 

101 ) 

102 

103 def received(self, legacy_msg_id: LegacyMessageType, **kwargs: object) -> None: 

104 """ 

105 Send a "received" message marker (:xep:`0333`) from this :term:`XMPP Entity`. 

106 If called on a :class:`LegacyContact`, also send a delivery receipt 

107 marker (:xep:`0184`). 

108 

109 :param legacy_msg_id: The message this marker refers to 

110 """ 

111 carbon = bool(kwargs.get("carbon")) 

112 if self.mtype == "chat": 

113 for msg_id in self._legacy_to_xmpp(legacy_msg_id): 

114 self._send( 

115 self.xmpp.delivery_receipt.make_ack( 

116 msg_id, 

117 mfrom=self.jid, 

118 mto=self.user_jid, 

119 ) 

120 ) 

121 self._send( 

122 self._make_marker(legacy_msg_id, "received", carbon=carbon), **kwargs 

123 ) 

124 

125 def displayed(self, legacy_msg_id: LegacyMessageType, **kwargs: object) -> None: 

126 """ 

127 Send a "displayed" message marker (:xep:`0333`) from this :term:`XMPP Entity`. 

128 

129 :param legacy_msg_id: The message this marker refers to 

130 """ 

131 if ( 

132 self.xmpp.MARK_ALL_MESSAGES 

133 and (muc := getattr(self, "muc", None)) 

134 and getattr(self, "is_user", False) 

135 ): 

136 muc_jid = muc.jid.bare 

137 with self.xmpp.store.session() as orm: 

138 if self.xmpp.store.mam.is_displayed_by_user( 

139 orm, muc_jid, str(legacy_msg_id) 

140 ): 

141 self.session.log.debug( 

142 "Ignoring carbon marker for message already displayed by user." 

143 ) 

144 return 

145 else: 

146 muc.pop_unread_xmpp_ids_up_to( 

147 self._legacy_to_xmpp(legacy_msg_id)[-1] 

148 ) 

149 

150 self._send( 

151 self._make_marker( 

152 legacy_msg_id, "displayed", carbon=bool(kwargs.get("carbon")) 

153 ), 

154 **kwargs, 

155 ) 

156 if getattr(self, "is_user", False): 

157 self.session.create_task(self.__send_mds(legacy_msg_id)) 

158 

159 async def __send_mds(self, legacy_msg_id: LegacyMessageType) -> None: 

160 # Send a MDS displayed marker on behalf of the user for a group chat 

161 if muc := getattr(self, "muc", None): 

162 muc_jid = muc.jid.bare 

163 else: 

164 # This is not implemented for 1:1 chat because it would rely on 

165 # storing the XMPP-server injected stanza-id, which we don't track 

166 # ATM. 

167 # In practice, MDS should mostly be useful for public group chats, 

168 # so it should not be an issue. 

169 # We'll see if we need to implement that later 

170 return 

171 xmpp_msg_id = self._legacy_to_xmpp(legacy_msg_id)[-1] 

172 iq = Iq(sto=self.user_jid.bare, sfrom=self.user_jid.bare, stype="set") 

173 iq["pubsub"]["publish"]["node"] = self.xmpp["xep_0490"].stanza.NS 

174 iq["pubsub"]["publish"]["item"]["id"] = muc_jid 

175 displayed = self.xmpp["xep_0490"].stanza.Displayed() 

176 displayed["stanza_id"]["id"] = xmpp_msg_id 

177 displayed["stanza_id"]["by"] = muc_jid 

178 iq["pubsub"]["publish"]["item"]["payload"] = displayed 

179 iq["pubsub"]["publish_options"] = PUBLISH_OPTIONS 

180 try: 

181 await self.xmpp["xep_0356"].send_privileged_iq(iq) 

182 except Exception as e: 

183 self.session.log.debug("Could not MDS mark", exc_info=e) 

184 

185 

186class ContentMessageMixin(AttachmentMixin, TextMessageMixin): 

187 pass 

188 

189 

190class CarbonMessageMixin(ContentMessageMixin, MarkerMixin): 

191 def _privileged_send(self, msg: Message) -> None: 

192 i = msg.get_id() 

193 if i: 

194 self.session.ignore_messages.add(i) 

195 else: 

196 i = "slidge-carbon-" + str(uuid.uuid4()) 

197 msg.set_id(i) 

198 msg.del_origin_id() 

199 try: 

200 self.xmpp["xep_0356"].send_privileged_message(msg) 

201 except PermissionError: 

202 warnings.warn( 

203 "Slidge does not have the privilege (XEP-0356) to send messages on behalf of users. " 

204 "Consider configuring your XMPP server for that." 

205 ) 

206 

207 

208class InviteMixin(MessageMaker): 

209 def invite_to( 

210 self, 

211 muc: AnyMUC, 

212 reason: str | None = None, 

213 password: str | None = None, 

214 **send_kwargs: object, 

215 ) -> None: 

216 """ 

217 Send an invitation to join a group (:xep:`0249`) from this :term:`XMPP Entity`. 

218 

219 :param muc: the muc the user is invited to 

220 :param reason: a text explaining why the user should join this muc 

221 :param password: maybe this will make sense later? not sure 

222 :param send_kwargs: additional kwargs to be passed to _send() 

223 (internal use by slidge) 

224 """ 

225 msg = self._make_message(mtype="normal") 

226 msg["groupchat_invite"]["jid"] = muc.jid 

227 if reason: 

228 msg["groupchat_invite"]["reason"] = reason 

229 if password: 

230 msg["groupchat_invite"]["password"] = password 

231 self._send(msg, **send_kwargs) 

232 

233 

234class MessageMixin(InviteMixin, ChatStateMixin, MarkerMixin, ContentMessageMixin): 

235 pass 

236 

237 

238class MessageCarbonMixin(InviteMixin, ChatStateMixin, CarbonMessageMixin): 

239 pass 

240 

241 

242log = logging.getLogger(__name__)