Coverage for slidge/core/mixins/message_text.py: 90%

77 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-26 19:34 +0000

1import logging 

2from datetime import datetime 

3from typing import Iterable, Optional 

4 

5from slixmpp import Message 

6 

7from ...util.archive_msg import HistoryMessage 

8from ...util.types import ( 

9 LegacyMessageType, 

10 LegacyThreadType, 

11 LinkPreview, 

12 MessageReference, 

13 ProcessingHint, 

14) 

15from ...util.util import add_quote_prefix 

16from .message_maker import MessageMaker 

17 

18 

19class TextMessageMixin(MessageMaker): 

20 def __default_hints(self, hints: Optional[Iterable[ProcessingHint]] = None): 

21 if hints is not None: 

22 return hints 

23 elif self.mtype == "chat": 

24 return {"markable", "store"} 

25 elif self.mtype == "groupchat": 

26 return {"markable"} 

27 

28 def _replace_id(self, legacy_msg_id: LegacyMessageType) -> str: 

29 if self.mtype == "groupchat": 

30 with self.xmpp.store.session() as orm: 

31 ids = self.xmpp.store.id_map.get_origin( 

32 orm, self._recipient_pk(), str(legacy_msg_id) 

33 ) 

34 if ids: 

35 if len(ids) > 1: 

36 log.warning( 

37 "More than 1 origin msg ID for '%s': '%s'", 

38 legacy_msg_id, 

39 ids, 

40 ) 

41 return ids[0] 

42 return self.session.legacy_to_xmpp_msg_id(legacy_msg_id) 

43 else: 

44 return self._legacy_to_xmpp(legacy_msg_id)[0] 

45 

46 def send_text( 

47 self, 

48 body: str, 

49 legacy_msg_id: Optional[LegacyMessageType] = None, 

50 *, 

51 when: Optional[datetime] = None, 

52 reply_to: Optional[MessageReference] = None, 

53 thread: Optional[LegacyThreadType] = None, 

54 hints: Optional[Iterable[ProcessingHint]] = None, 

55 carbon: bool = False, 

56 archive_only: bool = False, 

57 correction: bool = False, 

58 correction_event_id: Optional[LegacyMessageType] = None, 

59 link_previews: Optional[list[LinkPreview]] = None, 

60 **send_kwargs, 

61 ): 

62 """ 

63 Send a text message from this :term:`XMPP Entity`. 

64 

65 :param body: Content of the message 

66 :param legacy_msg_id: If you want to be able to transport read markers from the gateway 

67 user to the legacy network, specify this 

68 :param when: when the message was sent, for a "delay" tag (:xep:`0203`) 

69 :param reply_to: Quote another message (:xep:`0461`) 

70 :param hints: 

71 :param thread: 

72 :param carbon: (only used if called on a :class:`LegacyContact`) 

73 Set this to ``True`` if this is actually a message sent **to** the 

74 :class:`LegacyContact` by the :term:`User`. 

75 Use this to synchronize outgoing history for legacy official apps. 

76 :param correction: whether this message is a correction or not 

77 :param correction_event_id: in the case where an ID is associated with the legacy 

78 'correction event', specify it here to use it on the XMPP side. If not specified, 

79 a random ID will be used. 

80 :param link_previews: A little of sender (or server, or gateway)-generated 

81 previews of URLs linked in the body. 

82 :param archive_only: (only in groups) Do not send this message to user, 

83 but store it in the archive. Meant to be used during ``MUC.backfill()`` 

84 """ 

85 if carbon and not self.is_participant: 

86 with self.xmpp.store.session() as orm: 

87 if not correction and self.xmpp.store.id_map.was_sent_by_user( 

88 orm, self._recipient_pk(), str(legacy_msg_id), True 

89 ): 

90 log.warning( 

91 "Carbon message for a message an XMPP has sent? This is a bug! %s", 

92 legacy_msg_id, 

93 ) 

94 return 

95 self.xmpp.store.id_map.set_msg( 

96 orm, 

97 self._recipient_pk(), 

98 str(legacy_msg_id), 

99 [self.session.legacy_to_xmpp_msg_id(legacy_msg_id)], 

100 True, 

101 ) 

102 orm.commit() 

103 hints = self.__default_hints(hints) 

104 msg = self._make_message( 

105 mbody=body, 

106 legacy_msg_id=correction_event_id if correction else legacy_msg_id, 

107 when=when, 

108 reply_to=reply_to, 

109 hints=hints or (), 

110 carbon=carbon, 

111 thread=thread, 

112 link_previews=link_previews, 

113 ) 

114 if correction: 

115 msg["replace"]["id"] = self._replace_id(legacy_msg_id) 

116 return self._send( 

117 msg, 

118 archive_only=archive_only, 

119 carbon=carbon, 

120 legacy_msg_id=legacy_msg_id, 

121 **send_kwargs, 

122 ) 

123 

124 def correct( 

125 self, 

126 legacy_msg_id: LegacyMessageType, 

127 new_text: str, 

128 *, 

129 when: Optional[datetime] = None, 

130 reply_to: Optional[MessageReference] = None, 

131 thread: Optional[LegacyThreadType] = None, 

132 hints: Optional[Iterable[ProcessingHint]] = None, 

133 carbon: bool = False, 

134 archive_only: bool = False, 

135 correction_event_id: Optional[LegacyMessageType] = None, 

136 link_previews: Optional[list[LinkPreview]] = None, 

137 **send_kwargs, 

138 ) -> None: 

