Coverage for slidge / core / dispatcher / message / message.py: 85%
270 statements
« prev ^ index » next — coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
1import base64
2import contextlib
3import hashlib
4import logging
5from copy import copy
6from dataclasses import dataclass
7from typing import TYPE_CHECKING
8from xml.etree import ElementTree
10from slixmpp import JID, Message
11from slixmpp.exceptions import XMPPError
12from slixmpp.plugins.xep_0511.stanza import LinkMetadata
14from ....contact.contact import LegacyContact
15from ....group.room import LegacyMUC
16from ....util.types import (
17 AnyRecipient,
18 LinkPreview,
19 RecipientType,
20 Reply,
21 XMPPAttachment,
22 XMPPMessage,
23)
24from ....util.util import dict_to_named_tuple, remove_emoji_variation_selector_16
25from ... import config
26from ..util import DispatcherMixin, exceptions_to_xmpp_errors
28if TYPE_CHECKING:
29 from slidge.util.types import AnyGateway
@dataclass()
class _IncomingAttachment:
    """An attachment extracted from an incoming XMPP message stanza."""

    # the attachment itself, later handed to the legacy recipient
    attachment: XMPPAttachment
    # True when the stanza carried a "sticker" element
    is_sticker: bool
    # "<algo>+<hash>" content-ID, set only when the stanza advertised a file hash
    cid: str | None = None
