Coverage for slidge/core/mixins/message_text.py: 90%
77 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
1import logging
2from datetime import datetime
3from typing import Iterable, Optional
5from slixmpp import Message
7from ...util.archive_msg import HistoryMessage
8from ...util.types import (
9 LegacyMessageType,
10 LegacyThreadType,
11 LinkPreview,
12 MessageReference,
13 ProcessingHint,
14)
15from ...util.util import add_quote_prefix
16from .message_maker import MessageMaker
class TextMessageMixin(MessageMaker):
    """
    Mixin adding text-oriented sending helpers on top of :class:`MessageMaker`.

    It provides plain text messages, corrections (:xep:`0308`),
    reactions (:xep:`0444`) and retractions (:xep:`0424`).
    """

    def __default_hints(self, hints: Optional[Iterable[ProcessingHint]] = None):
        """
        Return the processing hints to attach to an outgoing message.

        :param hints: Explicit hints; returned unchanged when not ``None``.
        :return: ``hints`` if given, otherwise a default set chosen from
            ``self.mtype``. Implicitly ``None`` for any other message type.
        """
        if hints is not None:
            return hints
        elif self.mtype == "chat":
            return {"markable", "store"}
        elif self.mtype == "groupchat":
            return {"markable"}

    def _replace_id(self, legacy_msg_id: LegacyMessageType) -> str:
        """
        Return the XMPP message ID to reference when replacing a message.

        For ``groupchat`` messages, the origin IDs recorded in the ID map are
        looked up first; if none is found, the ID is derived through
        ``self.session.legacy_to_xmpp_msg_id()``. For any other message type,
        the first ID returned by ``self._legacy_to_xmpp()`` is used.

        :param legacy_msg_id: Legacy ID of the message being replaced
        :return: The XMPP-side message ID
        """
        if self.mtype == "groupchat":
            with self.xmpp.store.session() as orm:
                ids = self.xmpp.store.id_map.get_origin(
                    orm, self._recipient_pk(), str(legacy_msg_id)
                )
                if ids:
                    if len(ids) > 1:
                        # Not fatal: the first ID is used, but log it.
                        log.warning(
                            "More than 1 origin msg ID for '%s': '%s'",
                            legacy_msg_id,
                            ids,
                        )
                    return ids[0]
            return self.session.legacy_to_xmpp_msg_id(legacy_msg_id)
        else:
            return self._legacy_to_xmpp(legacy_msg_id)[0]

    def send_text(
        self,
        body: str,
        legacy_msg_id: Optional[LegacyMessageType] = None,
        *,
        when: Optional[datetime] = None,
        reply_to: Optional[MessageReference] = None,
        thread: Optional[LegacyThreadType] = None,
        hints: Optional[Iterable[ProcessingHint]] = None,
        carbon: bool = False,
        archive_only: bool = False,
        correction: bool = False,
        correction_event_id: Optional[LegacyMessageType] = None,
        link_previews: Optional[list[LinkPreview]] = None,
        **send_kwargs,
    ):
        """
        Send a text message from this :term:`XMPP Entity`.

        :param body: Content of the message
        :param legacy_msg_id: If you want to be able to transport read markers from the gateway
            user to the legacy network, specify this
        :param when: when the message was sent, for a "delay" tag (:xep:`0203`)
        :param reply_to: Quote another message (:xep:`0461`)
        :param hints: Processing hints for the message. If ``None``, defaults
            are picked depending on the message type.
        :param thread: Thread this message belongs to
        :param carbon: (only used if called on a :class:`LegacyContact`)
            Set this to ``True`` if this is actually a message sent **to** the
            :class:`LegacyContact` by the :term:`User`.
            Use this to synchronize outgoing history for legacy official apps.
        :param correction: whether this message is a correction or not
        :param correction_event_id: in the case where an ID is associated with the legacy
            'correction event', specify it here to use it on the XMPP side. If not specified,
            a random ID will be used.
        :param link_previews: A list of sender (or server, or gateway)-generated
            previews of URLs linked in the body.
        :param archive_only: (only in groups) Do not send this message to user,
            but store it in the archive. Meant to be used during ``MUC.backfill()``
        :param send_kwargs: Extra keyword arguments forwarded to ``self._send()``
        :return: Whatever ``self._send()`` returns, or ``None`` when a carbon
            is dropped because the message was already sent by the user.
        """
        if carbon and not self.is_participant:
            with self.xmpp.store.session() as orm:
                # A non-correction carbon for a message the user sent from
                # XMPP should never happen: warn and drop it.
                if not correction and self.xmpp.store.id_map.was_sent_by_user(
                    orm, self._recipient_pk(), str(legacy_msg_id), True
                ):
                    log.warning(
                        "Carbon message for a message an XMPP has sent? This is a bug! %s",
                        legacy_msg_id,
                    )
                    return
                self.xmpp.store.id_map.set_msg(
                    orm,
                    self._recipient_pk(),
                    str(legacy_msg_id),
                    [self.session.legacy_to_xmpp_msg_id(legacy_msg_id)],
                    True,
                )
                orm.commit()
        hints = self.__default_hints(hints)
        msg = self._make_message(
            mbody=body,
            # A correction carries the ID of the correction event; the
            # corrected message is referenced via the "replace" element.
            legacy_msg_id=correction_event_id if correction else legacy_msg_id,
            when=when,
            reply_to=reply_to,
            hints=hints or (),
            carbon=carbon,
            thread=thread,
            link_previews=link_previews,
        )
        if correction:
            msg["replace"]["id"] = self._replace_id(legacy_msg_id)
        return self._send(
            msg,
            archive_only=archive_only,
            carbon=carbon,
            legacy_msg_id=legacy_msg_id,
            **send_kwargs,
        )

    def correct(
        self,
        legacy_msg_id: LegacyMessageType,
        new_text: str,
        *,
        when: Optional[datetime] = None,
        reply_to: Optional[MessageReference] = None,
        thread: Optional[LegacyThreadType] = None,
        hints: Optional[Iterable[ProcessingHint]] = None,
        carbon: bool = False,
        archive_only: bool = False,
        correction_event_id: Optional[LegacyMessageType] = None,
        link_previews: Optional[list[LinkPreview]] = None,
        **send_kwargs,
    ) -> None:
        """
        Modify a message that was previously sent by this :term:`XMPP Entity`.

        Uses last message correction (:xep:`0308`)

        :param new_text: New content of the message
        :param legacy_msg_id: The legacy message ID of the message to correct
        :param when: when the message was sent, for a "delay" tag (:xep:`0203`)
        :param reply_to: Quote another message (:xep:`0461`)
        :param hints: Processing hints for the message. If ``None``, defaults
            are picked depending on the message type.
        :param thread: Thread this message belongs to
        :param carbon: (only in 1:1) Reflect a message sent to this ``Contact`` by the user.
            Use this to synchronize outgoing history for legacy official apps.
        :param archive_only: (only in groups) Do not send this message to user,
            but store it in the archive. Meant to be used during ``MUC.backfill()``
        :param correction_event_id: in the case where an ID is associated with the legacy
            'correction event', specify it here to use it on the XMPP side. If not specified,
            a random ID will be used.
        :param link_previews: A list of sender (or server, or gateway)-generated
            previews of URLs linked in the body.
        :param send_kwargs: Extra keyword arguments forwarded to :meth:`send_text`
        """
        self.send_text(
            new_text,
            legacy_msg_id,
            when=when,
            reply_to=reply_to,
            hints=hints,
            carbon=carbon,
            thread=thread,
            correction=True,
            archive_only=archive_only,
            correction_event_id=correction_event_id,
            link_previews=link_previews,
            **send_kwargs,
        )

    def react(
        self,
        legacy_msg_id: LegacyMessageType,
        emojis: Iterable[str] = (),
        thread: Optional[LegacyThreadType] = None,
        **kwargs,
    ) -> None:
        """
        Send a reaction (:xep:`0444`) from this :term:`XMPP Entity`.

        :param legacy_msg_id: The message which the reaction refers to.
        :param emojis: An iterable of emojis used as reactions
        :param thread: Thread the reaction belongs to
        :param kwargs: Forwarded to ``self._send()``. An optional ``xmpp_id``
            entry is consumed here and used as the sole target message ID
            instead of resolving ``legacy_msg_id``.
        """
        # ``emojis`` is read once per target ID and once more for the textual
        # fallback; materialize it so one-shot iterators (generators, ``map``
        # objects...) are not exhausted after the first pass.
        emojis = tuple(emojis)
        xmpp_id = kwargs.pop("xmpp_id", None)
        if xmpp_id:
            xmpp_ids = [xmpp_id]
        else:
            xmpp_ids = self._legacy_to_xmpp(legacy_msg_id)
        for target_id in xmpp_ids:
            msg = self._make_message(
                hints={"store"}, carbon=bool(kwargs.get("carbon")), thread=thread
            )
            self.xmpp["xep_0444"].set_reactions(
                msg, to_id=target_id, reactions=set(emojis)
            )
            self.__add_reaction_fallback(msg, legacy_msg_id, emojis)
            self._send(msg, **kwargs)

    def __add_reaction_fallback(
        self,
        msg: Message,
        legacy_msg_id: LegacyMessageType,
        emojis: Iterable[str] = (),
    ) -> None:
        """
        Add a plain-text fallback body to a reaction message, if enabled.

        Does nothing unless the user's ``reaction_fallback`` preference is
        truthy. The fallback body is the space-joined emojis; for participants
        whose reacted-to message is found in the archive, its quoted body is
        prepended.

        :param msg: The reaction stanza to modify in place
        :param legacy_msg_id: Legacy ID of the message being reacted to
        :param emojis: The emojis of the reaction
        """
        if not self.session.user.preferences.get("reaction_fallback", False):
            return
        msg["fallback"]["for"] = self.xmpp.plugin["xep_0444"].namespace
        msg["fallback"].enable("body")
        msg["body"] = " ".join(emojis)
        if not self.is_participant:
            return
        with self.xmpp.store.session() as orm:
            archived = self.xmpp.store.mam.get_by_legacy_id(
                orm, self.muc.stored.id, str(legacy_msg_id)
            )
            if archived is None:
                return
            history_msg = HistoryMessage(archived.stanza)
            # Quote the original message above the emojis.
            msg["body"] = (
                add_quote_prefix(history_msg.stanza["body"]) + "\n" + msg["body"]
            )

    def retract(
        self,
        legacy_msg_id: LegacyMessageType,
        thread: Optional[LegacyThreadType] = None,
        **kwargs,
    ) -> None:
        """
        Send a message retraction (:XEP:`0424`) from this :term:`XMPP Entity`.

        One retraction is sent per XMPP ID associated with the legacy message,
        including the ID that a correction of this message would reference.

        :param legacy_msg_id: Legacy ID of the message to delete
        :param thread: Thread the retraction belongs to
        :param kwargs: Forwarded to ``self._send()``
        """
        # Work on a copy so the list handed back by ``_legacy_to_xmpp()`` is
        # never mutated by the ``append`` below.
        xmpp_ids = list(self._legacy_to_xmpp(legacy_msg_id))
        replace_id = self._replace_id(legacy_msg_id)
        if replace_id not in xmpp_ids:
            xmpp_ids.append(replace_id)
        for xmpp_id in xmpp_ids:
            msg = self._make_message(
                state=None,
                hints={"store"},
                mbody=f"/me retracted the message {legacy_msg_id}",
                carbon=bool(kwargs.get("carbon")),
                thread=thread,
            )
            msg.enable("fallback")
            # namespace version mismatch between slidge and slixmpp, update me later
            msg["fallback"]["for"] = self.xmpp["xep_0424"].namespace[:-1] + "1"
            msg["retract"]["id"] = msg["replace"]["id"] = xmpp_id
            self._send(msg, **kwargs)
# Module-level logger used by TextMessageMixin for its warnings. It is bound
# after the class definition, which is fine because the methods only look the
# name up when they are called.
log = logging.getLogger(__name__)