Coverage for slidge / core / dispatcher / message / message.py: 81%
236 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-06 15:18 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-06 15:18 +0000
1import logging
2from copy import copy
3from xml.etree import ElementTree
5from slixmpp import JID, Message
6from slixmpp.exceptions import XMPPError
8from ....contact.contact import LegacyContact
9from ....group.participant import LegacyParticipant
10from ....group.room import LegacyMUC
11from ....util.types import LinkPreview, Recipient
12from ....util.util import dict_to_named_tuple, remove_emoji_variation_selector_16
13from ... import config
14from ...session import BaseSession
15from ..util import DispatcherMixin, exceptions_to_xmpp_errors
18class MessageContentMixin(DispatcherMixin):
19 __slots__: list[str] = []
21 def __init__(self, xmpp) -> None:
22 super().__init__(xmpp)
23 xmpp.add_event_handler("legacy_message", self.on_legacy_message)
24 xmpp.add_event_handler("message_correction", self.on_message_correction)
25 xmpp.add_event_handler("message_retract", self.on_message_retract)
26 xmpp.add_event_handler("groupchat_message", self.on_groupchat_message)
27 xmpp.add_event_handler("reactions", self.on_reactions)
29 async def on_groupchat_message(self, msg: Message) -> None:
30 await self.on_legacy_message(msg)
32 @exceptions_to_xmpp_errors
33 async def on_legacy_message(self, msg: Message) -> None:
34 """
35 Meant to be called from :class:`BaseGateway` only.
37 :param msg:
38 :return:
39 """
40 # we MUST not use `if m["replace"]["id"]` because it adds the tag if not
41 # present. this is a problem for MUC echoed messages
42 if msg.get_plugin("replace", check=True) is not None:
43 # ignore last message correction (handled by a specific method)
44 return
45 if msg.get_plugin("apply_to", check=True) is not None:
46 # ignore message retraction (handled by a specific method)
47 return
48 if msg.get_plugin("reactions", check=True) is not None:
49 # ignore message reaction fallback.
50 # the reaction itself is handled by self.react_from_msg().
51 return
52 if msg.get_plugin("retract", check=True) is not None:
53 # ignore message retraction fallback.
54 # the retraction itself is handled by self.on_retract
55 return
56 cid = None
57 if msg.get_plugin("html", check=True) is not None:
58 body = ElementTree.fromstring("<body>" + msg["html"].get_body() + "</body>")
59 p = body.findall("p")
60 if p is not None and len(p) == 1:
61 if p[0].text is None or not p[0].text.strip():
62 images = p[0].findall("img")
63 if len(images) == 1:
64 # no text, single img ⇒ this is a sticker
65 # other cases should be interpreted as "custom emojis" in text
66 src = images[0].get("src")
67 if src is not None and src.startswith("cid:"):
68 cid = src.removeprefix("cid:")
70 session, recipient, thread = await self._get_session_recipient_thread(msg)
72 if msg.get_plugin("oob", check=True) is not None:
73 url = msg["oob"]["url"]
74 elif (
75 "reference" in msg
76 and "sims" in msg["reference"]
77 and "sources" in msg["reference"]["sims"]
78 ):
79 for source in msg["reference"]["sims"]["sources"]["substanzas"]:
80 if source["uri"].startswith("http"):
81 url = source["uri"]
82 break
83 else:
84 url = None
85 else:
86 url = None
88 if msg.get_plugin("reply", check=True):
89 text, reply_to_msg_id, reply_to, reply_fallback = await self.__get_reply(
90 msg, session, recipient
91 )
92 else:
93 text = msg["body"]
94 reply_to_msg_id = None
95 reply_to = None
96 reply_fallback = None
98 if msg.get_plugin("link_previews", check=True):
99 link_previews = [
100 dict_to_named_tuple(p, LinkPreview) for p in msg["link_previews"]
101 ]
102 else:
103 link_previews = []
105 if url:
106 legacy_msg_id = await self.__send_url(
107 url,
108 session,
109 recipient,
110 reply_to_msg_id=reply_to_msg_id,
111 reply_to_fallback_text=reply_fallback,
112 reply_to=reply_to,
113 thread=thread,
114 )
115 elif cid:
116 legacy_msg_id = await self.__send_bob(
117 msg.get_from(),
118 cid,
119 session,
120 recipient,
121 reply_to_msg_id=reply_to_msg_id,
122 reply_to_fallback_text=reply_fallback,
123 reply_to=reply_to,
124 thread=thread,
125 )
126 elif text:
127 if isinstance(recipient, LegacyMUC):
128 mentions = {"mentions": await recipient.parse_mentions(text)}
129 else:
130 mentions = {}
131 legacy_msg_id = await session.on_text(
132 recipient,
133 text,
134 reply_to_msg_id=reply_to_msg_id,
135 reply_to_fallback_text=reply_fallback,
136 reply_to=reply_to,
137 thread=thread,
138 link_previews=link_previews,
139 **mentions,
140 )
141 else:
142 log.debug("Ignoring %s", msg.get_id())
143 return
145 if isinstance(recipient, LegacyMUC):
146 stanza_id = await recipient.echo(msg, legacy_msg_id)
147 else:
148 stanza_id = None
149 self.__ack(msg)
151 if legacy_msg_id is None:
152 return
154 with self.xmpp.store.session() as orm:
155 if recipient.is_group:
156 self.xmpp.store.id_map.set_origin(
157 orm, recipient.stored.id, str(legacy_msg_id), msg.get_id()
158 )
159 assert stanza_id is not None
160 self.xmpp.store.id_map.set_msg(
161 orm,
162 recipient.stored.id,
163 str(legacy_msg_id),
164 [stanza_id],
165 True,
166 )
167 else:
168 self.xmpp.store.id_map.set_msg(
169 orm,
170 recipient.stored.id,
171 str(legacy_msg_id),
172 [msg.get_id()],
173 False,
174 )
175 if session.MESSAGE_IDS_ARE_THREAD_IDS and (t := msg["thread"]):
176 self.xmpp.store.id_map.set_thread(
177 orm, recipient.stored.id, t, str(legacy_msg_id), recipient.is_group
178 )
179 orm.commit()
181 @exceptions_to_xmpp_errors
182 async def on_message_correction(self, msg: Message) -> None:
183 if msg.get_plugin("retract", check=True) is not None:
184 # ignore message retraction fallback (fallback=last msg correction)
185 return
186 session, recipient, thread = await self._get_session_recipient_thread(msg)
187 legacy_id = self._xmpp_msg_id_to_legacy(
188 session, msg["replace"]["id"], recipient, True
189 )
191 if isinstance(recipient, LegacyMUC):
192 mentions = await recipient.parse_mentions(msg["body"])
193 else:
194 mentions = None
196 if previews := msg["link_previews"]:
197 link_previews = [dict_to_named_tuple(p, LinkPreview) for p in previews]
198 else:
199 link_previews = []
201 if legacy_id is None:
202 log.debug("Did not find legacy ID to correct")
203 new_legacy_msg_id = await session.on_text(
204 recipient,
205 "Correction:" + msg["body"],
206 thread=thread,
207 mentions=mentions,
208 link_previews=link_previews,
209 )
210 elif not msg["body"].strip() and recipient.RETRACTION:
211 await session.on_retract(recipient, legacy_id, thread=thread)
212 new_legacy_msg_id = None
213 elif recipient.CORRECTION:
214 new_legacy_msg_id = await session.on_correct(
215 recipient,
216 msg["body"],
217 legacy_id,
218 thread=thread,
219 mentions=mentions,
220 link_previews=link_previews,
221 )
222 else:
223 session.send_gateway_message(
224 "Last message correction is not supported by this legacy service. "
225 "Slidge will send your correction as new message."
226 )
227 if recipient.RETRACTION and legacy_id is not None:
228 if legacy_id is not None:
229 session.send_gateway_message(
230 "Slidge will attempt to retract the original message you wanted"
231 " to edit."
232 )
233 await session.on_retract(recipient, legacy_id, thread=thread)
235 new_legacy_msg_id = await session.on_text(
236 recipient,
237 "Correction: " + msg["body"],
238 thread=thread,
239 mentions=mentions,
240 link_previews=link_previews,
241 )
243 if isinstance(recipient, LegacyMUC):
244 await recipient.echo(msg, new_legacy_msg_id)
245 else:
246 self.__ack(msg)
247 if new_legacy_msg_id is None:
248 return
249 with self.xmpp.store.session() as orm:
250 self.xmpp.store.id_map.set_msg(
251 orm,
252 recipient.stored.id,
253 new_legacy_msg_id,
254 [msg.get_id()],
255 recipient.is_group,
256 )
257 orm.commit()
259 @exceptions_to_xmpp_errors
260 async def on_message_retract(self, msg: Message):
261 session, recipient, thread = await self._get_session_recipient_thread(msg)
262 if not recipient.RETRACTION:
263 raise XMPPError(
264 "bad-request",
265 "This legacy service does not support message retraction.",
266 )
267 xmpp_id: str = msg["retract"]["id"]
268 legacy_id = self._xmpp_msg_id_to_legacy(
269 session, xmpp_id, recipient, origin=True
270 )
271 await session.on_retract(recipient, legacy_id, thread=thread)
272 if isinstance(recipient, LegacyMUC):
273 await recipient.echo(msg, None)
274 self.__ack(msg)
276 @exceptions_to_xmpp_errors
277 async def on_reactions(self, msg: Message):
278 session, recipient, thread = await self._get_session_recipient_thread(msg)
279 react_to: str = msg["reactions"]["id"]
281 special_msg = session.SPECIAL_MSG_ID_PREFIX and react_to.startswith(
282 session.SPECIAL_MSG_ID_PREFIX
283 )
285 if special_msg:
286 legacy_id = react_to
287 else:
288 legacy_id = self._xmpp_msg_id_to_legacy(session, react_to, recipient)
290 if not legacy_id:
291 log.debug("Ignored reaction from user")
292 raise XMPPError(
293 "internal-server-error",
294 "Could not convert the XMPP msg ID to a legacy ID",
295 )
297 emojis = [
298 remove_emoji_variation_selector_16(r["value"]) for r in msg["reactions"]
299 ]
300 error_msg = None
301 recipient = recipient
303 if not special_msg:
304 if recipient.REACTIONS_SINGLE_EMOJI and len(emojis) > 1:
305 error_msg = "Maximum 1 emoji/message"
307 if not error_msg and (
308 subset := await recipient.available_emojis(legacy_id)
309 ):
310 if not set(emojis).issubset(subset):
311 error_msg = f"You can only react with the following emojis: {''.join(subset)}"
313 if error_msg:
314 session.send_gateway_message(error_msg)
315 if not isinstance(recipient, LegacyMUC):
316 # no need to carbon for groups, we just don't echo the stanza
317 recipient.react(legacy_id, carbon=True)
318 await session.on_react(recipient, legacy_id, [], thread=thread)
319 raise XMPPError(
320 "policy-violation",
321 text=error_msg,
322 clear=False,
323 )
325 await session.on_react(recipient, legacy_id, emojis, thread=thread)
326 if isinstance(recipient, LegacyMUC):
327 await recipient.echo(msg, None)
328 else:
329 self.__ack(msg)
331 with self.xmpp.store.session() as orm:
332 multi = self.xmpp.store.id_map.get_xmpp(
333 orm, recipient.stored.id, legacy_id, recipient.is_group
334 )
335 if not multi:
336 return
337 multi = [m for m in multi if react_to != m]
339 if isinstance(recipient, LegacyMUC):
340 for xmpp_id in multi:
341 mc = copy(msg)
342 mc["reactions"]["id"] = xmpp_id
343 await recipient.echo(mc)
344 elif isinstance(recipient, LegacyContact):
345 for xmpp_id in multi:
346 recipient.react(legacy_id, emojis, xmpp_id=xmpp_id, carbon=True)
348 def __ack(self, msg: Message) -> None:
349 if not self.xmpp.PROPER_RECEIPTS:
350 self.xmpp.delivery_receipt.ack(msg)
352 async def __get_reply(
353 self, msg: Message, session: BaseSession, recipient: Recipient
354 ) -> tuple[
355 str, str | int | None, LegacyContact | LegacyParticipant | None, str | None
356 ]:
357 try:
358 reply_to_msg_id = self._xmpp_msg_id_to_legacy(
359 session, msg["reply"]["id"], recipient
360 )
361 except XMPPError:
362 session.log.debug(
363 "Could not determine reply-to legacy msg ID, sending quote instead."
364 )
365 return redact_url(msg["body"]), None, None, None
367 reply_to_jid = JID(msg["reply"]["to"])
368 reply_to = None
369 if msg["type"] == "chat":
370 if reply_to_jid.bare != session.user_jid.bare:
371 try:
372 reply_to = await session.contacts.by_jid(reply_to_jid)
373 except XMPPError:
374 pass
375 elif msg["type"] == "groupchat":
376 nick = reply_to_jid.resource
377 try:
378 muc = await session.bookmarks.by_jid(reply_to_jid)
379 except XMPPError:
380 pass
381 else:
382 if nick == muc.user_nick:
383 reply_to = await muc.get_user_participant()
384 elif not nick:
385 reply_to = muc.get_system_participant()
386 else:
387 reply_to = await muc.get_participant(nick, store=False)
389 if msg.get_plugin("fallback", check=True) and (
390 isinstance(recipient, LegacyMUC) or recipient.REPLIES
391 ):
392 text = msg["fallback"].get_stripped_body(self.xmpp["xep_0461"].namespace)
393 try:
394 reply_fallback = redact_url(msg["reply"].get_fallback_body())
395 except AttributeError:
396 reply_fallback = None
397 else:
398 text = msg["body"]
399 reply_fallback = None
401 return text, reply_to_msg_id, reply_to, reply_fallback
403 async def __send_url(
404 self, url: str, session: BaseSession, recipient: Recipient, **kwargs
405 ) -> int | str | None:
406 async with self.xmpp.http.get(url) as response:
407 if response.status >= 400:
408 session.log.warning(
409 "OOB url cannot be downloaded: %s, sending the URL as text"
410 " instead.",
411 response,
412 )
413 return await session.on_text(recipient, url, **kwargs)
415 return await session.on_file(
416 recipient, url, http_response=response, **kwargs
417 )
419 async def __send_bob(
420 self, from_: JID, cid: str, session: BaseSession, recipient: Recipient, **kwargs
421 ) -> int | str | None:
422 with self.xmpp.store.session() as orm:
423 sticker = self.xmpp.store.bob.get_sticker(orm, cid)
424 if sticker is None:
425 await self.xmpp.plugin["xep_0231"].get_bob(
426 from_, cid, ifrom=self.xmpp.boundjid
427 )
428 with self.xmpp.store.session() as orm:
429 sticker = self.xmpp.store.bob.get_sticker(orm, cid)
430 assert sticker is not None
431 return await session.on_sticker(recipient, sticker, **kwargs)
434def redact_url(text: str) -> str:
435 needle = config.NO_UPLOAD_URL_PREFIX or config.UPLOAD_URL_PREFIX
436 if not needle:
437 return text
438 return text.replace(needle, "")
441log = logging.getLogger(__name__)