Coverage for slidge / core / dispatcher / message / message.py: 81%
252 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
1import base64
2import logging
3from copy import copy
4from typing import TYPE_CHECKING, Any
5from xml.etree import ElementTree
7from slixmpp import JID, Message
8from slixmpp.exceptions import XMPPError
9from slixmpp.plugins.xep_0511.stanza import LinkMetadata
11from ....contact.contact import LegacyContact
12from ....group.participant import LegacyParticipant
13from ....group.room import LegacyMUC
14from ....util.types import AnyContact, AnyRecipient, AnySession, LinkPreview
15from ....util.util import dict_to_named_tuple, remove_emoji_variation_selector_16
16from ... import config
17from ..util import DispatcherMixin, exceptions_to_xmpp_errors
19if TYPE_CHECKING:
20 from slidge.core.gateway import BaseGateway
class MessageContentMixin(DispatcherMixin):
    """
    Dispatcher mixin for the content of messages sent by XMPP users.

    Registers handlers for plain messages, last message corrections,
    retractions and reactions, and forwards each of them to the matching
    ``session.on_*`` callback of the user's session.
    """

    __slots__: list[str] = []

    def __init__(self, xmpp: "BaseGateway") -> None:
        """
        Register the message-related event handlers on the gateway.

        :param xmpp: The gateway on which event handlers are registered.
        """
        super().__init__(xmpp)
        xmpp.add_event_handler("legacy_message", self.on_legacy_message)
        xmpp.add_event_handler("message_correction", self.on_message_correction)
        xmpp.add_event_handler("message_retract", self.on_message_retract)
        xmpp.add_event_handler("groupchat_message", self.on_groupchat_message)
        xmpp.add_event_handler("reactions", self.on_reactions)

    async def on_groupchat_message(self, msg: Message) -> None:
        """
        Handle a groupchat message exactly like any other message.

        :param msg: The incoming groupchat message stanza.
        """
        await self.on_legacy_message(msg)

    @exceptions_to_xmpp_errors
    async def on_legacy_message(self, msg: Message) -> None:
        """
        Forward a message sent by an XMPP user to the legacy network.

        Meant to be called from :class:`BaseGateway` only.

        Corrections, retractions and reactions (and their fallbacks) are
        ignored here because dedicated handlers take care of them. Depending
        on its content, the message is then forwarded as a file (URL), a
        sticker (bits of binary ``cid:``) or plain text. Finally, the mapping
        between the XMPP and the legacy message IDs is persisted.

        :param msg: The incoming message stanza.
        :raises XMPPError: ``bad-request`` if the message is encrypted.
        """
        # we MUST not use `if m["replace"]["id"]` because it adds the tag if not
        # present. this is a problem for MUC echoed messages
        if msg.get_plugin("replace", check=True) is not None:
            # ignore last message correction (handled by on_message_correction)
            return
        if msg.get_plugin("apply_to", check=True) is not None:
            # ignore message retraction (handled by a specific method)
            return
        if msg.get_plugin("reactions", check=True) is not None:
            # ignore message reaction fallback.
            # the reaction itself is handled by self.on_reactions().
            return
        if msg.get_plugin("retract", check=True) is not None:
            # ignore message retraction fallback.
            # the retraction itself is handled by self.on_message_retract()
            return
        if msg.xml.find(".//{*}encrypted") is not None:
            raise XMPPError(
                "bad-request", "You cannot send encrypted messages through this gateway"
            )

        # Sticker detection happens before the session lookup, URL detection
        # after it.
        cid = self.__sticker_cid(msg)

        session, recipient, thread = await self._get_session_recipient_thread(msg)

        url = self.__attachment_url(msg)

        if msg.get_plugin("reply", check=True):
            text, reply_to_msg_id, reply_to, reply_fallback = await self.__get_reply(
                msg, session, recipient
            )
        else:
            text = msg["body"]
            reply_to_msg_id = None
            reply_to = None
            reply_fallback = None

        if "link_metadata" in msg:
            link_previews = parse_link_previews(msg["link_metadatas"])
        else:
            link_previews = []

        # Priority order: attachment URL, then sticker, then plain text.
        if url:
            legacy_msg_id = await self.__send_url(
                url,
                session,
                recipient,
                reply_to_msg_id=reply_to_msg_id,
                reply_to_fallback_text=reply_fallback,
                reply_to=reply_to,
                thread=thread,
            )
        elif cid:
            legacy_msg_id = await self.__send_bob(
                msg.get_from(),
                cid,
                session,
                recipient,
                reply_to_msg_id=reply_to_msg_id,
                reply_to_fallback_text=reply_fallback,
                reply_to=reply_to,
                thread=thread,
            )
        elif text:
            # The "mentions" keyword is only passed for group chats.
            if isinstance(recipient, LegacyMUC):
                mentions = {"mentions": await recipient.parse_mentions(text)}
            else:
                mentions = {}
            legacy_msg_id = await session.on_text(
                recipient,
                text,
                reply_to_msg_id=reply_to_msg_id,
                reply_to_fallback_text=reply_fallback,
                reply_to=reply_to,
                thread=thread,
                link_previews=link_previews,
                **mentions,
            )
        else:
            log.debug("Ignoring %s", msg.get_id())
            return

        if isinstance(recipient, LegacyMUC):
            stanza_id = await recipient.echo(msg, legacy_msg_id)
        else:
            stanza_id = None
            self.__ack(msg)

        if legacy_msg_id is None:
            # Nothing to map if the legacy side did not return an ID.
            return

        with self.xmpp.store.session() as orm:
            if recipient.is_group:
                self.xmpp.store.id_map.set_origin(
                    orm, recipient.stored.id, str(legacy_msg_id), msg.get_id()
                )
                assert stanza_id is not None
                self.xmpp.store.id_map.set_msg(
                    orm,
                    recipient.stored.id,
                    str(legacy_msg_id),
                    [stanza_id],
                    True,
                )
            else:
                self.xmpp.store.id_map.set_msg(
                    orm,
                    recipient.stored.id,
                    str(legacy_msg_id),
                    [msg.get_id()],
                    False,
                )
            if session.MESSAGE_IDS_ARE_THREAD_IDS and (t := msg["thread"]):
                self.xmpp.store.id_map.set_thread(
                    orm, recipient.stored.id, t, str(legacy_msg_id), recipient.is_group
                )
            orm.commit()

    @staticmethod
    def __sticker_cid(msg: Message) -> str | None:
        """
        Return the ``cid:`` identifier of a sticker carried by ``msg``, if any.

        A message is considered a sticker when its XHTML body is made of a
        single ``<p>`` with no text and a single ``<img>`` whose ``src`` starts
        with ``cid:``. Other cases should be interpreted as "custom emojis" in
        text and yield ``None``.

        :param msg: The incoming message stanza.
        :return: The content ID without its ``cid:`` prefix, or ``None``.
        """
        if msg.get_plugin("html", check=True) is None:
            return None
        body = ElementTree.fromstring("<body>" + msg["html"].get_body() + "</body>")
        paragraphs = body.findall("p")
        if len(paragraphs) != 1:
            return None
        paragraph = paragraphs[0]
        if paragraph.text is not None and paragraph.text.strip():
            return None
        images = paragraph.findall("img")
        if len(images) != 1:
            return None
        # no text, single img ⇒ this is a sticker
        src = images[0].get("src")
        if src is not None and src.startswith("cid:"):
            return src.removeprefix("cid:")
        return None

    @staticmethod
    def __attachment_url(msg: Message) -> str | None:
        """
        Return the URL of the attachment carried by ``msg``, if any.

        The ``oob`` plugin takes precedence; otherwise the first
        ``reference/sims`` source whose URI starts with ``http`` is used.

        :param msg: The incoming message stanza.
        :return: The attachment URL, or ``None`` if no usable source is found.
        """
        if msg.get_plugin("oob", check=True) is not None:
            return msg["oob"]["url"]
        if (
            "reference" in msg
            and "sims" in msg["reference"]
            and "sources" in msg["reference"]["sims"]
        ):
            for source in msg["reference"]["sims"]["sources"]["substanzas"]:
                uri = source["uri"]
                if uri.startswith("http"):
                    return uri
        return None

    @exceptions_to_xmpp_errors
    async def on_message_correction(self, msg: Message) -> None:
        """
        Forward a last message correction to the legacy network.

        Depending on what is known and what the recipient supports, the
        correction is sent as a real correction, as a retraction (empty body),
        or as a new message prefixed with ``Correction: ``.

        :param msg: The incoming message stanza carrying a ``replace`` element.
        """
        if msg.get_plugin("retract", check=True) is not None:
            # ignore message retraction fallback (fallback=last msg correction)
            return
        session, recipient, thread = await self._get_session_recipient_thread(msg)
        legacy_id = self._xmpp_msg_id_to_legacy(
            session, msg["replace"]["id"], recipient, True
        )

        if isinstance(recipient, LegacyMUC):
            mentions = await recipient.parse_mentions(msg["body"])
        else:
            mentions = None

        if "link_metadata" in msg:
            link_previews = parse_link_previews(msg["link_metadatas"])
        else:
            link_previews = []

        if legacy_id is None:
            log.debug("Did not find legacy ID to correct")
            new_legacy_msg_id = await session.on_text(
                recipient,
                "Correction: " + msg["body"],
                thread=thread,
                mentions=mentions,
                link_previews=link_previews,
            )
        elif not msg["body"].strip() and recipient.RETRACTION:
            # An empty correction is treated as a retraction.
            await session.on_retract(recipient, legacy_id, thread=thread)
            new_legacy_msg_id = None
        elif recipient.CORRECTION:
            new_legacy_msg_id = await session.on_correct(
                recipient,
                msg["body"],
                legacy_id,
                thread=thread,
                mentions=mentions,
                link_previews=link_previews,
            )
        else:
            session.send_gateway_message(
                "Last message correction is not supported by this legacy service. "
                "Slidge will send your correction as new message."
            )
            # legacy_id cannot be None here: that case is the first branch.
            if recipient.RETRACTION:
                session.send_gateway_message(
                    "Slidge will attempt to retract the original message you wanted"
                    " to edit."
                )
                await session.on_retract(recipient, legacy_id, thread=thread)

            new_legacy_msg_id = await session.on_text(
                recipient,
                "Correction: " + msg["body"],
                thread=thread,
                mentions=mentions,
                link_previews=link_previews,
            )

        if isinstance(recipient, LegacyMUC):
            await recipient.echo(msg, new_legacy_msg_id)
        else:
            self.__ack(msg)
        if new_legacy_msg_id is None:
            return
        with self.xmpp.store.session() as orm:
            # Legacy IDs are stored as strings, as in on_legacy_message().
            self.xmpp.store.id_map.set_msg(
                orm,
                recipient.stored.id,
                str(new_legacy_msg_id),
                [msg.get_id()],
                recipient.is_group,
            )
            orm.commit()

    @exceptions_to_xmpp_errors
    async def on_message_retract(self, msg: Message) -> None:
        """
        Forward a message retraction to the legacy network.

        :param msg: The incoming message stanza carrying a ``retract`` element.
        :raises XMPPError: ``bad-request`` if the recipient does not support
            retraction.
        """
        session, recipient, thread = await self._get_session_recipient_thread(msg)
        if not recipient.RETRACTION:
            raise XMPPError(
                "bad-request",
                "This legacy service does not support message retraction.",
            )
        xmpp_id: str = msg["retract"]["id"]
        legacy_id = self._xmpp_msg_id_to_legacy(
            session, xmpp_id, recipient, origin=True
        )
        await session.on_retract(recipient, legacy_id, thread=thread)
        if isinstance(recipient, LegacyMUC):
            await recipient.echo(msg, None)
        self.__ack(msg)

    @exceptions_to_xmpp_errors
    async def on_reactions(self, msg: Message) -> None:
        """
        Forward the reactions of an XMPP user to the legacy network.

        Reactions are validated against the recipient's restrictions (single
        emoji, allowed emoji subset) unless the target is a "special" message,
        i.e. its ID starts with ``session.SPECIAL_MSG_ID_PREFIX``.

        :param msg: The incoming message stanza carrying a ``reactions``
            element.
        :raises XMPPError: ``internal-server-error`` if the target message
            cannot be mapped to a legacy ID, ``policy-violation`` if the
            reaction is not allowed by the recipient.
        """
        session, recipient, thread = await self._get_session_recipient_thread(msg)
        react_to: str = msg["reactions"]["id"]

        special_msg = session.SPECIAL_MSG_ID_PREFIX and react_to.startswith(
            session.SPECIAL_MSG_ID_PREFIX
        )

        if special_msg:
            legacy_id = react_to
        else:
            legacy_id = self._xmpp_msg_id_to_legacy(session, react_to, recipient)

        if not legacy_id:
            log.debug("Ignored reaction from user")
            raise XMPPError(
                "internal-server-error",
                "Could not convert the XMPP msg ID to a legacy ID",
            )

        emojis = [
            remove_emoji_variation_selector_16(r["value"]) for r in msg["reactions"]
        ]
        error_msg = None

        if not special_msg:
            if recipient.REACTIONS_SINGLE_EMOJI and len(emojis) > 1:
                error_msg = "Maximum 1 emoji/message"

            if not error_msg and (
                subset := await recipient.available_emojis(legacy_id)
            ):
                if not set(emojis).issubset(subset):
                    error_msg = f"You can only react with the following emojis: {''.join(subset)}"

        if error_msg:
            session.send_gateway_message(error_msg)
            if not isinstance(recipient, LegacyMUC):
                # no need to carbon for groups, we just don't echo the stanza
                recipient.react(legacy_id, carbon=True)
            # An empty emoji list is forwarded before rejecting the stanza.
            await session.on_react(recipient, legacy_id, [], thread=thread)
            raise XMPPError(
                "policy-violation",
                text=error_msg,
                clear=False,
            )

        await session.on_react(recipient, legacy_id, emojis, thread=thread)
        if isinstance(recipient, LegacyMUC):
            await recipient.echo(msg, None)
        else:
            self.__ack(msg)

        with self.xmpp.store.session() as orm:
            multi = self.xmpp.store.id_map.get_xmpp(
                orm, recipient.stored.id, legacy_id, recipient.is_group
            )
        if not multi:
            return
        # The legacy message may map to several XMPP IDs: propagate the
        # reaction to all of them except the one the user reacted to.
        multi = [m for m in multi if react_to != m]

        if isinstance(recipient, LegacyMUC):
            for xmpp_id in multi:
                mc = copy(msg)
                mc["reactions"]["id"] = xmpp_id
                await recipient.echo(mc)
        elif isinstance(recipient, LegacyContact):
            for xmpp_id in multi:
                recipient.react(legacy_id, emojis, xmpp_id=xmpp_id, carbon=True)

    def __ack(self, msg: Message) -> None:
        """
        Acknowledge ``msg`` unless the gateway sets ``PROPER_RECEIPTS``.

        :param msg: The message stanza to acknowledge.
        """
        if not self.xmpp.PROPER_RECEIPTS:
            self.xmpp.delivery_receipt.ack(msg)

    async def __get_reply(
        self, msg: Message, session: AnySession, recipient: AnyRecipient
    ) -> tuple[
        str, str | int | None, AnyContact | LegacyParticipant | None, str | None
    ]:
        """
        Extract the reply information of a message carrying a ``reply`` element.

        :param msg: The incoming message stanza.
        :param session: The session of the user who sent ``msg``.
        :param recipient: The recipient of ``msg``.
        :return: A ``(text, reply_to_msg_id, reply_to, reply_fallback)`` tuple.
            If the replied-to message cannot be mapped to a legacy ID, the
            body (with upload URL prefix redacted) is returned as text and the
            three other items are ``None``.
        """
        try:
            reply_to_msg_id = self._xmpp_msg_id_to_legacy(
                session, msg["reply"]["id"], recipient
            )
        except XMPPError:
            session.log.debug(
                "Could not determine reply-to legacy msg ID, sending quote instead."
            )
            return redact_url(msg["body"]), None, None, None

        reply_to_jid = JID(msg["reply"]["to"])
        reply_to = None
        if msg["type"] == "chat":
            # A reply to the user's own message keeps reply_to as None.
            if reply_to_jid.bare != session.user_jid.bare:
                try:
                    reply_to = await session.contacts.by_jid(reply_to_jid)
                except XMPPError:
                    # Best effort: unknown author, leave reply_to as None.
                    pass
        elif msg["type"] == "groupchat":
            nick = reply_to_jid.resource
            try:
                muc = await session.bookmarks.by_jid(reply_to_jid)
            except XMPPError:
                # Best effort: unknown room, leave reply_to as None.
                pass
            else:
                if nick == muc.user_nick:
                    reply_to = await muc.get_user_participant()
                elif not nick:
                    reply_to = muc.get_system_participant()
                else:
                    reply_to = await muc.get_participant(nick, store=False)

        if msg.get_plugin("fallback", check=True) and (
            isinstance(recipient, LegacyMUC) or recipient.REPLIES
        ):
            text = msg["fallback"].get_stripped_body(self.xmpp["xep_0461"].namespace)
            try:
                reply_fallback = redact_url(msg["reply"].get_fallback_body())
            except AttributeError:
                reply_fallback = None
        else:
            text = msg["body"]
            reply_fallback = None

        return text, reply_to_msg_id, reply_to, reply_fallback

    async def __send_url(
        self,
        url: str,
        session: AnySession,
        recipient: AnyRecipient,
        **kwargs: Any,  # noqa:ANN401
    ) -> int | str | None:
        """
        Fetch ``url`` and forward it as a file, or as text if the fetch fails.

        :param url: The URL of the attachment.
        :param session: The session of the user who sent the message.
        :param recipient: The recipient of the message.
        :param kwargs: Passed through to ``session.on_file`` or
            ``session.on_text``.
        :return: Whatever the session callback returns.
        """
        async with self.xmpp.http.get(url) as response:
            if response.status >= 400:
                session.log.warning(
                    "OOB url cannot be downloaded: %s, sending the URL as text"
                    " instead.",
                    response,
                )
                return await session.on_text(recipient, url, **kwargs)

            # The response is handed over while it is still open.
            return await session.on_file(
                recipient, url, http_response=response, **kwargs
            )

    async def __send_bob(
        self,
        from_: JID,
        cid: str,
        session: AnySession,
        recipient: AnyRecipient,
        **kwargs: Any,  # noqa:ANN401
    ) -> int | str | None:
        """
        Forward a sticker identified by a bits-of-binary content ID.

        The sticker is looked up in the store; if missing, it is requested
        from ``from_`` through the ``xep_0231`` plugin and looked up again.

        :param from_: The JID the sticker is requested from when not stored.
        :param cid: The content ID of the sticker.
        :param session: The session of the user who sent the message.
        :param recipient: The recipient of the message.
        :param kwargs: Passed through to ``session.on_sticker``.
        :return: Whatever ``session.on_sticker`` returns.
        """
        with self.xmpp.store.session() as orm:
            sticker = self.xmpp.store.bob.get_sticker(orm, cid)
        if sticker is None:
            await self.xmpp.plugin["xep_0231"].get_bob(
                from_, cid, ifrom=self.xmpp.boundjid
            )
            with self.xmpp.store.session() as orm:
                sticker = self.xmpp.store.bob.get_sticker(orm, cid)
        assert sticker is not None
        return await session.on_sticker(recipient, sticker, **kwargs)
