Coverage for slidge / core / mixins / message_text.py: 87%

79 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-13 04:38 +0000

1import logging 

2from collections.abc import Iterable 

3from datetime import datetime 

4 

5from slixmpp import Message 

6 

7from ...util.archive_msg import HistoryMessage 

8from ...util.types import ( 

9 LinkPreview, 

10 MessageReference, 

11 ProcessingHint, 

12) 

13from ...util.util import add_quote_prefix 

14from .message_maker import MessageMaker 

15 

16 

class TextMessageMixin(MessageMaker):
    """
    Text-related actions (send, correct, react, retract) for an XMPP entity.

    Every method builds a stanza through ``self._make_message()`` (inherited
    from :class:`MessageMaker`) and hands it over to ``self._send()``.
    """

    def __default_hints(
        self, hints: Iterable[ProcessingHint] | None = None
    ) -> Iterable[ProcessingHint]:
        """
        Pick the processing hints to attach to an outgoing text message.

        :param hints: Hints explicitly requested by the caller. Anything that
            is not ``None`` (including an empty iterable) is returned untouched.
        :return: ``hints`` when given, otherwise ``{"markable", "store"}`` for
            ``"chat"`` messages and ``{"markable"}`` for ``"groupchat"`` ones.
        :raises RuntimeError: if ``self.mtype`` is neither ``"chat"`` nor
            ``"groupchat"``.
        """
        if hints is not None:
            return hints
        if self.mtype == "chat":
            return {"markable", "store"}
        if self.mtype == "groupchat":
            return {"markable"}
        raise RuntimeError("Never")

    def _replace_id(self, legacy_msg_id: str) -> str:
        """
        Resolve the XMPP message ID a correction or retraction must point to.

        :param legacy_msg_id: Legacy ID of the message being replaced.
        :return: For ``"groupchat"`` messages, the first "origin" ID stored in
            the ID map for this recipient, or ``legacy_msg_id`` itself when the
            map has no entry. For any other message type, the first ID returned
            by ``self._legacy_to_xmpp()``.
        """
        if self.mtype == "groupchat":
            with self.xmpp.store.session() as orm:
                ids = self.xmpp.store.id_map.get_origin(
                    orm, self._recipient_pk(), str(legacy_msg_id)
                )
                if ids:
                    if len(ids) > 1:
                        # Ambiguous mapping: keep going with the first one,
                        # but leave a trace for debugging.
                        log.warning(
                            "More than 1 origin msg ID for '%s': '%s'",
                            legacy_msg_id,
                            ids,
                        )
                    return ids[0]
                return legacy_msg_id
        else:
            return self._legacy_to_xmpp(legacy_msg_id)[0]

    def send_text(
        self,
        body: str,
        legacy_msg_id: str | None = None,
        *,
        when: datetime | None = None,
        reply_to: MessageReference | None = None,
        thread: str | None = None,
        hints: Iterable[ProcessingHint] | None = None,
        carbon: bool = False,
        archive_only: bool = False,
        correction: bool = False,
        correction_event_id: str | None = None,
        link_previews: list[LinkPreview] | None = None,
        **send_kwargs: object,
    ) -> Message | None:
        """
        Send a text message from this :term:`XMPP Entity`.

        :param body: Content of the message
        :param legacy_msg_id: If you want to be able to transport read markers from the gateway
            user to the legacy network, specify this
        :param when: when the message was sent, for a "delay" tag (:xep:`0203`)
        :param reply_to: Quote another message (:xep:`0461`)
        :param hints: Processing hints for the message. If ``None``, defaults
            depending on the message type are used.
        :param thread:
        :param carbon: (only used if called on a :class:`LegacyContact`)
            Set this to ``True`` if this is actually a message sent **to** the
            :class:`LegacyContact` by the :term:`User`.
            Use this to synchronize outgoing history for legacy official apps.
        :param correction: whether this message is a correction or not
        :param correction_event_id: in the case where an ID is associated with the legacy
            'correction event', specify it here to use it on the XMPP side. If not specified,
            a random ID will be used.
        :param link_previews: A list of sender (or server, or gateway)-generated
            previews of URLs linked in the body.
        :param archive_only: (only in groups) Do not send this message to user,
            but store it in the archive. Meant to be used during ``MUC.backfill()``
        :param send_kwargs: Extra keyword arguments forwarded to ``self._send()``.
        :return: Whatever ``self._send()`` returns, or ``None`` if the message
            was dropped because it is a carbon of something the user sent
            through XMPP in the first place.
        :raises TypeError: if ``correction`` is true and no ``legacy_msg_id``
            was given.
        """
        if carbon and not self.is_participant:
            with self.xmpp.store.session() as orm:
                # A carbon for a message that originated from XMPP would echo
                # the user's own message back to them: drop it. Corrections are
                # exempt from this check.
                if not correction and self.xmpp.store.id_map.was_sent_by_user(
                    orm, self._recipient_pk(), str(legacy_msg_id), False
                ):
                    log.warning(
                        "Carbon message for a message an XMPP has sent? This is a bug! %s",
                        legacy_msg_id,
                    )
                    return None
        hints = self.__default_hints(hints)
        msg = self._make_message(
            mbody=body,
            # For corrections, the stanza carries the ID of the correction
            # event; the corrected message is referenced via <replace/> below.
            legacy_msg_id=correction_event_id if correction else legacy_msg_id,
            when=when,
            reply_to=reply_to,
            hints=hints or (),
            carbon=carbon,
            thread=thread,
            link_previews=link_previews,
        )
        if correction:
            if not legacy_msg_id:
                raise TypeError(
                    "legacy_msg_id is required when sending a correction"
                )
            msg["replace"]["id"] = self._replace_id(legacy_msg_id)
        return self._send(
            msg,
            archive_only=archive_only,
            carbon=carbon,
            legacy_msg_id=legacy_msg_id,
            **send_kwargs,
        )

    def correct(
        self,
        legacy_msg_id: str,
        new_text: str,
        *,
        when: datetime | None = None,
        reply_to: MessageReference | None = None,
        thread: str | None = None,
        hints: Iterable[ProcessingHint] | None = None,
        carbon: bool = False,
        archive_only: bool = False,
        correction_event_id: str | None = None,
        link_previews: list[LinkPreview] | None = None,
        **send_kwargs: object,
    ) -> None:
        """
        Modify a message that was previously sent by this :term:`XMPP Entity`.

        Uses last message correction (:xep:`0308`). This is a thin wrapper
        around :meth:`send_text` called with ``correction=True``; its return
        value is discarded.

        :param new_text: New content of the message
        :param legacy_msg_id: The legacy message ID of the message to correct
        :param when: when the message was sent, for a "delay" tag (:xep:`0203`)
        :param reply_to: Quote another message (:xep:`0461`)
        :param hints: Processing hints for the message. If ``None``, defaults
            depending on the message type are used.
        :param thread:
        :param carbon: (only in 1:1) Reflect a message sent to this ``Contact`` by the user.
            Use this to synchronize outgoing history for legacy official apps.
        :param archive_only: (only in groups) Do not send this message to user,
            but store it in the archive. Meant to be used during ``MUC.backfill()``
        :param correction_event_id: in the case where an ID is associated with the legacy
            'correction event', specify it here to use it on the XMPP side. If not specified,
            a random ID will be used.
        :param link_previews: A list of sender (or server, or gateway)-generated
            previews of URLs linked in the body.
        :param send_kwargs: Extra keyword arguments forwarded to ``self._send()``.
        :raises TypeError: if ``legacy_msg_id`` is empty.
        """
        self.send_text(
            new_text,
            legacy_msg_id,
            when=when,
            reply_to=reply_to,
            hints=hints,
            carbon=carbon,
            thread=thread,
            correction=True,
            archive_only=archive_only,
            correction_event_id=correction_event_id,
            link_previews=link_previews,
            **send_kwargs,
        )

    def react(
        self,
        legacy_msg_id: str,
        emojis: Iterable[str] = (),
        thread: str | None = None,
        **kwargs: object,
    ) -> None:
        """
        Send a reaction (:xep:`0444`) from this :term:`XMPP Entity`.

        One stanza is sent per XMPP ID associated with ``legacy_msg_id``.

        :param legacy_msg_id: The message which the reaction refers to.
        :param emojis: An iterable of emojis used as reactions
        :param thread:
        :param kwargs: Forwarded to ``self._send()``. Two keys are treated
            specially: ``xmpp_id`` is removed and, when truthy, used as the only
            target ID instead of looking ``legacy_msg_id`` up; ``carbon`` is
            also read to build the stanza.
        """
        # ``emojis`` may be a one-shot iterable (e.g. a generator) but is
        # needed once per target ID *and* for the fallback body: freeze it.
        emojis = tuple(emojis)
        xmpp_id = kwargs.pop("xmpp_id", None)
        if xmpp_id:
            assert isinstance(xmpp_id, str)
            xmpp_ids = [xmpp_id]
        else:
            xmpp_ids = self._legacy_to_xmpp(legacy_msg_id)
        for to_id in xmpp_ids:
            msg = self._make_message(
                hints={"store"}, carbon=bool(kwargs.get("carbon")), thread=thread
            )
            self.xmpp["xep_0444"].set_reactions(
                msg, to_id=to_id, reactions=set(emojis)
            )
            self.__add_reaction_fallback(msg, legacy_msg_id, emojis)
            self._send(msg, **kwargs)

    def __add_reaction_fallback(
        self,
        msg: Message,
        legacy_msg_id: str,
        emojis: Iterable[str] = (),
    ) -> None:
        """
        Add a plain-text fallback body to a reaction stanza, if the user wants it.

        Does nothing unless the user's ``reaction_fallback`` preference is
        truthy. Otherwise the body is set to the space-separated emojis and,
        when ``self.is_participant`` is true and the reacted-to message is
        found in the archive, the quoted body of that message is prepended.

        :param msg: The reaction stanza, modified in place.
        :param legacy_msg_id: Legacy ID of the message being reacted to.
        :param emojis: The emojis of the reaction.
        """
        if not self.session.user.preferences.get("reaction_fallback", False):
            return
        msg["fallback"]["for"] = self.xmpp.plugin["xep_0444"].namespace
        msg["fallback"].enable("body")
        msg["body"] = " ".join(emojis)
        if not self.is_participant:
            return
        with self.xmpp.store.session() as orm:
            archived = self.xmpp.store.mam.get_by_legacy_id(
                orm, self.muc.stored.id, str(legacy_msg_id)
            )
            if archived is None:
                return
            # Read the stored stanza while the session is still open.
            history_msg = HistoryMessage(archived.stanza)
        msg["body"] = (
            add_quote_prefix(history_msg.stanza["body"]) + "\n" + msg["body"]
        )

    def retract(
        self,
        legacy_msg_id: str,
        thread: str | None = None,
        **kwargs: object,
    ) -> None:
        """
        Send a message retraction (:XEP:`0424`) from this :term:`XMPP Entity`.

        One retraction is sent per XMPP ID associated with ``legacy_msg_id``,
        plus one for the ID returned by :meth:`_replace_id` if it is not
        already among them.

        :param legacy_msg_id: Legacy ID of the message to delete
        :param thread:
        :param kwargs: Forwarded to ``self._send()``; ``carbon`` is also read
            to build the stanza.
        """
        # Copy before appending so a list owned by the ID-mapping helper is
        # never modified from here.
        xmpp_ids = list(self._legacy_to_xmpp(legacy_msg_id))
        replace_id = self._replace_id(legacy_msg_id)
        if replace_id not in xmpp_ids:
            xmpp_ids.append(replace_id)
        for xmpp_id in xmpp_ids:
            msg = self._make_message(
                state=None,
                hints={"store"},
                # Human-readable body for clients not supporting retractions.
                mbody=f"/me retracted the message {legacy_msg_id}",
                carbon=bool(kwargs.get("carbon")),
                thread=thread,
            )
            msg.enable("fallback")
            # namespace version mismatch between slidge and slixmpp, update me later
            msg["fallback"]["for"] = self.xmpp["xep_0424"].namespace[:-1] + "1"
            msg["retract"]["id"] = msg["replace"]["id"] = xmpp_id
            self._send(msg, **kwargs)

254 

255 

# Module-level logger. It is assigned after the class body, which works because
# the methods of the class only look this name up when they are called.
log = logging.getLogger(__name__)