Coverage for slidge / core / dispatcher / message / message.py: 84%
278 statements
« prev ^ index » next · coverage.py v7.13.5, created at 2026-05-20 19:56 +0000
1import base64
2import contextlib
3import hashlib
4import logging
5from copy import copy
6from typing import TYPE_CHECKING, Any
7from xml.etree import ElementTree
9from aiohttp import ClientResponse
10from slixmpp import JID, Message
11from slixmpp.exceptions import XMPPError
12from slixmpp.plugins.xep_0511.stanza import LinkMetadata
14from ....contact.contact import LegacyContact
15from ....group.participant import LegacyParticipant
16from ....group.room import LegacyMUC
17from ....util.types import AnyContact, AnyRecipient, AnySession, LinkPreview
18from ....util.util import dict_to_named_tuple, remove_emoji_variation_selector_16
19from ... import config
20from ..util import DispatcherMixin, exceptions_to_xmpp_errors
22if TYPE_CHECKING:
23 from slidge.util.types import AnyGateway
26class MessageContentMixin(DispatcherMixin):
27 __slots__: list[str] = []
def __init__(self, xmpp: "AnyGateway") -> None:
    """
    Initialise the mixin and subscribe its handlers on the gateway.

    :param xmpp: The gateway on which the event handlers are registered.
    """
    super().__init__(xmpp)
    # (event name, coroutine handling it), registered in this order
    subscriptions = (
        ("legacy_message", self.on_legacy_message),
        ("message_correction", self.on_message_correction),
        ("message_retract", self.on_message_retract),
        ("groupchat_message", self.on_groupchat_message),
        ("reactions", self.on_reactions),
    )
    for event_name, callback in subscriptions:
        xmpp.add_event_handler(event_name, callback)
async def on_groupchat_message(self, msg: Message) -> None:
    """
    Handle a ``groupchat_message`` event.

    Group chat messages take exactly the same path as other messages, so
    this simply delegates to :meth:`on_legacy_message`.

    :param msg: The group chat message stanza.
    """
    await self.on_legacy_message(msg)