def redact_url(text: str) -> str:
    """
    Strip the configured upload URL prefix from ``text``.

    ``config.NO_UPLOAD_URL_PREFIX`` is used when set, otherwise
    ``config.UPLOAD_URL_PREFIX``. If neither is set, ``text`` is returned
    unchanged.

    :param text: The text to redact.
    :return: ``text`` with every occurrence of the prefix removed.
    """
    prefix = config.NO_UPLOAD_URL_PREFIX or config.UPLOAD_URL_PREFIX
    return text.replace(prefix, "") if prefix else text
def parse_link_previews(link_metadatas: list[LinkMetadata]) -> list[LinkPreview]:
    """
    Convert link metadata stanzas into :class:`LinkPreview` tuples.

    When a preview image is a base64-encoded JPEG ``data:`` URI, it is replaced
    by the decoded bytes. If decoding fails, a warning is logged and the
    preview is kept with its original image value.

    :param link_metadatas: The link metadata elements of a message.
    :return: One preview per element, in the same order.
    """
    jpeg_data_uri_prefix = "data:image/jpeg;base64,"
    result: list[LinkPreview] = []
    for link_metadata in link_metadatas:
        preview: LinkPreview = dict_to_named_tuple(link_metadata, LinkPreview)  # type:ignore[arg-type]
        image = preview.image
        if image and isinstance(image, str) and image.startswith(jpeg_data_uri_prefix):
            try:
                decoded = base64.b64decode(image.removeprefix(jpeg_data_uri_prefix))
            except ValueError as e:
                # binascii.Error (bad padding) is a ValueError subclass, and
                # non-ASCII str input raises ValueError directly.
                log.warning(
                    "Could not decode base64-encoded image: %s '%s'", e, image
                )
            else:
                preview = preview._replace(image=decoded)
        result.append(preview)
    return result
# Module-level logger. It is assigned at the end of the module, which works
# because the functions above only look the name up when they are called.
log: logging.Logger = logging.getLogger(__name__)