class MessageContentMixin(DispatcherMixin):
    """
    Route incoming XMPP message stanzas (regular messages, corrections,
    retractions, reactions, stickers and attachments) to the legacy recipient
    they target.
    """

    __slots__: list[str] = []

    def __init__(self, xmpp: "AnyGateway") -> None:
        super().__init__(xmpp)
        xmpp.add_event_handler("legacy_message", self.on_legacy_message)
        xmpp.add_event_handler("message_retract", self.on_message_retract)
        xmpp.add_event_handler("groupchat_message", self.on_groupchat_message)
        xmpp.add_event_handler("reactions", self.on_reactions)

    async def on_groupchat_message(self, msg: Message) -> None:
        # Groupchat messages go through the same pipeline as 1:1 messages.
        await self.on_legacy_message(msg)

    @exceptions_to_xmpp_errors
    async def on_legacy_message(self, msg: Message) -> None:
        """
        Meant to be called from :class:`BaseGateway` only.

        Forward *msg* to its legacy recipient (as a BoB sticker, a non-BoB
        sticker, or a regular message), echo it in groups, ack it, and record
        the XMPP <-> legacy message ID mapping.

        :param msg: the incoming message stanza
        :raises XMPPError: ``bad-request`` if the message is encrypted
        """
        # we MUST not use `if m["replace"]["id"]` because it adds the tag if not
        # present. this is a problem for MUC echoed messages
        if "apply_to" in msg:
            # ignore message retraction (handled by a specific method)
            # this is an old version of the protocol that depends on the
            # message fastening deprecated xep.
            return
        if "reactions" in msg:
            # ignore message reaction fallback.
            # the reaction itself is handled by self.on_reactions().
            return
        if "retract" in msg:
            # ignore message retraction fallback.
            # the retraction itself is handled by self.on_message_retract().
            return
        recipient, thread = await self._get_recipient_and_thread(msg)
        replace = await self.__get_replace(msg, recipient)
        if msg.xml.find(".//{*}encrypted") is not None:
            raise XMPPError(
                "bad-request", "You cannot send encrypted messages through this gateway"
            )
        body, reply = await self.__get_reply(msg, recipient)
        cid = self.__get_xhtml_sticker_cid(msg)

        if cid:
            legacy_msg_id = await self.__dispatch_bob(
                msg.get_from(),
                cid,
                recipient,
                reply=reply,
                thread=thread,
            )
        else:
            attachments = self.__get_attachments(msg)
            if len(attachments) == 1 and attachments[0].is_sticker:
                legacy_msg_id = await self.__dispatch_nonbob_sticker(
                    attachments[0],
                    recipient,
                    msg["body"],
                    reply=reply,
                    thread=thread,
                )
            else:
                legacy_msg_id = await self.__dispatch_msg(
                    replace=replace,
                    body=body,
                    attachments=tuple(a.attachment for a in attachments),
                    recipient=recipient,
                    thread=thread,
                    reply=reply,
                    msg=msg,
                )

        if isinstance(recipient, LegacyMUC):
            stanza_id = await recipient.echo(msg, legacy_msg_id)
        else:
            stanza_id = None
        self.__ack(msg)

        if not legacy_msg_id:
            return

        with self.xmpp.store.session() as orm:
            if recipient.is_group:
                self.xmpp.store.id_map.set_origin(
                    orm, recipient.stored.id, legacy_msg_id, msg.get_id()
                )
                assert stanza_id is not None
                self.xmpp.store.id_map.set_msg(
                    orm,
                    recipient.stored.id,
                    legacy_msg_id,
                    [stanza_id],
                    True,
                )
            else:
                self.xmpp.store.id_map.set_msg(
                    orm,
                    recipient.stored.id,
                    legacy_msg_id,
                    [msg.get_id()],
                    False,
                )
            if recipient.session.MESSAGE_IDS_ARE_THREAD_IDS and thread:
                self.xmpp.store.id_map.set_thread(
                    orm, recipient.stored.id, thread, legacy_msg_id, recipient.is_group
                )
            orm.commit()

    def __get_xhtml_sticker_cid(self, msg: Message) -> str | None:
        """
        Return the BoB content-ID if the XHTML-IM body of *msg* is a sticker.

        A sticker is a single ``<p>`` holding exactly one ``<img>`` whose
        ``src`` is a ``cid:`` URI, with no text anywhere in the paragraph
        (before, after, or inside sibling elements). Anything else is treated
        as text, possibly containing inline custom emojis.

        :return: the content-ID without its ``cid:`` prefix, or None
        """
        if "html" not in msg:
            return None
        try:
            body = ElementTree.fromstring(
                "<body>" + msg["html"].get_body() + "</body>"
            )
        except ElementTree.ParseError:
            # Best-effort detection: unparsable XHTML is simply not a sticker,
            # and must not prevent the message itself from being delivered.
            return None
        paragraphs = body.findall("p")
        if len(paragraphs) != 1:
            return None
        paragraph = paragraphs[0]
        # itertext() yields the paragraph's own text plus the text and tail of
        # every descendant, so text placed *after* the <img> is caught too
        # (otherwise that text would be silently dropped as part of a sticker).
        if "".join(paragraph.itertext()).strip():
            return None
        images = paragraph.findall("img")
        if len(images) != 1:
            return None
        src = images[0].get("src")
        if src is None or not src.startswith("cid:"):
            return None
        return src.removeprefix("cid:")

    async def __get_replace(
        self,
        msg: Message,
        recipient: RecipientType,
    ) -> str | None:
        """
        Return the legacy ID of the message *msg* corrects, or None if *msg*
        is not a correction (or the ID could not be converted).
        """
        if "replace" not in msg or "id" not in msg["replace"]:
            return None
        return (
            self._xmpp_msg_id_to_legacy(msg["replace"]["id"], recipient, True) or None
        )

    def __get_attachments(self, msg: Message) -> list[_IncomingAttachment]:
        """
        Extract at most one attachment from *msg*.

        Sources are tried in order: stateless file sharing (``sfs``), out of
        band data (``oob``), then SIMS references (``reference``/``sims``).
        Only the first matching source is used.
        """
        is_sticker = "sticker" in msg

        if (
            "sfs" in msg
            and "sources" in msg["sfs"]
            and "url-data" in msg["sfs"]["sources"]
            and "target" in msg["sfs"]["sources"]["url-data"]
        ):
            # TODO: support "attach source in later message", cf https://xmpp.org/extensions/xep-0447.html#example-5
            # TODO: support for other sources than URL
            # TODO: support for multiattachments in single message.
            # What do we do if is_sticker and multiple files?
            attachment = _IncomingAttachment(
                XMPPAttachment(url=msg["sfs"]["sources"]["url-data"]["target"]),
                is_sticker=is_sticker,
            )
            if "file" in msg["sfs"]:
                attachment.attachment.content_type = (
                    msg["sfs"]["file"]["media-type"] or None
                )
                if "hash" in msg["sfs"]["file"]:
                    algo = msg["sfs"]["file"]["hash"]["algo"]
                    h = msg["sfs"]["file"]["hash"]["value"]
                    attachment.cid = f"{algo}+{h}" if algo and h else None
            return [attachment]

        if "oob" in msg:
            return [
                _IncomingAttachment(
                    XMPPAttachment(url=msg["oob"]["url"]), is_sticker=is_sticker
                )
            ]

        if (
            "reference" in msg
            and "sims" in msg["reference"]
            and "sources" in msg["reference"]["sims"]
        ):
            sims = msg["reference"]["sims"]
            for source in sims["sources"]["substanzas"]:
                if source["uri"].startswith("http"):
                    attachment = _IncomingAttachment(
                        XMPPAttachment(url=source["uri"]), is_sticker=is_sticker
                    )
                    break
            else:
                return []
            if "file" in sims:
                # The media type lives in the SIMS <file> element, like in the
                # sfs branch above; reading it from the top-level message
                # always yielded an empty value.
                attachment.attachment.content_type = (
                    sims["file"]["media-type"] or None
                )
            return [attachment]

        return []

    async def __dispatch_msg(
        self,
        *,
        replace: str | None,
        reply: Reply | None,
        body: str,
        attachments: tuple[XMPPAttachment, ...],
        recipient: AnyRecipient,
        thread: str | None,
        msg: Message,
    ) -> str | None:
        """
        Send a regular (possibly corrected) message to *recipient*.

        An empty correction is turned into a retraction when the recipient
        supports it. When the recipient does not support corrections, the
        original is retracted (if supported) and the new text is sent as a
        new message, or prefixed with "Correction:" otherwise.

        :return: the legacy message ID returned by the recipient, or None
        """
        if replace:
            body = body.strip()
            if not body and not attachments and recipient.RETRACTION:
                await recipient.on_retract(replace, thread=thread)
                return None
            if not recipient.CORRECTION:
                if recipient.RETRACTION:
                    await recipient.on_retract(replace, thread=thread)
                    replace = None
                elif body:
                    body = "Correction:\n" + body

        if not any([attachments, body]):
            log.debug(
                "Ignoring msg with id '%s' because no body or attachments found",
                msg.get_id(),
            )
            return None
        link_previews = (
            parse_link_previews(msg["link_metadatas"]) if "link_metadata" in msg else ()
        )
        mentions = (
            await recipient.parse_mentions(body)
            if isinstance(recipient, LegacyMUC)
            else ()
        )
        return await recipient.on_message(
            XMPPMessage(
                body=body,
                attachments=attachments,
                reply=reply,
                thread=thread,
                link_previews=link_previews,
                mentions=mentions,
                replace=replace,
            )
        )

    @exceptions_to_xmpp_errors
    async def on_message_retract(self, msg: Message) -> None:
        """
        Forward a message retraction to the legacy recipient, then echo (groups)
        and ack it.

        :raises XMPPError: ``bad-request`` if the recipient does not support
            retractions
        """
        recipient, thread = await self._get_recipient_and_thread(msg)
        if not recipient.RETRACTION:
            raise XMPPError(
                "bad-request",
                "This legacy service does not support message retraction.",
            )
        xmpp_id: str = msg["retract"]["id"]
        legacy_id = self._xmpp_msg_id_to_legacy(xmpp_id, recipient, origin=True)
        await recipient.on_retract(legacy_id, thread=thread)
        if isinstance(recipient, LegacyMUC):
            await recipient.echo(msg, None)
        self.__ack(msg)

    @exceptions_to_xmpp_errors
    async def on_reactions(self, msg: Message) -> None:
        """
        Forward a set of reactions to the legacy recipient.

        Unless the target ID starts with the session's special prefix, the
        reactions are validated against ``REACTIONS_SINGLE_EMOJI`` and
        ``available_emojis()``. On violation, the user is notified, their
        reactions on that message are cleared, and a ``policy-violation``
        error is raised.

        :raises XMPPError: ``internal-server-error`` if the target ID cannot be
            converted, ``policy-violation`` if the reactions are not allowed
        """
        recipient, thread = await self._get_recipient_and_thread(msg)
        react_to: str = msg["reactions"]["id"]

        special_msg = recipient.session.SPECIAL_MSG_ID_PREFIX and react_to.startswith(
            recipient.session.SPECIAL_MSG_ID_PREFIX
        )

        if special_msg:
            legacy_id = react_to
        else:
            legacy_id = self._xmpp_msg_id_to_legacy(react_to, recipient)

        if not legacy_id:
            log.debug("Ignored reaction from user")
            raise XMPPError(
                "internal-server-error",
                "Could not convert the XMPP msg ID to a legacy ID",
            )

        emojis = [
            remove_emoji_variation_selector_16(r["value"]) for r in msg["reactions"]
        ]
        error_msg = None

        if not special_msg:
            if recipient.REACTIONS_SINGLE_EMOJI and len(emojis) > 1:
                error_msg = "Maximum 1 emoji/message"

            if (
                not error_msg
                and (subset := await recipient.available_emojis(legacy_id))
                and not set(emojis).issubset(subset)
            ):
                error_msg = (
                    f"You can only react with the following emojis: {''.join(subset)}"
                )

        if error_msg:
            recipient.session.send_gateway_message(error_msg)
            if not isinstance(recipient, LegacyMUC):
                # no need to carbon for groups, we just don't echo the stanza
                recipient.react(legacy_id, carbon=True)
            await recipient.on_react(legacy_id, [], thread=thread)
            raise XMPPError(
                "policy-violation",
                text=error_msg,
                clear=False,
            )

        await recipient.on_react(legacy_id, emojis, thread=thread)
        if isinstance(recipient, LegacyMUC):
            await recipient.echo(msg, None)
        else:
            self.__ack(msg)

        # The same legacy message may map to several XMPP IDs; mirror the
        # reaction onto every other one.
        with self.xmpp.store.session() as orm:
            multi = self.xmpp.store.id_map.get_xmpp(
                orm, recipient.stored.id, legacy_id, recipient.is_group
            )
        if not multi:
            return
        multi = [m for m in multi if react_to != m]

        if isinstance(recipient, LegacyMUC):
            for xmpp_id in multi:
                mc = copy(msg)
                mc["reactions"]["id"] = xmpp_id
                await recipient.echo(mc)
        elif isinstance(recipient, LegacyContact):
            for xmpp_id in multi:
                recipient.react(legacy_id, emojis, xmpp_id=xmpp_id, carbon=True)

    def __ack(self, msg: Message) -> None:
        """Send a delivery receipt for *msg*, unless the gateway sends proper ones."""
        if not self.xmpp.PROPER_RECEIPTS:
            self.xmpp.delivery_receipt.ack(msg)

    async def __get_reply(
        self, msg: Message, recipient: AnyRecipient
    ) -> tuple[str, Reply | None]:
        """
        Resolve the reply information carried by *msg*.

        :return: the text to send (with the reply fallback stripped when the
            recipient handles replies natively) and a :class:`Reply`, or None
            when *msg* is not a reply or its target cannot be resolved
        """
        if "reply" not in msg:
            return msg["body"], None

        session = recipient.session

        try:
            reply_to_msg_id = self._xmpp_msg_id_to_legacy(msg["reply"]["id"], recipient)
        except XMPPError:
            session.log.debug(
                "Could not determine reply-to legacy msg ID, sending quote instead."
            )
            return redact_url(msg["body"]), None

        reply_to_jid = JID(msg["reply"]["to"])
        reply_to = None
        if msg["type"] == "chat":
            if reply_to_jid.bare != session.user_jid.bare:
                with contextlib.suppress(XMPPError):
                    reply_to = await session.contacts.by_jid(reply_to_jid)
        elif msg["type"] == "groupchat":
            nick = reply_to_jid.resource
            try:
                muc = await session.bookmarks.by_jid(reply_to_jid)
            except XMPPError:
                pass
            else:
                if nick == muc.user_nick:
                    reply_to = await muc.get_user_participant()
                elif not nick:
                    reply_to = muc.get_system_participant()
                else:
                    reply_to = await muc.get_participant(nick, store=False)

        if "fallback" in msg and (
            isinstance(recipient, LegacyMUC) or recipient.REPLIES
        ):
            text = msg["fallback"].get_stripped_body(self.xmpp["xep_0461"].namespace)
            try:
                reply_fallback = redact_url(msg["reply"].get_fallback_body())
            except AttributeError:
                reply_fallback = None
        else:
            text = msg["body"]
            reply_fallback = None

        return text, Reply(reply_to_msg_id, reply_to, reply_fallback)

    async def __dispatch_nonbob_sticker(
        self,
        attachment: _IncomingAttachment,
        recipient: AnyRecipient,
        fallback: str,
        reply: Reply | None = None,
        thread: str | None = None,
    ) -> str | None:
        """
        Send a sticker that was shared by URL rather than via BoB.

        The sticker is looked up in the store by its content-ID; if absent it
        is downloaded, stored under its sha256 content-ID, and then sent. If
        it still cannot be obtained, the URL and *fallback* text are sent as a
        regular message instead.
        """
        if attachment.cid:
            with self.xmpp.store.session() as orm:
                sticker = self.xmpp.store.bob.get_sticker(orm, attachment.cid)
        else:
            sticker = None
        if sticker is None:
            async with attachment.attachment.get() as response:
                response.raise_for_status()
                data = await response.read()
            cid = "sha256+" + hashlib.sha256(data).hexdigest()
            with self.xmpp.store.session() as orm:
                sticker = self.xmpp.store.bob.set_sticker(
                    orm, cid, data, attachment.attachment.content_type
                )
                orm.commit()
        if sticker is None:
            return await recipient.on_message(
                XMPPMessage(
                    body="\n".join([attachment.attachment.url, fallback]),
                    thread=thread,
                    reply=reply,
                )
            )
        sticker.reply = reply
        sticker.thread = thread
        return await recipient.on_sticker(sticker)

    async def __dispatch_bob(
        self,
        from_: JID,
        cid: str,
        recipient: AnyRecipient,
        reply: Reply | None = None,
        thread: str | None = None,
    ) -> str | None:
        """
        Send a BoB (XEP-0231) sticker identified by *cid*, fetching it from
        *from_* first if it is not in the store yet.
        """
        with self.xmpp.store.session() as orm:
            sticker = self.xmpp.store.bob.get_sticker(orm, cid)
        if sticker is None:
            await self.xmpp.plugin["xep_0231"].get_bob(
                from_, cid, ifrom=self.xmpp.boundjid
            )
            with self.xmpp.store.session() as orm:
                sticker = self.xmpp.store.bob.get_sticker(orm, cid)
        assert sticker is not None
        sticker.reply = reply
        sticker.thread = thread
        return await recipient.on_sticker(sticker)