@exceptions_to_xmpp_errors
async def on_legacy_message(self, msg: Message) -> None:
    """
    Relay a message sent by the XMPP user to the legacy network.

    Meant to be called from :class:`BaseGateway` only.

    Depending on what the stanza carries, the message is handed to the
    session as an attachment (URL found through SFS, OOB or SIMS), as a
    bits-of-binary sticker, or as plain text. A stanza carrying none of
    these is ignored. When the session returns a legacy message ID, the
    mapping between that ID and the XMPP ID(s) is persisted in the store.

    :param msg: The message stanza received from the XMPP user.
    :raises XMPPError: ``bad-request`` if the stanza contains an
        ``encrypted`` element.
    """
    # we MUST not use `if m["replace"]["id"]` because it adds the tag if not
    # present. this is a problem for MUC echoed messages
    if "replace" in msg:
        # ignore last message correction (handled by a specific method)
        return
    if "apply_to" in msg:
        # ignore message retraction (handled by a specific method)
        return
    if "reactions" in msg:
        # ignore message reaction fallback.
        # the reaction itself is handled by self.react_from_msg().
        return
    if "retract" in msg:
        # ignore message retraction fallback.
        # the retraction itself is handled by self.on_retract
        return
    if msg.xml.find(".//{*}encrypted") is not None:
        raise XMPPError(
            "bad-request", "You cannot send encrypted messages through this gateway"
        )

    cid = None
    if "html" in msg:
        body = ElementTree.fromstring("<body>" + msg["html"].get_body() + "</body>")
        paragraphs = body.findall("p")
        # findall() always returns a list, so only its length needs checking
        if len(paragraphs) == 1 and (
            paragraphs[0].text is None or not paragraphs[0].text.strip()
        ):
            images = paragraphs[0].findall("img")
            if len(images) == 1:
                # no text, single img ⇒ this is a sticker
                # other cases should be interpreted as "custom emojis" in text
                src = images[0].get("src")
                if src is not None and src.startswith("cid:"):
                    cid = src.removeprefix("cid:")

    session, recipient, thread = await self._get_session_recipient_thread(msg)

    # Bind both names up front: not every branch below assigns both of them,
    # and they are read unconditionally once a URL has been found.
    url = None
    content_type = None
    if (
        "sfs" in msg
        and "sources" in msg["sfs"]
        and "url-data" in msg["sfs"]["sources"]
        and "target" in msg["sfs"]["sources"]["url-data"]
    ):
        # TODO: support "attach source in later message", cf https://xmpp.org/extensions/xep-0447.html#example-5
        # TODO: support for other sources than URL
        url = msg["sfs"]["sources"]["url-data"]["target"]
        if "file" in msg["sfs"]:
            content_type = msg["sfs"]["file"]["media-type"] or None
            if "hash" in msg["sfs"]["file"]:
                algo = msg["sfs"]["file"]["hash"]["algo"]
                h = msg["sfs"]["file"]["hash"]["value"]
                cid = f"{algo}+{h}" if algo and h else None
    elif "oob" in msg:
        url = msg["oob"]["url"]
    elif (
        "reference" in msg
        and "sims" in msg["reference"]
        and "sources" in msg["reference"]["sims"]
    ):
        # use the first HTTP(S) source, if there is any
        for source in msg["reference"]["sims"]["sources"]["substanzas"]:
            if source["uri"].startswith("http"):
                url = source["uri"]
                break
        if "file" in msg["reference"]["sims"]:
            # the media type is an attribute of the file element, not of
            # the message itself
            content_type = msg["reference"]["sims"]["file"]["media-type"] or None

    if "reply" in msg:
        text, reply_to_msg_id, reply_to, reply_fallback = await self.__get_reply(
            msg, session, recipient
        )
    else:
        text = msg["body"]
        reply_to_msg_id = None
        reply_to = None
        reply_fallback = None

    if "link_metadata" in msg:
        link_previews = parse_link_previews(msg["link_metadatas"])
    else:
        link_previews = []

    if url:
        legacy_msg_id = await self.__send_url(
            url,
            session,
            recipient,
            fallback=msg["body"],
            cid=cid,
            is_sticker="sticker" in msg,
            content_type=content_type,
            reply_to_msg_id=reply_to_msg_id,
            reply_to_fallback_text=reply_fallback,
            reply_to=reply_to,
            thread=thread,
        )
    elif cid:
        legacy_msg_id = await self.__send_bob(
            msg.get_from(),
            cid,
            session,
            recipient,
            reply_to_msg_id=reply_to_msg_id,
            reply_to_fallback_text=reply_fallback,
            reply_to=reply_to,
            thread=thread,
        )
    elif text:
        if isinstance(recipient, LegacyMUC):
            mentions = {"mentions": await recipient.parse_mentions(text)}
        else:
            mentions = {}
        legacy_msg_id = await session.on_text(
            recipient,
            text,
            reply_to_msg_id=reply_to_msg_id,
            reply_to_fallback_text=reply_fallback,
            reply_to=reply_to,
            thread=thread,
            link_previews=link_previews,
            **mentions,
        )
    else:
        log.debug("Ignoring %s", msg.get_id())
        return

    # group chats get an echo of the stanza, direct chats get an ack
    if isinstance(recipient, LegacyMUC):
        stanza_id = await recipient.echo(msg, legacy_msg_id)
    else:
        stanza_id = None
        self.__ack(msg)

    if legacy_msg_id is None:
        return

    with self.xmpp.store.session() as orm:
        if recipient.is_group:
            self.xmpp.store.id_map.set_origin(
                orm, recipient.stored.id, str(legacy_msg_id), msg.get_id()
            )
            assert stanza_id is not None
            self.xmpp.store.id_map.set_msg(
                orm,
                recipient.stored.id,
                str(legacy_msg_id),
                [stanza_id],
                True,
            )
        else:
            self.xmpp.store.id_map.set_msg(
                orm,
                recipient.stored.id,
                str(legacy_msg_id),
                [msg.get_id()],
                False,
            )
        if session.MESSAGE_IDS_ARE_THREAD_IDS and (t := msg["thread"]):
            self.xmpp.store.id_map.set_thread(
                orm, recipient.stored.id, t, str(legacy_msg_id), recipient.is_group
            )
        orm.commit()