139 """ 

140 Modify a message that was previously sent by this :term:`XMPP Entity`. 

141 

142 Uses last message correction (:xep:`0308`) 

143 

144 :param new_text: New content of the message 

145 :param legacy_msg_id: The legacy message ID of the message to correct 

146 :param when: when the message was sent, for a "delay" tag (:xep:`0203`) 

147 :param reply_to: Quote another message (:xep:`0461`) 

148 :param hints: 

149 :param thread: 

150 :param carbon: (only in 1:1) Reflect a message sent to this ``Contact`` by the user. 

151 Use this to synchronize outgoing history for legacy official apps. 

152 :param archive_only: (only in groups) Do not send this message to user, 

153 but store it in the archive. Meant to be used during ``MUC.backfill()`` 

154 :param correction_event_id: in the case where an ID is associated with the legacy 

155 'correction event', specify it here to use it on the XMPP side. If not specified, 

156 a random ID will be used. 

157 :param link_previews: A little of sender (or server, or gateway)-generated 

158 previews of URLs linked in the body. 

159 """ 

160 self.send_text( 

161 new_text, 

162 legacy_msg_id, 

163 when=when, 

164 reply_to=reply_to, 

165 hints=hints, 

166 carbon=carbon, 

167 thread=thread, 

168 correction=True, 

169 archive_only=archive_only, 

170 correction_event_id=correction_event_id, 

171 link_previews=link_previews, 

172 **send_kwargs, 

173 ) 

174 

175 def react( 

176 self, 

177 legacy_msg_id: LegacyMessageType, 

178 emojis: Iterable[str] = (), 

179 thread: Optional[LegacyThreadType] = None, 

180 **kwargs, 

181 ) -> None: 

182 """ 

183 Send a reaction (:xep:`0444`) from this :term:`XMPP Entity`. 

184 

185 :param legacy_msg_id: The message which the reaction refers to. 

186 :param emojis: An iterable of emojis used as reactions 

187 :param thread: 

188 """ 

189 xmpp_id = kwargs.pop("xmpp_id", None) 

190 if xmpp_id: 

191 xmpp_ids = [xmpp_id] 

192 else: 

193 xmpp_ids = self._legacy_to_xmpp(legacy_msg_id) 

194 for xmpp_id in xmpp_ids: 

195 msg = self._make_message( 

196 hints={"store"}, carbon=bool(kwargs.get("carbon")), thread=thread 

197 ) 

198 self.xmpp["xep_0444"].set_reactions( 

199 msg, to_id=xmpp_id, reactions=set(emojis) 

200 ) 

201 self.__add_reaction_fallback(msg, legacy_msg_id, emojis) 

202 self._send(msg, **kwargs) 

203 

204 def __add_reaction_fallback( 

205 self, 

206 msg: Message, 

207 legacy_msg_id: LegacyMessageType, 

208 emojis: Iterable[str] = (), 

209 ) -> None: 

210 if not self.session.user.preferences.get("reaction_fallback", False): 

211 return 

212 msg["fallback"]["for"] = self.xmpp.plugin["xep_0444"].namespace 

213 msg["fallback"].enable("body") 

214 msg["body"] = " ".join(emojis) 

215 if not self.is_participant: 

216 return 

217 with self.xmpp.store.session() as orm: 

218 archived = self.xmpp.store.mam.get_by_legacy_id( 

219 orm, self.muc.stored.id, str(legacy_msg_id) 

220 ) 

221 if archived is None: 

222 return 

223 history_msg = HistoryMessage(archived.stanza) 

224 msg["body"] = ( 

225 add_quote_prefix(history_msg.stanza["body"]) + "\n" + msg["body"] 

226 ) 

227 

228 def retract( 

229 self, 

230 legacy_msg_id: LegacyMessageType, 

231 thread: Optional[LegacyThreadType] = None, 

232 **kwargs, 

233 ) -> None: 

234 """ 

235 Send a message retraction (:XEP:`0424`) from this :term:`XMPP Entity`. 

236 

237 :param legacy_msg_id: Legacy ID of the message to delete 

238 :param thread: 

239 """ 

240 xmpp_ids = self._legacy_to_xmpp(legacy_msg_id) 

241 replace_id = self._replace_id(legacy_msg_id) 

242 if replace_id not in xmpp_ids: 

243 xmpp_ids.append(replace_id) 

244 for xmpp_id in xmpp_ids: 

245 msg = self._make_message( 

246 state=None, 

247 hints={"store"}, 

248 mbody=f"/me retracted the message {legacy_msg_id}", 

249 carbon=bool(kwargs.get("carbon")), 

250 thread=thread, 

251 ) 

252 msg.enable("fallback") 

253 # namespace version mismatch between slidge and slixmpp, update me later 

254 msg["fallback"]["for"] = self.xmpp["xep_0424"].namespace[:-1] + "1" 

255 msg["retract"]["id"] = msg["replace"]["id"] = xmpp_id 

256 self._send(msg, **kwargs) 

257 

258 

259log = logging.getLogger(__name__)