Coverage for slidge/core/dispatcher/message/message.py: 81%
233 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
1import logging
2from copy import copy
3from xml.etree import ElementTree
5from slixmpp import JID, Message
6from slixmpp.exceptions import XMPPError
8from ....contact.contact import LegacyContact
9from ....group.participant import LegacyParticipant
10from ....group.room import LegacyMUC
11from ....util.types import LinkPreview, Recipient
12from ....util.util import dict_to_named_tuple, remove_emoji_variation_selector_16
13from ... import config
14from ...session import BaseSession
15from ..util import DispatcherMixin, exceptions_to_xmpp_errors
18class MessageContentMixin(DispatcherMixin):
19 __slots__: list[str] = []
21 def __init__(self, xmpp) -> None:
22 super().__init__(xmpp)
23 xmpp.add_event_handler("legacy_message", self.on_legacy_message)
24 xmpp.add_event_handler("message_correction", self.on_message_correction)
25 xmpp.add_event_handler("message_retract", self.on_message_retract)
26 xmpp.add_event_handler("groupchat_message", self.on_groupchat_message)
27 xmpp.add_event_handler("reactions", self.on_reactions)
29 async def on_groupchat_message(self, msg: Message) -> None:
30 await self.on_legacy_message(msg)
32 @exceptions_to_xmpp_errors
33 async def on_legacy_message(self, msg: Message) -> None:
34 """
35 Meant to be called from :class:`BaseGateway` only.
37 :param msg:
38 :return:
39 """
40 # we MUST not use `if m["replace"]["id"]` because it adds the tag if not
41 # present. this is a problem for MUC echoed messages
42 if msg.get_plugin("replace", check=True) is not None:
43 # ignore last message correction (handled by a specific method)
44 return
45 if msg.get_plugin("apply_to", check=True) is not None:
46 # ignore message retraction (handled by a specific method)
47 return
48 if msg.get_plugin("reactions", check=True) is not None:
49 # ignore message reaction fallback.
50 # the reaction itself is handled by self.react_from_msg().
51 return
52 if msg.get_plugin("retract", check=True) is not None:
53 # ignore message retraction fallback.
54 # the retraction itself is handled by self.on_retract
55 return
56 cid = None
57 if msg.get_plugin("html", check=True) is not None:
58 body = ElementTree.fromstring("<body>" + msg["html"].get_body() + "</body>")
59 p = body.findall("p")
60 if p is not None and len(p) == 1:
61 if p[0].text is None or not p[0].text.strip():
62 images = p[0].findall("img")
63 if len(images) == 1:
64 # no text, single img ⇒ this is a sticker
65 # other cases should be interpreted as "custom emojis" in text
66 src = images[0].get("src")
67 if src is not None and src.startswith("cid:"):
68 cid = src.removeprefix("cid:")
70 session, recipient, thread = await self._get_session_recipient_thread(msg)
72 if msg.get_plugin("oob", check=True) is not None:
73 url = msg["oob"]["url"]
74 elif (
75 "reference" in msg
76 and "sims" in msg["reference"]
77 and "sources" in msg["reference"]["sims"]
78 ):
79 for source in msg["reference"]["sims"]["sources"]["substanzas"]:
80 if source["uri"].startswith("http"):
81 url = source["uri"]
82 break
83 else:
84 url = None
85 else:
86 url = None
88 if msg.get_plugin("reply", check=True):
89 text, reply_to_msg_id, reply_to, reply_fallback = await self.__get_reply(
90 msg, session, recipient
91 )
92 else:
93 text = msg["body"]
94 reply_to_msg_id = None
95 reply_to = None
96 reply_fallback = None
98 if msg.get_plugin("link_previews", check=True):
99 link_previews = [
100 dict_to_named_tuple(p, LinkPreview) for p in msg["link_previews"]
101 ]
102 else:
103 link_previews = []
105 if url:
106 legacy_msg_id = await self.__send_url(
107 url,
108 session,
109 recipient,
110 reply_to_msg_id=reply_to_msg_id,
111 reply_to_fallback_text=reply_fallback,
112 reply_to=reply_to,
113 thread=thread,
114 )
115 elif cid:
116 legacy_msg_id = await self.__send_bob(
117 msg.get_from(),
118 cid,
119 session,
120 recipient,
121 reply_to_msg_id=reply_to_msg_id,
122 reply_to_fallback_text=reply_fallback,
123 reply_to=reply_to,
124 thread=thread,
125 )
126 elif text:
127 if isinstance(recipient, LegacyMUC):
128 mentions = {"mentions": await recipient.parse_mentions(text)}
129 else:
130 mentions = {}
131 legacy_msg_id = await session.on_text(
132 recipient,
133 text,
134 reply_to_msg_id=reply_to_msg_id,
135 reply_to_fallback_text=reply_fallback,
136 reply_to=reply_to,
137 thread=thread,
138 link_previews=link_previews,
139 **mentions,
140 )
141 else:
142 log.debug("Ignoring %s", msg.get_id())
143 return
145 if isinstance(recipient, LegacyMUC):
146 stanza_id = await recipient.echo(msg, legacy_msg_id)
147 else:
148 stanza_id = None
149 self.__ack(msg)
151 if legacy_msg_id is None:
152 return
154 with self.xmpp.store.session() as orm:
155 if recipient.is_group:
156 self.xmpp.store.id_map.set_origin(
157 orm, recipient.stored.id, str(legacy_msg_id), msg.get_id()
158 )
159 assert stanza_id is not None
160 self.xmpp.store.id_map.set_msg(
161 orm,
162 recipient.stored.id,
163 str(legacy_msg_id),
164 [stanza_id],
165 True,
166 )
167 else:
168 self.xmpp.store.id_map.set_msg(
169 orm,
170 recipient.stored.id,
171 str(legacy_msg_id),
172 [msg.get_id()],
173 False,
174 )
175 if session.MESSAGE_IDS_ARE_THREAD_IDS and (t := msg["thread"]):
176 self.xmpp.store.id_map.set_thread(
177 orm, recipient.stored.id, t, str(legacy_msg_id), recipient.is_group
178 )
179 orm.commit()
181 @exceptions_to_xmpp_errors
182 async def on_message_correction(self, msg: Message) -> None:
183 if msg.get_plugin("retract", check=True) is not None:
184 # ignore message retraction fallback (fallback=last msg correction)
185 return
186 session, recipient, thread = await self._get_session_recipient_thread(msg)
187 legacy_id = self._xmpp_msg_id_to_legacy(
188 session, msg["replace"]["id"], recipient, True
189 )
191 if isinstance(recipient, LegacyMUC):
192 mentions = await recipient.parse_mentions(msg["body"])
193 else:
194 mentions = None
196 if previews := msg["link_previews"]:
197 link_previews = [dict_to_named_tuple(p, LinkPreview) for p in previews]
198 else:
199 link_previews = []
201 if legacy_id is None:
202 log.debug("Did not find legacy ID to correct")
203 new_legacy_msg_id = await session.on_text(
204 recipient,
205 "Correction:" + msg["body"],
206 thread=thread,
207 mentions=mentions,
208 link_previews=link_previews,
209 )
210 elif not msg["body"].strip() and recipient.RETRACTION:
211 await session.on_retract(recipient, legacy_id, thread=thread)
212 new_legacy_msg_id = None
213 elif recipient.CORRECTION:
214 new_legacy_msg_id = await session.on_correct(
215 recipient,
216 msg["body"],
217 legacy_id,
218 thread=thread,
219 mentions=mentions,
220 link_previews=link_previews,
221 )
222 else:
223 session.send_gateway_message(
224 "Last message correction is not supported by this legacy service. "
225 "Slidge will send your correction as new message."
226 )
227 if recipient.RETRACTION and legacy_id is not None:
228 if legacy_id is not None:
229 session.send_gateway_message(
230 "Slidge will attempt to retract the original message you wanted"
231 " to edit."
232 )
233 await session.on_retract(recipient, legacy_id, thread=thread)
235 new_legacy_msg_id = await session.on_text(
236 recipient,
237 "Correction: " + msg["body"],
238 thread=thread,
239 mentions=mentions,
240 link_previews=link_previews,
241 )
243 if isinstance(recipient, LegacyMUC):
244 await recipient.echo(msg, new_legacy_msg_id)
245 else:
246 self.__ack(msg)
247 if new_legacy_msg_id is None:
248 return
249 with self.xmpp.store.session() as orm:
250 self.xmpp.store.id_map.set_msg(
251 orm,
252 recipient.stored.id,
253 new_legacy_msg_id,
254 msg.get_id(),
255 recipient.is_group,
256 )
257 orm.commit()
259 @exceptions_to_xmpp_errors
260 async def on_message_retract(self, msg: Message):
261 session, recipient, thread = await self._get_session_recipient_thread(msg)
262 if not recipient.RETRACTION:
263 raise XMPPError(
264 "bad-request",
265 "This legacy service does not support message retraction.",
266 )
267 xmpp_id: str = msg["retract"]["id"]
268 legacy_id = self._xmpp_msg_id_to_legacy(
269 session, xmpp_id, recipient, origin=True
270 )
271 await session.on_retract(recipient, legacy_id, thread=thread)
272 if isinstance(recipient, LegacyMUC):
273 await recipient.echo(msg, None)
274 self.__ack(msg)
276 @exceptions_to_xmpp_errors
277 async def on_reactions(self, msg: Message):
278 session, recipient, thread = await self._get_session_recipient_thread(msg)
279 react_to: str = msg["reactions"]["id"]
281 special_msg = session.SPECIAL_MSG_ID_PREFIX and react_to.startswith(
282 session.SPECIAL_MSG_ID_PREFIX
283 )
285 if special_msg:
286 legacy_id = react_to
287 else:
288 legacy_id = self._xmpp_msg_id_to_legacy(session, react_to, recipient)
290 if not legacy_id:
291 log.debug("Ignored reaction from user")
292 raise XMPPError(
293 "internal-server-error",
294 "Could not convert the XMPP msg ID to a legacy ID",
295 )
297 emojis = [
298 remove_emoji_variation_selector_16(r["value"]) for r in msg["reactions"]
299 ]
300 error_msg = None
301 recipient = recipient
303 if not special_msg:
304 if recipient.REACTIONS_SINGLE_EMOJI and len(emojis) > 1:
305 error_msg = "Maximum 1 emoji/message"
307 if not error_msg and (
308 subset := await recipient.available_emojis(legacy_id)
309 ):
310 if not set(emojis).issubset(subset):
311 error_msg = f"You can only react with the following emojis: {''.join(subset)}"
313 if error_msg:
314 session.send_gateway_message(error_msg)
315 if not isinstance(recipient, LegacyMUC):
316 # no need to carbon for groups, we just don't echo the stanza
317 recipient.react(legacy_id, carbon=True)
318 await session.on_react(recipient, legacy_id, [], thread=thread)
319 raise XMPPError(
320 "policy-violation",
321 text=error_msg,
322 clear=False,
323 )
325 await session.on_react(recipient, legacy_id, emojis, thread=thread)
326 if isinstance(recipient, LegacyMUC):
327 await recipient.echo(msg, None)
328 else:
329 self.__ack(msg)
331 with self.xmpp.store.session() as orm:
332 multi = self.xmpp.store.id_map.get_xmpp(
333 orm, recipient.stored.id, legacy_id, recipient.is_group
334 )
335 if not multi:
336 return
337 multi = [m for m in multi if react_to != m]
339 if isinstance(recipient, LegacyMUC):
340 for xmpp_id in multi:
341 mc = copy(msg)
342 mc["reactions"]["id"] = xmpp_id
343 await recipient.echo(mc)
344 elif isinstance(recipient, LegacyContact):
345 for xmpp_id in multi:
346 recipient.react(legacy_id, emojis, xmpp_id=xmpp_id, carbon=True)
348 def __ack(self, msg: Message) -> None:
349 if not self.xmpp.PROPER_RECEIPTS:
350 self.xmpp.delivery_receipt.ack(msg)
352 async def __get_reply(
353 self, msg: Message, session: BaseSession, recipient: Recipient
354 ) -> tuple[
355 str, str | int | None, LegacyContact | LegacyParticipant | None, str | None
356 ]:
357 try:
358 reply_to_msg_id = self._xmpp_msg_id_to_legacy(
359 session, msg["reply"]["id"], recipient
360 )
361 except XMPPError:
362 session.log.debug(
363 "Could not determine reply-to legacy msg ID, sending quote instead."
364 )
365 return redact_url(msg["body"]), None, None, None
367 reply_to_jid = JID(msg["reply"]["to"])
368 reply_to = None
369 if msg["type"] == "chat":
370 if reply_to_jid.bare != session.user_jid.bare:
371 try:
372 reply_to = await session.contacts.by_jid(reply_to_jid)
373 except XMPPError:
374 pass
375 elif msg["type"] == "groupchat":
376 nick = reply_to_jid.resource
377 try:
378 muc = await session.bookmarks.by_jid(reply_to_jid)
379 except XMPPError:
380 pass
381 else:
382 if nick != muc.user_nick:
383 reply_to = await muc.get_participant(
384 reply_to_jid.resource, store=False
385 )
387 if msg.get_plugin("fallback", check=True) and (
388 isinstance(recipient, LegacyMUC) or recipient.REPLIES
389 ):
390 text = msg["fallback"].get_stripped_body(self.xmpp["xep_0461"].namespace)
391 try:
392 reply_fallback = redact_url(msg["reply"].get_fallback_body())
393 except AttributeError:
394 reply_fallback = None
395 else:
396 text = msg["body"]
397 reply_fallback = None
399 return text, reply_to_msg_id, reply_to, reply_fallback
401 async def __send_url(
402 self, url: str, session: BaseSession, recipient: Recipient, **kwargs
403 ) -> int | str | None:
404 async with self.xmpp.http.get(url) as response:
405 if response.status >= 400:
406 session.log.warning(
407 "OOB url cannot be downloaded: %s, sending the URL as text"
408 " instead.",
409 response,
410 )
411 return await session.on_text(recipient, url, **kwargs)
413 return await session.on_file(
414 recipient, url, http_response=response, **kwargs
415 )
417 async def __send_bob(
418 self, from_: JID, cid: str, session: BaseSession, recipient: Recipient, **kwargs
419 ) -> int | str | None:
420 with self.xmpp.store.session() as orm:
421 sticker = self.xmpp.store.bob.get_sticker(orm, cid)
422 if sticker is None:
423 await self.xmpp.plugin["xep_0231"].get_bob(
424 from_, cid, ifrom=self.xmpp.boundjid
425 )
426 with self.xmpp.store.session() as orm:
427 sticker = self.xmpp.store.bob.get_sticker(orm, cid)
428 assert sticker is not None
429 return await session.on_sticker(recipient, sticker, **kwargs)
432def redact_url(text: str) -> str:
433 needle = config.NO_UPLOAD_URL_PREFIX or config.UPLOAD_URL_PREFIX
434 if not needle:
435 return text
436 return text.replace(needle, "")
439log = logging.getLogger(__name__)