@exceptions_to_xmpp_errors
async def on_message_correction(self, msg: Message) -> None:
    """
    Handle a last message correction sent by the XMPP user.

    The outcome depends on what is known about the corrected message and
    on what the recipient supports:

    - no legacy ID found for the corrected message: the new body is sent
      as a regular text message, prefixed with ``Correction: ``;
    - blank body and ``recipient.RETRACTION``: the original message is
      retracted;
    - ``recipient.CORRECTION``: the session corrects the message;
    - otherwise: the user is notified, the original message is retracted
      if ``recipient.RETRACTION`` allows it, and the new body is sent as a
      regular text message, prefixed with ``Correction: ``.

    :param msg: The stanza carrying the ``replace`` element.
    """
    if "retract" in msg:
        # ignore message retraction fallback (fallback=last msg correction)
        return
    session, recipient, thread = await self._get_session_recipient_thread(msg)
    legacy_id = self._xmpp_msg_id_to_legacy(
        session, msg["replace"]["id"], recipient, True
    )

    if isinstance(recipient, LegacyMUC):
        mentions = await recipient.parse_mentions(msg["body"])
    else:
        mentions = None

    if "link_metadata" in msg:
        link_previews = parse_link_previews(msg["link_metadatas"])
    else:
        link_previews = []

    if legacy_id is None:
        log.debug("Did not find legacy ID to correct")
        new_legacy_msg_id = await session.on_text(
            recipient,
            # same prefix (with its trailing space) as the other fallback below
            "Correction: " + msg["body"],
            thread=thread,
            mentions=mentions,
            link_previews=link_previews,
        )
    elif not msg["body"].strip() and recipient.RETRACTION:
        await session.on_retract(recipient, legacy_id, thread=thread)
        new_legacy_msg_id = None
    elif recipient.CORRECTION:
        new_legacy_msg_id = await session.on_correct(
            recipient,
            msg["body"],
            legacy_id,
            thread=thread,
            mentions=mentions,
            link_previews=link_previews,
        )
    else:
        session.send_gateway_message(
            "Last message correction is not supported by this legacy service. "
            "Slidge will send your correction as new message."
        )
        # legacy_id cannot be None here: that case is the first branch
        if recipient.RETRACTION:
            session.send_gateway_message(
                "Slidge will attempt to retract the original message you wanted"
                " to edit."
            )
            await session.on_retract(recipient, legacy_id, thread=thread)

        new_legacy_msg_id = await session.on_text(
            recipient,
            "Correction: " + msg["body"],
            thread=thread,
            mentions=mentions,
            link_previews=link_previews,
        )

    # group chats get an echo of the stanza, direct chats get an ack
    if isinstance(recipient, LegacyMUC):
        await recipient.echo(msg, new_legacy_msg_id)
    else:
        self.__ack(msg)
    if new_legacy_msg_id is None:
        return
    with self.xmpp.store.session() as orm:
        self.xmpp.store.id_map.set_msg(
            orm,
            recipient.stored.id,
            # stored as a string, like the IDs saved by on_legacy_message
            str(new_legacy_msg_id),
            [msg.get_id()],
            recipient.is_group,
        )
        orm.commit()
@exceptions_to_xmpp_errors
async def on_message_retract(self, msg: Message) -> None:
    """
    Handle a message retraction requested by the XMPP user.

    :param msg: The stanza carrying the ``retract`` element.
    :raises XMPPError: ``bad-request`` if the recipient does not have
        ``RETRACTION`` enabled.
    """
    session, recipient, thread = await self._get_session_recipient_thread(msg)
    if not recipient.RETRACTION:
        raise XMPPError(
            "bad-request",
            "This legacy service does not support message retraction.",
        )
    # XMPP-side ID of the message to retract, converted to its legacy ID below
    xmpp_id: str = msg["retract"]["id"]
    legacy_id = self._xmpp_msg_id_to_legacy(
        session, xmpp_id, recipient, origin=True
    )
    await session.on_retract(recipient, legacy_id, thread=thread)
    if isinstance(recipient, LegacyMUC):
        await recipient.echo(msg, None)
    # NOTE(review): nesting was reconstructed from a flattened listing; the
    # ack is assumed to run for every recipient type — confirm.
    self.__ack(msg)
