Coverage for slidge / core / dispatcher / message / message.py: 81%
252 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-03-13 22:59 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-03-13 22:59 +0000
1import base64
2import logging
3from copy import copy
4from xml.etree import ElementTree
6from slixmpp import JID, Message
7from slixmpp.exceptions import XMPPError
8from slixmpp.plugins.xep_0511.stanza import LinkMetadata
10from ....contact.contact import LegacyContact
11from ....group.participant import LegacyParticipant
12from ....group.room import LegacyMUC
13from ....util.types import LinkPreview, Recipient
14from ....util.util import dict_to_named_tuple, remove_emoji_variation_selector_16
15from ... import config
16from ...session import BaseSession
17from ..util import DispatcherMixin, exceptions_to_xmpp_errors
20class MessageContentMixin(DispatcherMixin):
21 __slots__: list[str] = []
23 def __init__(self, xmpp) -> None:
24 super().__init__(xmpp)
25 xmpp.add_event_handler("legacy_message", self.on_legacy_message)
26 xmpp.add_event_handler("message_correction", self.on_message_correction)
27 xmpp.add_event_handler("message_retract", self.on_message_retract)
28 xmpp.add_event_handler("groupchat_message", self.on_groupchat_message)
29 xmpp.add_event_handler("reactions", self.on_reactions)
31 async def on_groupchat_message(self, msg: Message) -> None:
32 await self.on_legacy_message(msg)
34 @exceptions_to_xmpp_errors
35 async def on_legacy_message(self, msg: Message) -> None:
36 """
37 Meant to be called from :class:`BaseGateway` only.
39 :param msg:
40 :return:
41 """
42 # we MUST not use `if m["replace"]["id"]` because it adds the tag if not
43 # present. this is a problem for MUC echoed messages
44 if msg.get_plugin("replace", check=True) is not None:
45 # ignore last message correction (handled by a specific method)
46 return
47 if msg.get_plugin("apply_to", check=True) is not None:
48 # ignore message retraction (handled by a specific method)
49 return
50 if msg.get_plugin("reactions", check=True) is not None:
51 # ignore message reaction fallback.
52 # the reaction itself is handled by self.react_from_msg().
53 return
54 if msg.get_plugin("retract", check=True) is not None:
55 # ignore message retraction fallback.
56 # the retraction itself is handled by self.on_retract
57 return
58 if msg.xml.find(".//{*}encrypted") is not None:
59 raise XMPPError(
60 "bad-request", "You cannot send encrypted messages through this gateway"
61 )
62 cid = None
63 if msg.get_plugin("html", check=True) is not None:
64 body = ElementTree.fromstring("<body>" + msg["html"].get_body() + "</body>")
65 p = body.findall("p")
66 if p is not None and len(p) == 1:
67 if p[0].text is None or not p[0].text.strip():
68 images = p[0].findall("img")
69 if len(images) == 1:
70 # no text, single img ⇒ this is a sticker
71 # other cases should be interpreted as "custom emojis" in text
72 src = images[0].get("src")
73 if src is not None and src.startswith("cid:"):
74 cid = src.removeprefix("cid:")
76 session, recipient, thread = await self._get_session_recipient_thread(msg)
78 if msg.get_plugin("oob", check=True) is not None:
79 url = msg["oob"]["url"]
80 elif (
81 "reference" in msg
82 and "sims" in msg["reference"]
83 and "sources" in msg["reference"]["sims"]
84 ):
85 for source in msg["reference"]["sims"]["sources"]["substanzas"]:
86 if source["uri"].startswith("http"):
87 url = source["uri"]
88 break
89 else:
90 url = None
91 else:
92 url = None
94 if msg.get_plugin("reply", check=True):
95 text, reply_to_msg_id, reply_to, reply_fallback = await self.__get_reply(
96 msg, session, recipient
97 )
98 else:
99 text = msg["body"]
100 reply_to_msg_id = None
101 reply_to = None
102 reply_fallback = None
104 if "link_metadata" in msg:
105 link_previews = parse_link_previews(msg["link_metadatas"])
106 else:
107 link_previews = []
109 if url:
110 legacy_msg_id = await self.__send_url(
111 url,
112 session,
113 recipient,
114 reply_to_msg_id=reply_to_msg_id,
115 reply_to_fallback_text=reply_fallback,
116 reply_to=reply_to,
117 thread=thread,
118 )
119 elif cid:
120 legacy_msg_id = await self.__send_bob(
121 msg.get_from(),
122 cid,
123 session,
124 recipient,
125 reply_to_msg_id=reply_to_msg_id,
126 reply_to_fallback_text=reply_fallback,
127 reply_to=reply_to,
128 thread=thread,
129 )
130 elif text:
131 if isinstance(recipient, LegacyMUC):
132 mentions = {"mentions": await recipient.parse_mentions(text)}
133 else:
134 mentions = {}
135 legacy_msg_id = await session.on_text(
136 recipient,
137 text,
138 reply_to_msg_id=reply_to_msg_id,
139 reply_to_fallback_text=reply_fallback,
140 reply_to=reply_to,
141 thread=thread,
142 link_previews=link_previews,
143 **mentions,
144 )
145 else:
146 log.debug("Ignoring %s", msg.get_id())
147 return
149 if isinstance(recipient, LegacyMUC):
150 stanza_id = await recipient.echo(msg, legacy_msg_id)
151 else:
152 stanza_id = None
153 self.__ack(msg)
155 if legacy_msg_id is None:
156 return
158 with self.xmpp.store.session() as orm:
159 if recipient.is_group:
160 self.xmpp.store.id_map.set_origin(
161 orm, recipient.stored.id, str(legacy_msg_id), msg.get_id()
162 )
163 assert stanza_id is not None
164 self.xmpp.store.id_map.set_msg(
165 orm,
166 recipient.stored.id,
167 str(legacy_msg_id),
168 [stanza_id],
169 True,
170 )
171 else:
172 self.xmpp.store.id_map.set_msg(
173 orm,
174 recipient.stored.id,
175 str(legacy_msg_id),
176 [msg.get_id()],
177 False,
178 )
179 if session.MESSAGE_IDS_ARE_THREAD_IDS and (t := msg["thread"]):
180 self.xmpp.store.id_map.set_thread(
181 orm, recipient.stored.id, t, str(legacy_msg_id), recipient.is_group
182 )
183 orm.commit()
185 @exceptions_to_xmpp_errors
186 async def on_message_correction(self, msg: Message) -> None:
187 if msg.get_plugin("retract", check=True) is not None:
188 # ignore message retraction fallback (fallback=last msg correction)
189 return
190 session, recipient, thread = await self._get_session_recipient_thread(msg)
191 legacy_id = self._xmpp_msg_id_to_legacy(
192 session, msg["replace"]["id"], recipient, True
193 )
195 if isinstance(recipient, LegacyMUC):
196 mentions = await recipient.parse_mentions(msg["body"])
197 else:
198 mentions = None
200 if "link_metadata" in msg:
201 link_previews = parse_link_previews(msg["link_metadatas"])
202 else:
203 link_previews = []
205 if legacy_id is None:
206 log.debug("Did not find legacy ID to correct")
207 new_legacy_msg_id = await session.on_text(
208 recipient,
209 "Correction:" + msg["body"],
210 thread=thread,
211 mentions=mentions,
212 link_previews=link_previews,
213 )
214 elif not msg["body"].strip() and recipient.RETRACTION:
215 await session.on_retract(recipient, legacy_id, thread=thread)
216 new_legacy_msg_id = None
217 elif recipient.CORRECTION:
218 new_legacy_msg_id = await session.on_correct(
219 recipient,
220 msg["body"],
221 legacy_id,
222 thread=thread,
223 mentions=mentions,
224 link_previews=link_previews,
225 )
226 else:
227 session.send_gateway_message(
228 "Last message correction is not supported by this legacy service. "
229 "Slidge will send your correction as new message."
230 )
231 if recipient.RETRACTION and legacy_id is not None:
232 if legacy_id is not None:
233 session.send_gateway_message(
234 "Slidge will attempt to retract the original message you wanted"
235 " to edit."
236 )
237 await session.on_retract(recipient, legacy_id, thread=thread)
239 new_legacy_msg_id = await session.on_text(
240 recipient,
241 "Correction: " + msg["body"],
242 thread=thread,
243 mentions=mentions,
244 link_previews=link_previews,
245 )
247 if isinstance(recipient, LegacyMUC):
248 await recipient.echo(msg, new_legacy_msg_id)
249 else:
250 self.__ack(msg)
251 if new_legacy_msg_id is None:
252 return
253 with self.xmpp.store.session() as orm:
254 self.xmpp.store.id_map.set_msg(
255 orm,
256 recipient.stored.id,
257 new_legacy_msg_id,
258 [msg.get_id()],
259 recipient.is_group,
260 )
261 orm.commit()
263 @exceptions_to_xmpp_errors
264 async def on_message_retract(self, msg: Message) -> None:
265 session, recipient, thread = await self._get_session_recipient_thread(msg)
266 if not recipient.RETRACTION:
267 raise XMPPError(
268 "bad-request",
269 "This legacy service does not support message retraction.",
270 )
271 xmpp_id: str = msg["retract"]["id"]
272 legacy_id = self._xmpp_msg_id_to_legacy(
273 session, xmpp_id, recipient, origin=True
274 )
275 await session.on_retract(recipient, legacy_id, thread=thread)
276 if isinstance(recipient, LegacyMUC):
277 await recipient.echo(msg, None)
278 self.__ack(msg)
280 @exceptions_to_xmpp_errors
281 async def on_reactions(self, msg: Message) -> None:
282 session, recipient, thread = await self._get_session_recipient_thread(msg)
283 react_to: str = msg["reactions"]["id"]
285 special_msg = session.SPECIAL_MSG_ID_PREFIX and react_to.startswith(
286 session.SPECIAL_MSG_ID_PREFIX
287 )
289 if special_msg:
290 legacy_id = react_to
291 else:
292 legacy_id = self._xmpp_msg_id_to_legacy(session, react_to, recipient)
294 if not legacy_id:
295 log.debug("Ignored reaction from user")
296 raise XMPPError(
297 "internal-server-error",
298 "Could not convert the XMPP msg ID to a legacy ID",
299 )
301 emojis = [
302 remove_emoji_variation_selector_16(r["value"]) for r in msg["reactions"]
303 ]
304 error_msg = None
305 recipient = recipient
307 if not special_msg:
308 if recipient.REACTIONS_SINGLE_EMOJI and len(emojis) > 1:
309 error_msg = "Maximum 1 emoji/message"
311 if not error_msg and (
312 subset := await recipient.available_emojis(legacy_id)
313 ):
314 if not set(emojis).issubset(subset):
315 error_msg = f"You can only react with the following emojis: {''.join(subset)}"
317 if error_msg:
318 session.send_gateway_message(error_msg)
319 if not isinstance(recipient, LegacyMUC):
320 # no need to carbon for groups, we just don't echo the stanza
321 recipient.react(legacy_id, carbon=True)
322 await session.on_react(recipient, legacy_id, [], thread=thread)
323 raise XMPPError(
324 "policy-violation",
325 text=error_msg,
326 clear=False,
327 )
329 await session.on_react(recipient, legacy_id, emojis, thread=thread)
330 if isinstance(recipient, LegacyMUC):
331 await recipient.echo(msg, None)
332 else:
333 self.__ack(msg)
335 with self.xmpp.store.session() as orm:
336 multi = self.xmpp.store.id_map.get_xmpp(
337 orm, recipient.stored.id, legacy_id, recipient.is_group
338 )
339 if not multi:
340 return
341 multi = [m for m in multi if react_to != m]
343 if isinstance(recipient, LegacyMUC):
344 for xmpp_id in multi:
345 mc = copy(msg)
346 mc["reactions"]["id"] = xmpp_id
347 await recipient.echo(mc)
348 elif isinstance(recipient, LegacyContact):
349 for xmpp_id in multi:
350 recipient.react(legacy_id, emojis, xmpp_id=xmpp_id, carbon=True)
352 def __ack(self, msg: Message) -> None:
353 if not self.xmpp.PROPER_RECEIPTS:
354 self.xmpp.delivery_receipt.ack(msg)
356 async def __get_reply(
357 self, msg: Message, session: BaseSession, recipient: Recipient
358 ) -> tuple[
359 str, str | int | None, LegacyContact | LegacyParticipant | None, str | None
360 ]:
361 try:
362 reply_to_msg_id = self._xmpp_msg_id_to_legacy(
363 session, msg["reply"]["id"], recipient
364 )
365 except XMPPError:
366 session.log.debug(
367 "Could not determine reply-to legacy msg ID, sending quote instead."
368 )
369 return redact_url(msg["body"]), None, None, None
371 reply_to_jid = JID(msg["reply"]["to"])
372 reply_to = None
373 if msg["type"] == "chat":
374 if reply_to_jid.bare != session.user_jid.bare:
375 try:
376 reply_to = await session.contacts.by_jid(reply_to_jid)
377 except XMPPError:
378 pass
379 elif msg["type"] == "groupchat":
380 nick = reply_to_jid.resource
381 try:
382 muc = await session.bookmarks.by_jid(reply_to_jid)
383 except XMPPError:
384 pass
385 else:
386 if nick == muc.user_nick:
387 reply_to = await muc.get_user_participant()
388 elif not nick:
389 reply_to = muc.get_system_participant()
390 else:
391 reply_to = await muc.get_participant(nick, store=False)
393 if msg.get_plugin("fallback", check=True) and (
394 isinstance(recipient, LegacyMUC) or recipient.REPLIES
395 ):
396 text = msg["fallback"].get_stripped_body(self.xmpp["xep_0461"].namespace)
397 try:
398 reply_fallback = redact_url(msg["reply"].get_fallback_body())
399 except AttributeError:
400 reply_fallback = None
401 else:
402 text = msg["body"]
403 reply_fallback = None
405 return text, reply_to_msg_id, reply_to, reply_fallback
407 async def __send_url(
408 self, url: str, session: BaseSession, recipient: Recipient, **kwargs
409 ) -> int | str | None:
410 async with self.xmpp.http.get(url) as response:
411 if response.status >= 400:
412 session.log.warning(
413 "OOB url cannot be downloaded: %s, sending the URL as text"
414 " instead.",
415 response,
416 )
417 return await session.on_text(recipient, url, **kwargs)
419 return await session.on_file(
420 recipient, url, http_response=response, **kwargs
421 )
423 async def __send_bob(
424 self, from_: JID, cid: str, session: BaseSession, recipient: Recipient, **kwargs
425 ) -> int | str | None:
426 with self.xmpp.store.session() as orm:
427 sticker = self.xmpp.store.bob.get_sticker(orm, cid)
428 if sticker is None:
429 await self.xmpp.plugin["xep_0231"].get_bob(
430 from_, cid, ifrom=self.xmpp.boundjid
431 )
432 with self.xmpp.store.session() as orm:
433 sticker = self.xmpp.store.bob.get_sticker(orm, cid)
434 assert sticker is not None
435 return await session.on_sticker(recipient, sticker, **kwargs)
def redact_url(text: str) -> str:
    """
    Remove every occurrence of the configured upload URL prefix from *text*.

    ``config.NO_UPLOAD_URL_PREFIX`` is used when set, otherwise
    ``config.UPLOAD_URL_PREFIX``; if neither is set, *text* is returned
    unchanged.
    """
    prefix = config.NO_UPLOAD_URL_PREFIX or config.UPLOAD_URL_PREFIX
    return text.replace(prefix, "") if prefix else text
