Coverage for slidge / core / mixins / message_text.py: 89%

79 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 05:07 +0000

1import logging 

2from collections.abc import Iterable 

3from datetime import datetime 

4 

5from slixmpp import Message 

6 

7from ...util.archive_msg import HistoryMessage 

8from ...util.types import ( 

9 AnyMessageReference, 

10 LegacyMessageType, 

11 LegacyThreadType, 

12 LinkPreview, 

13 ProcessingHint, 

14) 

15from ...util.util import add_quote_prefix 

16from .message_maker import MessageMaker 

17 

18 

19class TextMessageMixin(MessageMaker): 

20 def __default_hints( 

21 self, hints: Iterable[ProcessingHint] | None = None 

22 ) -> Iterable[ProcessingHint]: 

23 if hints is not None: 

24 return hints 

25 elif self.mtype == "chat": 

26 return {"markable", "store"} 

27 elif self.mtype == "groupchat": 

28 return {"markable"} 

29 else: 

30 raise RuntimeError("Never") 

31 

32 def _replace_id(self, legacy_msg_id: LegacyMessageType) -> str: 

33 if self.mtype == "groupchat": 

34 with self.xmpp.store.session() as orm: 

35 ids = self.xmpp.store.id_map.get_origin( 

36 orm, self._recipient_pk(), str(legacy_msg_id) 

37 ) 

38 if ids: 

39 if len(ids) > 1: 

40 log.warning( 

41 "More than 1 origin msg ID for '%s': '%s'", 

42 legacy_msg_id, 

43 ids, 

44 ) 

45 return ids[0] 

46 return self.session.legacy_to_xmpp_msg_id(legacy_msg_id) 

47 else: 

48 return self._legacy_to_xmpp(legacy_msg_id)[0] 

49 

50 def send_text( 

51 self, 

52 body: str, 

53 legacy_msg_id: LegacyMessageType | None = None, 

54 *, 

55 when: datetime | None = None, 

56 reply_to: AnyMessageReference | None = None, 

57 thread: LegacyThreadType | None = None, 

58 hints: Iterable[ProcessingHint] | None = None, 

59 carbon: bool = False, 

60 archive_only: bool = False, 

61 correction: bool = False, 

62 correction_event_id: LegacyMessageType | None = None, 

63 link_previews: list[LinkPreview] | None = None, 

64 **send_kwargs: object, 

65 ) -> Message | None: 

66 """ 

67 Send a text message from this :term:`XMPP Entity`. 

68 

69 :param body: Content of the message 

70 :param legacy_msg_id: If you want to be able to transport read markers from the gateway 

71 user to the legacy network, specify this 

72 :param when: when the message was sent, for a "delay" tag (:xep:`0203`) 

73 :param reply_to: Quote another message (:xep:`0461`) 

74 :param hints: 

75 :param thread: 

76 :param carbon: (only used if called on a :class:`LegacyContact`) 

77 Set this to ``True`` if this is actually a message sent **to** the 

78 :class:`LegacyContact` by the :term:`User`. 

79 Use this to synchronize outgoing history for legacy official apps. 

80 :param correction: whether this message is a correction or not 

81 :param correction_event_id: in the case where an ID is associated with the legacy 

82 'correction event', specify it here to use it on the XMPP side. If not specified, 

83 a random ID will be used. 

84 :param link_previews: A little of sender (or server, or gateway)-generated 

85 previews of URLs linked in the body. 

86 :param archive_only: (only in groups) Do not send this message to user, 

87 but store it in the archive. Meant to be used during ``MUC.backfill()`` 

88 """ 

89 if carbon and not self.is_participant: 

90 with self.xmpp.store.session() as orm: 

91 if not correction and self.xmpp.store.id_map.was_sent_by_user( 

92 orm, self._recipient_pk(), str(legacy_msg_id), False 

93 ): 

94 log.warning( 

95 "Carbon message for a message an XMPP has sent? This is a bug! %s", 

96 legacy_msg_id, 

97 ) 

98 return None 

99 self.xmpp.store.id_map.set_msg( 

100 orm, 

101 self._recipient_pk(), 

102 str(legacy_msg_id), 

103 [self.session.legacy_to_xmpp_msg_id(legacy_msg_id)], 

104 False, 

105 ) 

106 orm.commit() 

107 hints = self.__default_hints(hints) 

108 msg = self._make_message( 

109 mbody=body, 

110 legacy_msg_id=correction_event_id if correction else legacy_msg_id, 

111 when=when, 

112 reply_to=reply_to, 

113 hints=hints or (), 

114 carbon=carbon, 

115 thread=thread, 

116 link_previews=link_previews, 

117 ) 

118 if correction: 

119 msg["replace"]["id"] = self._replace_id(legacy_msg_id) 

120 return self._send( 

121 msg, 

122 archive_only=archive_only, 

123 carbon=carbon, 

124 legacy_msg_id=legacy_msg_id, 

125 **send_kwargs, 

126 ) 

127 

128 def correct( 

129 self, 

130 legacy_msg_id: LegacyMessageType, 

131 new_text: str, 

132 *, 

133 when: datetime | None = None, 

134 reply_to: AnyMessageReference | None = None, 

135 thread: LegacyThreadType | None = None, 

136 hints: Iterable[ProcessingHint] | None = None, 

137 carbon: bool = False, 

138 archive_only: bool = False, 

139 correction_event_id: LegacyMessageType | None = None, 

140 link_previews: list[LinkPreview] | None = None, 

141 **send_kwargs: object, 

142 ) -> None: 