@exceptions_to_xmpp_errors
async def on_reactions(self, msg: Message) -> None:
    """
    Handle emoji reactions sent by the XMPP user.

    Reactions that violate the recipient's restrictions (a single emoji
    only, or a restricted set of emojis) are reset and rejected. Accepted
    reactions are passed to the session, then mirrored to the other XMPP
    IDs mapped to the same legacy message.

    :param msg: The stanza carrying the ``reactions`` element.
    :raises XMPPError: ``internal-server-error`` if no legacy ID matches
        the message reacted to; ``policy-violation`` if the reaction
        breaks one of the recipient's restrictions.
    """
    session, recipient, thread = await self._get_session_recipient_thread(msg)
    react_to: str = msg["reactions"]["id"]

    prefix = session.SPECIAL_MSG_ID_PREFIX
    is_special = prefix and react_to.startswith(prefix)

    # IDs of "special" messages are used as-is, others need a lookup
    legacy_id = (
        react_to
        if is_special
        else self._xmpp_msg_id_to_legacy(session, react_to, recipient)
    )
    if not legacy_id:
        log.debug("Ignored reaction from user")
        raise XMPPError(
            "internal-server-error",
            "Could not convert the XMPP msg ID to a legacy ID",
        )

    emojis = [
        remove_emoji_variation_selector_16(reaction["value"])
        for reaction in msg["reactions"]
    ]

    rejection = None
    if not is_special:
        if recipient.REACTIONS_SINGLE_EMOJI and len(emojis) > 1:
            rejection = "Maximum 1 emoji/message"

        if (
            not rejection
            and (allowed := await recipient.available_emojis(legacy_id))
            and not set(emojis).issubset(allowed)
        ):
            rejection = (
                f"You can only react with the following emojis: {''.join(allowed)}"
            )

    if rejection:
        session.send_gateway_message(rejection)
        if not isinstance(recipient, LegacyMUC):
            # no need to carbon for groups, we just don't echo the stanza
            recipient.react(legacy_id, carbon=True)
        await session.on_react(recipient, legacy_id, [], thread=thread)
        raise XMPPError(
            "policy-violation",
            text=rejection,
            clear=False,
        )

    await session.on_react(recipient, legacy_id, emojis, thread=thread)
    # group chats get an echo of the stanza, direct chats get an ack
    if isinstance(recipient, LegacyMUC):
        await recipient.echo(msg, None)
    else:
        self.__ack(msg)

    with self.xmpp.store.session() as orm:
        known_xmpp_ids = self.xmpp.store.id_map.get_xmpp(
            orm, recipient.stored.id, legacy_id, recipient.is_group
        )
    if not known_xmpp_ids:
        return

    # mirror the reaction on every other XMPP ID of the same legacy message
    other_xmpp_ids = [
        candidate for candidate in known_xmpp_ids if react_to != candidate
    ]
    if isinstance(recipient, LegacyMUC):
        for xmpp_id in other_xmpp_ids:
            mirrored = copy(msg)
            mirrored["reactions"]["id"] = xmpp_id
            await recipient.echo(mirrored)
    elif isinstance(recipient, LegacyContact):
        for xmpp_id in other_xmpp_ids:
            recipient.react(legacy_id, emojis, xmpp_id=xmpp_id, carbon=True)
def __ack(self, msg: Message) -> None:
    """
    Acknowledge ``msg`` with a delivery receipt.

    Nothing is sent when the gateway has ``PROPER_RECEIPTS`` enabled.

    :param msg: The stanza to acknowledge.
    """
    if self.xmpp.PROPER_RECEIPTS:
        return
    self.xmpp.delivery_receipt.ack(msg)