def parse_link_previews(link_metadatas: list[LinkMetadata]) -> list[LinkPreview]:
    """
    Convert link metadata stanzas into :class:`LinkPreview` tuples.

    When a preview's image is a ``data:image/jpeg;base64,`` URI string, it is
    replaced by the decoded bytes. If decoding fails, a warning is logged and
    the preview is kept with its original image value.

    :param link_metadatas: The link metadata elements to convert.
    :return: One preview per input element, in the same order.
    """
    jpeg_data_uri_prefix = "data:image/jpeg;base64,"
    max_logged_chars = 64
    result = []
    for link_metadata in link_metadatas:
        preview: LinkPreview = dict_to_named_tuple(link_metadata, LinkPreview)  # type:ignore[arg-type]
        image = preview.image
        if image and isinstance(image, str) and image.startswith(jpeg_data_uri_prefix):
            try:
                decoded = base64.b64decode(image.removeprefix(jpeg_data_uri_prefix))
            except ValueError as e:
                # binascii.Error is a ValueError subclass. Only the start of
                # the payload is logged, since data URIs can be very large.
                excerpt = image[:max_logged_chars]
                if len(image) > max_logged_chars:
                    excerpt += "..."
                log.warning(
                    "Could not decode base64-encoded image: %s '%s'", e, excerpt
                )
            else:
                preview = preview._replace(image=decoded)
        result.append(preview)
    return result
# Module-level logger named after this module; the functions above look it up
# at call time, so defining it at the end of the file is fine.
log = logging.getLogger(__name__)