143 """ 

144 Modify a message that was previously sent by this :term:`XMPP Entity`. 

145 

146 Uses last message correction (:xep:`0308`) 

147 

148 :param new_text: New content of the message 

149 :param legacy_msg_id: The legacy message ID of the message to correct 

150 :param when: when the message was sent, for a "delay" tag (:xep:`0203`) 

151 :param reply_to: Quote another message (:xep:`0461`) 

152 :param hints: 

153 :param thread: 

154 :param carbon: (only in 1:1) Reflect a message sent to this ``Contact`` by the user. 

155 Use this to synchronize outgoing history for legacy official apps. 

156 :param archive_only: (only in groups) Do not send this message to user, 

157 but store it in the archive. Meant to be used during ``MUC.backfill()`` 

158 :param correction_event_id: in the case where an ID is associated with the legacy 

159 'correction event', specify it here to use it on the XMPP side. If not specified, 

160 a random ID will be used. 

161 :param link_previews: A little of sender (or server, or gateway)-generated 

162 previews of URLs linked in the body. 

163 """ 

164 self.send_text( 

165 new_text, 

166 legacy_msg_id, 

167 when=when, 

168 reply_to=reply_to, 

169 hints=hints, 

170 carbon=carbon, 

171 thread=thread, 

172 correction=True, 

173 archive_only=archive_only, 

174 correction_event_id=correction_event_id, 

175 link_previews=link_previews, 

176 **send_kwargs, 

177 ) 

178 

179 def react( 

180 self, 

181 legacy_msg_id: LegacyMessageType, 

182 emojis: Iterable[str] = (), 

183 thread: LegacyThreadType | None = None, 

184 **kwargs: object, 

185 ) -> None: 

186 """ 

187 Send a reaction (:xep:`0444`) from this :term:`XMPP Entity`. 

188 

189 :param legacy_msg_id: The message which the reaction refers to. 

190 :param emojis: An iterable of emojis used as reactions 

191 :param thread: 

192 """ 

193 xmpp_id = kwargs.pop("xmpp_id", None) 

194 if xmpp_id: 

195 assert isinstance(xmpp_id, str) 

196 xmpp_ids = [xmpp_id] 

197 else: 

198 xmpp_ids = self._legacy_to_xmpp(legacy_msg_id) 

199 for xmpp_id in xmpp_ids: 

200 msg = self._make_message( 

201 hints={"store"}, carbon=bool(kwargs.get("carbon")), thread=thread 

202 ) 

203 self.xmpp["xep_0444"].set_reactions( 

204 msg, to_id=xmpp_id, reactions=set(emojis) 

205 ) 

206 self.__add_reaction_fallback(msg, legacy_msg_id, emojis) 

207 self._send(msg, **kwargs) 

208 

209 def __add_reaction_fallback( 

210 self, 

211 msg: Message, 

212 legacy_msg_id: LegacyMessageType, 

213 emojis: Iterable[str] = (), 

214 ) -> None: 

215 if not self.session.user.preferences.get("reaction_fallback", False): 

216 return 

217 msg["fallback"]["for"] = self.xmpp.plugin["xep_0444"].namespace 

218 msg["fallback"].enable("body") 

219 msg["body"] = " ".join(emojis) 

220 if not self.is_participant: 

221 return 

222 with self.xmpp.store.session() as orm: 

223 archived = self.xmpp.store.mam.get_by_legacy_id( 

224 orm, self.muc.stored.id, str(legacy_msg_id) 

225 ) 

226 if archived is None: 

227 return 

228 history_msg = HistoryMessage(archived.stanza) 

229 msg["body"] = ( 

230 add_quote_prefix(history_msg.stanza["body"]) + "\n" + msg["body"] 

231 ) 

232 

233 def retract( 

234 self, 

235 legacy_msg_id: LegacyMessageType, 

236 thread: LegacyThreadType | None = None, 

237 **kwargs: object, 

238 ) -> None: 

239 """ 

240 Send a message retraction (:XEP:`0424`) from this :term:`XMPP Entity`. 

241 

242 :param legacy_msg_id: Legacy ID of the message to delete 

243 :param thread: 

244 """ 

245 xmpp_ids = self._legacy_to_xmpp(legacy_msg_id) 

246 replace_id = self._replace_id(legacy_msg_id) 

247 if replace_id not in xmpp_ids: 

248 xmpp_ids.append(replace_id) 

249 for xmpp_id in xmpp_ids: 

250 msg = self._make_message( 

251 state=None, 

252 hints={"store"}, 

253 mbody=f"/me retracted the message {legacy_msg_id}", 

254 carbon=bool(kwargs.get("carbon")), 

255 thread=thread, 

256 ) 

257 msg.enable("fallback") 

258 # namespace version mismatch between slidge and slixmpp, update me later 

259 msg["fallback"]["for"] = self.xmpp["xep_0424"].namespace[:-1] + "1" 

260 msg["retract"]["id"] = msg["replace"]["id"] = xmpp_id 

261 self._send(msg, **kwargs) 

262 

263 

264log = logging.getLogger(__name__)