async def __get_reply(
    self, msg: Message, session: AnySession, recipient: AnyRecipient
) -> tuple[
    str, str | int | None, AnyContact | LegacyParticipant | None, str | None
]:
    """
    Extract reply information from a message carrying a ``reply`` element.

    :param msg: The message stanza, containing a ``reply`` element.
    :param session: The session of the user who sent the message.
    :param recipient: The entity the message is addressed to.
    :return: A tuple ``(text, reply_to_msg_id, reply_to, reply_fallback)``:
        the text to send, the legacy ID of the message replied to, the
        author of that message and the quoted text. The last three are
        ``None`` when the XMPP ID cannot be converted to a legacy ID.
    """
    try:
        legacy_reply_id = self._xmpp_msg_id_to_legacy(
            session, msg["reply"]["id"], recipient
        )
    except XMPPError:
        session.log.debug(
            "Could not determine reply-to legacy msg ID, sending quote instead."
        )
        return redact_url(msg["body"]), None, None, None

    author_jid = JID(msg["reply"]["to"])
    author = None
    msg_type = msg["type"]
    if msg_type == "chat":
        # replies to the user's own messages have no contact as author
        if author_jid.bare != session.user_jid.bare:
            with contextlib.suppress(XMPPError):
                author = await session.contacts.by_jid(author_jid)
    elif msg_type == "groupchat":
        nickname = author_jid.resource
        try:
            room = await session.bookmarks.by_jid(author_jid)
        except XMPPError:
            pass
        else:
            if nickname == room.user_nick:
                author = await room.get_user_participant()
            elif not nickname:
                author = room.get_system_participant()
            else:
                author = await room.get_participant(nickname, store=False)

    use_fallback = "fallback" in msg and (
        isinstance(recipient, LegacyMUC) or recipient.REPLIES
    )
    if not use_fallback:
        return msg["body"], legacy_reply_id, author, None

    # strip the quoted part from the body, and keep it separately
    stripped_body = msg["fallback"].get_stripped_body(
        self.xmpp["xep_0461"].namespace
    )
    try:
        quoted = redact_url(msg["reply"].get_fallback_body())
    except AttributeError:
        quoted = None
    return stripped_body, legacy_reply_id, author, quoted
async def __send_url(
    self,
    url: str,
    session: AnySession,
    recipient: AnyRecipient,
    is_sticker: bool,
    cid: str | None,
    content_type: str | None,
    fallback: str,
    **kwargs: Any,  # noqa:ANN401
) -> int | str | None:
    """
    Fetch ``url`` and pass the attachment to the session.

    If the HTTP status is 400 or above, the URL and the fallback text are
    sent as a text message instead. Otherwise the response is handled as
    a sticker when ``is_sticker`` is true, and as a file when it is not.

    :param url: The URL of the attachment.
    :param session: The session of the user who sent the message.
    :param recipient: The entity the attachment is addressed to.
    :param is_sticker: Whether the attachment is a sticker.
    :param cid: Content ID of the sticker, if known.
    :param content_type: Media type of the sticker, if known.
    :param fallback: Text joined to the URL when the download fails.
    :param kwargs: Passed through to the session callback.
    :return: Whatever the session callback returns.
    """
    async with self.xmpp.http.get(url) as response:
        if response.status < 400:
            if not is_sticker:
                return await session.on_file(
                    recipient, url, http_response=response, **kwargs
                )
            return await self.__send_nonbob_sticker(
                response,
                url,
                session,
                recipient,
                cid,
                content_type,
                fallback,
                **kwargs,
            )

        session.log.warning(
            "OOB url cannot be downloaded: %s, sending the URL as text"
            " instead.",
            response,
        )
        text = url + "\n" + fallback
        return await session.on_text(recipient, text, **kwargs)
async def __send_nonbob_sticker(
    self,
    response: ClientResponse,
    url: str,
    session: AnySession,
    recipient: AnyRecipient,
    cid: str | None,
    content_type: str | None,
    fallback: str,
    **kwargs: Any,  # noqa:ANN401
) -> int | str | None:
    """
    Send a sticker whose data comes from an HTTP response.

    The sticker is looked up in the store by ``cid`` first. If it is not
    there, the response body is read, stored under a ``sha256+<hexdigest>``
    content ID, and that stored sticker is used. If there is still no
    sticker, the URL and the fallback text are sent as a text message.

    :param response: The HTTP response to read the sticker data from.
    :param url: The URL the sticker was fetched from.
    :param session: The session of the user who sent the message.
    :param recipient: The entity the sticker is addressed to.
    :param cid: Content ID of the sticker, if known.
    :param content_type: Media type saved along with the sticker data.
    :param fallback: Text joined to the URL when no sticker is available.
    :param kwargs: Passed through to the session callback.
    :return: Whatever the session callback returns.
    """
    store = self.xmpp.store

    sticker = None
    if cid:
        with store.session() as orm:
            sticker = store.bob.get_sticker(orm, cid)

    if sticker is None:
        # unknown sticker: save it under a content ID derived from its data
        payload = await response.read()
        cid = "sha256+" + hashlib.sha256(payload).hexdigest()
        with store.session() as orm:
            sticker = store.bob.set_sticker(orm, cid, payload, content_type)
            orm.commit()

    if sticker is not None:
        return await session.on_sticker(recipient, sticker, **kwargs)
    return await session.on_text(recipient, url + "\n" + fallback, **kwargs)