def redact_url(text: str) -> str:
    """
    Remove the configured upload URL prefix from *text*.

    ``config.NO_UPLOAD_URL_PREFIX`` takes precedence over
    ``config.UPLOAD_URL_PREFIX``; when neither is set, *text* is returned as is.
    """
    prefix = config.NO_UPLOAD_URL_PREFIX or config.UPLOAD_URL_PREFIX
    return text.replace(prefix, "") if prefix else text
def parse_link_previews(link_metadatas: list[LinkMetadata]) -> tuple[LinkPreview, ...]:
    """
    Convert link metadata elements into :class:`LinkPreview` tuples.

    Images given as ``data:image/jpeg;base64,...`` URIs are decoded to raw
    bytes. If decoding fails, a warning is logged and the original string is
    kept in the preview.

    :param link_metadatas: link metadata elements attached to the message
    :return: one preview per element, in the same order
    """
    jpeg_prefix = "data:image/jpeg;base64,"
    result = []
    for link_metadata in link_metadatas:
        preview: LinkPreview = dict_to_named_tuple(link_metadata, LinkPreview)  # type:ignore[arg-type]
        image = preview.image
        if image and isinstance(image, str) and image.startswith(jpeg_prefix):
            try:
                decoded = base64.b64decode(image.removeprefix(jpeg_prefix))
            except ValueError as e:
                # binascii.Error (bad padding) and non-ASCII input both subclass
                # ValueError; anything else is a real bug and should surface.
                # Only log the start of the payload: it can be arbitrarily large.
                log.warning(
                    "Could not decode base64-encoded image: %s '%s'", e, image[:64]
                )
            else:
                preview = preview._replace(image=decoded)
        result.append(preview)
    return tuple(result)
# Module-level logger, looked up by the functions and methods above at call time.
log = logging.getLogger(name=__name__)