Coverage for slidge / core / mixins / message_text.py: 87%
79 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
1import logging
2from collections.abc import Iterable
3from datetime import datetime
5from slixmpp import Message
7from ...util.archive_msg import HistoryMessage
8from ...util.types import (
9 LinkPreview,
10 MessageReference,
11 ProcessingHint,
12)
13from ...util.util import add_quote_prefix
14from .message_maker import MessageMaker
17class TextMessageMixin(MessageMaker):
18 def __default_hints(
19 self, hints: Iterable[ProcessingHint] | None = None
20 ) -> Iterable[ProcessingHint]:
21 if hints is not None:
22 return hints
23 elif self.mtype == "chat":
24 return {"markable", "store"}
25 elif self.mtype == "groupchat":
26 return {"markable"}
27 else:
28 raise RuntimeError("Never")
30 def _replace_id(self, legacy_msg_id: str) -> str:
31 if self.mtype == "groupchat":
32 with self.xmpp.store.session() as orm:
33 ids = self.xmpp.store.id_map.get_origin(
34 orm, self._recipient_pk(), str(legacy_msg_id)
35 )
36 if ids:
37 if len(ids) > 1:
38 log.warning(
39 "More than 1 origin msg ID for '%s': '%s'",
40 legacy_msg_id,
41 ids,
42 )
43 return ids[0]
44 return legacy_msg_id
45 else:
46 return self._legacy_to_xmpp(legacy_msg_id)[0]
48 def send_text(
49 self,
50 body: str,
51 legacy_msg_id: str | None = None,
52 *,
53 when: datetime | None = None,
54 reply_to: MessageReference | None = None,
55 thread: str | None = None,
56 hints: Iterable[ProcessingHint] | None = None,
57 carbon: bool = False,
58 archive_only: bool = False,
59 correction: bool = False,
60 correction_event_id: str | None = None,
61 link_previews: list[LinkPreview] | None = None,
62 **send_kwargs: object,
63 ) -> Message | None:
64 """
65 Send a text message from this :term:`XMPP Entity`.
67 :param body: Content of the message
68 :param legacy_msg_id: If you want to be able to transport read markers from the gateway
69 user to the legacy network, specify this
70 :param when: when the message was sent, for a "delay" tag (:xep:`0203`)
71 :param reply_to: Quote another message (:xep:`0461`)
72 :param hints:
73 :param thread:
74 :param carbon: (only used if called on a :class:`LegacyContact`)
75 Set this to ``True`` if this is actually a message sent **to** the
76 :class:`LegacyContact` by the :term:`User`.
77 Use this to synchronize outgoing history for legacy official apps.
78 :param correction: whether this message is a correction or not
79 :param correction_event_id: in the case where an ID is associated with the legacy
80 'correction event', specify it here to use it on the XMPP side. If not specified,
81 a random ID will be used.
82 :param link_previews: A little of sender (or server, or gateway)-generated
83 previews of URLs linked in the body.
84 :param archive_only: (only in groups) Do not send this message to user,
85 but store it in the archive. Meant to be used during ``MUC.backfill()``
86 """
87 if carbon and not self.is_participant:
88 with self.xmpp.store.session() as orm:
89 if not correction and self.xmpp.store.id_map.was_sent_by_user(
90 orm, self._recipient_pk(), str(legacy_msg_id), False
91 ):
92 log.warning(
93 "Carbon message for a message an XMPP has sent? This is a bug! %s",
94 legacy_msg_id,
95 )
96 return None
97 hints = self.__default_hints(hints)
98 msg = self._make_message(
99 mbody=body,
100 legacy_msg_id=correction_event_id if correction else legacy_msg_id,
101 when=when,
102 reply_to=reply_to,
103 hints=hints or (),
104 carbon=carbon,
105 thread=thread,
106 link_previews=link_previews,
107 )
108 if correction:
109 if not legacy_msg_id:
110 raise TypeError
111 msg["replace"]["id"] = self._replace_id(legacy_msg_id)
112 return self._send(
113 msg,
114 archive_only=archive_only,
115 carbon=carbon,
116 legacy_msg_id=legacy_msg_id,
117 **send_kwargs,
118 )
120 def correct(
121 self,
122 legacy_msg_id: str,
123 new_text: str,
124 *,
125 when: datetime | None = None,
126 reply_to: MessageReference | None = None,
127 thread: str | None = None,
128 hints: Iterable[ProcessingHint] | None = None,
129 carbon: bool = False,
130 archive_only: bool = False,
131 correction_event_id: str | None = None,
132 link_previews: list[LinkPreview] | None = None,
133 **send_kwargs: object,
134 ) -> None:
135 """
136 Modify a message that was previously sent by this :term:`XMPP Entity`.
138 Uses last message correction (:xep:`0308`)
140 :param new_text: New content of the message
141 :param legacy_msg_id: The legacy message ID of the message to correct
142 :param when: when the message was sent, for a "delay" tag (:xep:`0203`)
143 :param reply_to: Quote another message (:xep:`0461`)
144 :param hints:
145 :param thread:
146 :param carbon: (only in 1:1) Reflect a message sent to this ``Contact`` by the user.
147 Use this to synchronize outgoing history for legacy official apps.
148 :param archive_only: (only in groups) Do not send this message to user,
149 but store it in the archive. Meant to be used during ``MUC.backfill()``
150 :param correction_event_id: in the case where an ID is associated with the legacy
151 'correction event', specify it here to use it on the XMPP side. If not specified,
152 a random ID will be used.
153 :param link_previews: A little of sender (or server, or gateway)-generated
154 previews of URLs linked in the body.
155 """
156 self.send_text(
157 new_text,
158 legacy_msg_id,
159 when=when,
160 reply_to=reply_to,
161 hints=hints,
162 carbon=carbon,
163 thread=thread,
164 correction=True,
165 archive_only=archive_only,
166 correction_event_id=correction_event_id,
167 link_previews=link_previews,
168 **send_kwargs,
169 )
171 def react(
172 self,
173 legacy_msg_id: str,
174 emojis: Iterable[str] = (),
175 thread: str | None = None,
176 **kwargs: object,
177 ) -> None:
178 """
179 Send a reaction (:xep:`0444`) from this :term:`XMPP Entity`.
181 :param legacy_msg_id: The message which the reaction refers to.
182 :param emojis: An iterable of emojis used as reactions
183 :param thread:
184 """
185 xmpp_id = kwargs.pop("xmpp_id", None)
186 if xmpp_id:
187 assert isinstance(xmpp_id, str)
188 xmpp_ids = [xmpp_id]
189 else:
190 xmpp_ids = self._legacy_to_xmpp(legacy_msg_id)
191 for xmpp_id in xmpp_ids:
192 msg = self._make_message(
193 hints={"store"}, carbon=bool(kwargs.get("carbon")), thread=thread
194 )
195 self.xmpp["xep_0444"].set_reactions(
196 msg, to_id=xmpp_id, reactions=set(emojis)
197 )
198 self.__add_reaction_fallback(msg, legacy_msg_id, emojis)
199 self._send(msg, **kwargs)
201 def __add_reaction_fallback(
202 self,
203 msg: Message,
204 legacy_msg_id: str,
205 emojis: Iterable[str] = (),
206 ) -> None:
207 if not self.session.user.preferences.get("reaction_fallback", False):
208 return
209 msg["fallback"]["for"] = self.xmpp.plugin["xep_0444"].namespace
210 msg["fallback"].enable("body")
211 msg["body"] = " ".join(emojis)
212 if not self.is_participant:
213 return
214 with self.xmpp.store.session() as orm:
215 archived = self.xmpp.store.mam.get_by_legacy_id(
216 orm, self.muc.stored.id, str(legacy_msg_id)
217 )
218 if archived is None:
219 return
220 history_msg = HistoryMessage(archived.stanza)
221 msg["body"] = (
222 add_quote_prefix(history_msg.stanza["body"]) + "\n" + msg["body"]
223 )
225 def retract(
226 self,
227 legacy_msg_id: str,
228 thread: str | None = None,
229 **kwargs: object,
230 ) -> None:
231 """
232 Send a message retraction (:XEP:`0424`) from this :term:`XMPP Entity`.
234 :param legacy_msg_id: Legacy ID of the message to delete
235 :param thread:
236 """
237 xmpp_ids = self._legacy_to_xmpp(legacy_msg_id)
238 replace_id = self._replace_id(legacy_msg_id)
239 if replace_id not in xmpp_ids:
240 xmpp_ids.append(replace_id)
241 for xmpp_id in xmpp_ids:
242 msg = self._make_message(
243 state=None,
244 hints={"store"},
245 mbody=f"/me retracted the message {legacy_msg_id}",
246 carbon=bool(kwargs.get("carbon")),
247 thread=thread,
248 )
249 msg.enable("fallback")
250 # namespace version mismatch between slidge and slixmpp, update me later
251 msg["fallback"]["for"] = self.xmpp["xep_0424"].namespace[:-1] + "1"
252 msg["retract"]["id"] = msg["replace"]["id"] = xmpp_id
253 self._send(msg, **kwargs)
256log = logging.getLogger(__name__)