async def __send_bob(
    self,
    from_: JID,
    cid: str,
    session: AnySession,
    recipient: AnyRecipient,
    **kwargs: Any,  # noqa:ANN401
) -> int | str | None:
    """
    Send a sticker referenced by a bits-of-binary content ID.

    The sticker is looked up in the store; if it is not there, it is
    requested from ``from_`` through the XEP-0231 plugin and looked up
    again.

    :param from_: The JID to request the data from when it is not stored.
    :param cid: The content ID of the sticker.
    :param session: The session of the user who sent the message.
    :param recipient: The entity the sticker is addressed to.
    :param kwargs: Passed through to ``session.on_sticker``.
    :return: Whatever ``session.on_sticker`` returns.
    :raises XMPPError: ``item-not-found`` if the sticker is still not in
        the store after it was requested.
    """
    with self.xmpp.store.session() as orm:
        sticker = self.xmpp.store.bob.get_sticker(orm, cid)
    if sticker is None:
        await self.xmpp.plugin["xep_0231"].get_bob(
            from_, cid, ifrom=self.xmpp.boundjid
        )
        with self.xmpp.store.session() as orm:
            sticker = self.xmpp.store.bob.get_sticker(orm, cid)
    if sticker is None:
        # An explicit error rather than an assert, which is skipped when
        # Python runs with -O and would let None reach on_sticker().
        raise XMPPError(
            "item-not-found", f"Could not retrieve the sticker with cid '{cid}'"
        )
    return await session.on_sticker(recipient, sticker, **kwargs)
def redact_url(text: str) -> str:
    """
    Remove the configured upload URL prefix from ``text``.

    ``config.NO_UPLOAD_URL_PREFIX`` is used when set, and
    ``config.UPLOAD_URL_PREFIX`` otherwise. If neither is set, ``text`` is
    returned unchanged.

    :param text: The text to redact.
    :return: ``text`` without any occurrence of the prefix.
    """
    prefix = config.NO_UPLOAD_URL_PREFIX or config.UPLOAD_URL_PREFIX
    return text.replace(prefix, "") if prefix else text
def parse_link_previews(link_metadatas: list[LinkMetadata]) -> list[LinkPreview]:
    """
    Convert link metadata stanzas into :class:`LinkPreview` tuples.

    An image given as a ``data:image/jpeg;base64,`` URI is replaced by the
    decoded bytes. If decoding fails, a warning is logged and the image is
    left as received.

    :param link_metadatas: The link metadata elements of a message.
    :return: One preview per element, in the same order.
    """
    jpeg_data_uri_prefix = "data:image/jpeg;base64,"
    max_logged_chars = 80

    result = []
    for link_metadata in link_metadatas:
        preview: LinkPreview = dict_to_named_tuple(link_metadata, LinkPreview)  # type:ignore[arg-type]
        encoded = preview.image
        if isinstance(encoded, str) and encoded.startswith(jpeg_data_uri_prefix):
            try:
                image = base64.b64decode(encoded.removeprefix(jpeg_data_uri_prefix))
            except ValueError as e:
                # binascii.Error (invalid base64) is a ValueError, and so is
                # the error raised for non-ASCII input.
                # Only the beginning of the payload is logged: it can be
                # arbitrarily large.
                if len(encoded) > max_logged_chars:
                    shown = encoded[:max_logged_chars] + "..."
                else:
                    shown = encoded
                log.warning(
                    "Could not decode base64-encoded image: %s '%s'", e, shown
                )
            else:
                preview = preview._replace(image=image)
        result.append(preview)
    return result
# Module-level logger, named after this module. It is assigned at the bottom
# of the file, which works because the code above only looks it up when called.
log = logging.getLogger(__name__)