Coverage for slidge/core/dispatcher/message/message.py: 73%
232 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
1import logging
2from copy import copy
3from xml.etree import ElementTree
5from slixmpp import JID, Message
6from slixmpp.exceptions import XMPPError
8from ....contact.contact import LegacyContact
9from ....group.participant import LegacyParticipant
10from ....group.room import LegacyMUC
11from ....util.types import LinkPreview, Recipient
12from ....util.util import dict_to_named_tuple, remove_emoji_variation_selector_16
13from ... import config
14from ...session import BaseSession
15from ..util import DispatcherMixin, exceptions_to_xmpp_errors
18class MessageContentMixin(DispatcherMixin):
19 __slots__: list[str] = []
21 def __init__(self, xmpp) -> None:
22 super().__init__(xmpp)
23 xmpp.add_event_handler("legacy_message", self.on_legacy_message)
24 xmpp.add_event_handler("message_correction", self.on_message_correction)
25 xmpp.add_event_handler("message_retract", self.on_message_retract)
26 xmpp.add_event_handler("groupchat_message", self.on_groupchat_message)
27 xmpp.add_event_handler("reactions", self.on_reactions)
29 async def on_groupchat_message(self, msg: Message) -> None:
30 await self.on_legacy_message(msg)
32 @exceptions_to_xmpp_errors
33 async def on_legacy_message(self, msg: Message) -> None:
34 """
35 Meant to be called from :class:`BaseGateway` only.
37 :param msg:
38 :return:
39 """
40 # we MUST not use `if m["replace"]["id"]` because it adds the tag if not
41 # present. this is a problem for MUC echoed messages
42 if msg.get_plugin("replace", check=True) is not None:
43 # ignore last message correction (handled by a specific method)
44 return
45 if msg.get_plugin("apply_to", check=True) is not None:
46 # ignore message retraction (handled by a specific method)
47 return
48 if msg.get_plugin("reactions", check=True) is not None:
49 # ignore message reaction fallback.
50 # the reaction itself is handled by self.react_from_msg().
51 return
52 if msg.get_plugin("retract", check=True) is not None:
53 # ignore message retraction fallback.
54 # the retraction itself is handled by self.on_retract
55 return
56 cid = None
57 if msg.get_plugin("html", check=True) is not None:
58 body = ElementTree.fromstring("<body>" + msg["html"].get_body() + "</body>")
59 p = body.findall("p")
60 if p is not None and len(p) == 1:
61 if p[0].text is None or not p[0].text.strip():
62 images = p[0].findall("img")
63 if len(images) == 1:
64 # no text, single img ⇒ this is a sticker
65 # other cases should be interpreted as "custom emojis" in text
66 src = images[0].get("src")
67 if src is not None and src.startswith("cid:"):
68 cid = src.removeprefix("cid:")
70 session, recipient, thread = await self._get_session_recipient_thread(msg)
72 if msg.get_plugin("oob", check=True) is not None:
73 url = msg["oob"]["url"]
74 elif (
75 "reference" in msg
76 and "sims" in msg["reference"]
77 and "sources" in msg["reference"]["sims"]
78 ):
79 for source in msg["reference"]["sims"]["sources"]["substanzas"]:
80 if source["uri"].startswith("http"):
81 url = source["uri"]
82 break
83 else:
84 url = None
85 else:
86 url = None
88 if msg.get_plugin("reply", check=True):
89 text, reply_to_msg_id, reply_to, reply_fallback = await self.__get_reply(
90 msg, session, recipient
91 )
92 else:
93 text = msg["body"]
94 reply_to_msg_id = None
95 reply_to = None
96 reply_fallback = None
98 if msg.get_plugin("link_previews", check=True):
99 link_previews = [
100 dict_to_named_tuple(p, LinkPreview) for p in msg["link_previews"]
101 ]
102 else:
103 link_previews = []
105 if url:
106 legacy_msg_id = await self.__send_url(
107 url,
108 session,
109 recipient,
110 reply_to_msg_id=reply_to_msg_id,
111 reply_to_fallback_text=reply_fallback,
112 reply_to=reply_to,
113 thread=thread,
114 )
115 elif cid:
116 legacy_msg_id = await self.__send_bob(
117 msg.get_from(),
118 cid,
119 session,
120 recipient,
121 reply_to_msg_id=reply_to_msg_id,
122 reply_to_fallback_text=reply_fallback,
123 reply_to=reply_to,
124 thread=thread,
125 )
126 elif text:
127 if isinstance(recipient, LegacyMUC):
128 mentions = {"mentions": await recipient.parse_mentions(text)}
129 else:
130 mentions = {}
131 legacy_msg_id = await session.on_text(
132 recipient,
133 text,
134 reply_to_msg_id=reply_to_msg_id,
135 reply_to_fallback_text=reply_fallback,
136 reply_to=reply_to,
137 thread=thread,
138 link_previews=link_previews,
139 **mentions,
140 )
141 else:
142 log.debug("Ignoring %s", msg.get_id())
143 return
145 if isinstance(recipient, LegacyMUC):
146 await recipient.echo(msg, legacy_msg_id)
147 else:
148 self.__ack(msg)
150 if legacy_msg_id is None:
151 return
153 with self.xmpp.store.session() as orm:
154 self.xmpp.store.id_map.set_msg(
155 orm,
156 recipient.stored.id,
157 str(legacy_msg_id),
158 [msg.get_id()],
159 recipient.is_group,
160 )
161 if session.MESSAGE_IDS_ARE_THREAD_IDS and (t := msg["thread"]):
162 self.xmpp.store.id_map.set_thread(
163 orm, recipient.stored.id, t, str(legacy_msg_id), recipient.is_group
164 )
165 orm.commit()
167 @exceptions_to_xmpp_errors
168 async def on_message_correction(self, msg: Message) -> None:
169 if msg.get_plugin("retract", check=True) is not None:
170 # ignore message retraction fallback (fallback=last msg correction)
171 return
172 session, recipient, thread = await self._get_session_recipient_thread(msg)
173 xmpp_id = msg["replace"]["id"]
174 if isinstance(recipient, LegacyMUC):
175 with self.xmpp.store.session() as orm:
176 legacy_id_str = self.xmpp.store.id_map.get_legacy(
177 orm, recipient.stored.id, xmpp_id, True
178 )
179 if legacy_id_str is None:
180 legacy_id = self._xmpp_msg_id_to_legacy(session, xmpp_id, recipient)
181 else:
182 legacy_id = self.xmpp.LEGACY_MSG_ID_TYPE(legacy_id_str)
183 else:
184 legacy_id = self._xmpp_msg_id_to_legacy(session, xmpp_id, recipient)
186 if isinstance(recipient, LegacyMUC):
187 mentions = await recipient.parse_mentions(msg["body"])
188 else:
189 mentions = None
191 if previews := msg["link_previews"]:
192 link_previews = [dict_to_named_tuple(p, LinkPreview) for p in previews]
193 else:
194 link_previews = []
196 if legacy_id is None:
197 log.debug("Did not find legacy ID to correct")
198 new_legacy_msg_id = await session.on_text(
199 recipient,
200 "Correction:" + msg["body"],
201 thread=thread,
202 mentions=mentions,
203 link_previews=link_previews,
204 )
205 elif not msg["body"].strip() and recipient.RETRACTION:
206 await session.on_retract(recipient, legacy_id, thread=thread)
207 new_legacy_msg_id = None
208 elif recipient.CORRECTION:
209 new_legacy_msg_id = await session.on_correct(
210 recipient,
211 msg["body"],
212 legacy_id,
213 thread=thread,
214 mentions=mentions,
215 link_previews=link_previews,
216 )
217 else:
218 session.send_gateway_message(
219 "Last message correction is not supported by this legacy service. "
220 "Slidge will send your correction as new message."
221 )
222 if recipient.RETRACTION and legacy_id is not None:
223 if legacy_id is not None:
224 session.send_gateway_message(
225 "Slidge will attempt to retract the original message you wanted"
226 " to edit."
227 )
228 await session.on_retract(recipient, legacy_id, thread=thread)
230 new_legacy_msg_id = await session.on_text(
231 recipient,
232 "Correction: " + msg["body"],
233 thread=thread,
234 mentions=mentions,
235 link_previews=link_previews,
236 )
238 if isinstance(recipient, LegacyMUC):
239 await recipient.echo(msg, new_legacy_msg_id)
240 else:
241 self.__ack(msg)
242 if new_legacy_msg_id is None:
243 return
244 with self.xmpp.store.session() as orm:
245 self.xmpp.store.id_map.set_msg(
246 orm,
247 recipient.stored.id,
248 new_legacy_msg_id,
249 msg.get_id(),
250 recipient.is_group,
251 )
252 orm.commit()
254 @exceptions_to_xmpp_errors
255 async def on_message_retract(self, msg: Message):
256 session, recipient, thread = await self._get_session_recipient_thread(msg)
257 if not recipient.RETRACTION:
258 raise XMPPError(
259 "bad-request",
260 "This legacy service does not support message retraction.",
261 )
262 xmpp_id: str = msg["retract"]["id"]
263 legacy_id = self._xmpp_msg_id_to_legacy(session, xmpp_id, recipient)
264 if legacy_id:
265 await session.on_retract(recipient, legacy_id, thread=thread)
266 if isinstance(recipient, LegacyMUC):
267 await recipient.echo(msg, None)
268 else:
269 log.debug("Ignored retraction from user")
270 self.__ack(msg)
272 @exceptions_to_xmpp_errors
273 async def on_reactions(self, msg: Message):
274 session, recipient, thread = await self._get_session_recipient_thread(msg)
275 react_to: str = msg["reactions"]["id"]
277 special_msg = session.SPECIAL_MSG_ID_PREFIX and react_to.startswith(
278 session.SPECIAL_MSG_ID_PREFIX
279 )
281 if special_msg:
282 legacy_id = react_to
283 else:
284 legacy_id = self._xmpp_msg_id_to_legacy(session, react_to, recipient)
286 if not legacy_id:
287 log.debug("Ignored reaction from user")
288 raise XMPPError(
289 "internal-server-error",
290 "Could not convert the XMPP msg ID to a legacy ID",
291 )
293 emojis = [
294 remove_emoji_variation_selector_16(r["value"]) for r in msg["reactions"]
295 ]
296 error_msg = None
297 recipient = recipient
299 if not special_msg:
300 if recipient.REACTIONS_SINGLE_EMOJI and len(emojis) > 1:
301 error_msg = "Maximum 1 emoji/message"
303 if not error_msg and (
304 subset := await recipient.available_emojis(legacy_id)
305 ):
306 if not set(emojis).issubset(subset):
307 error_msg = f"You can only react with the following emojis: {''.join(subset)}"
309 if error_msg:
310 session.send_gateway_message(error_msg)
311 if not isinstance(recipient, LegacyMUC):
312 # no need to carbon for groups, we just don't echo the stanza
313 recipient.react(legacy_id, carbon=True)
314 await session.on_react(recipient, legacy_id, [], thread=thread)
315 raise XMPPError(
316 "policy-violation",
317 text=error_msg,
318 )
320 await session.on_react(recipient, legacy_id, emojis, thread=thread)
321 if isinstance(recipient, LegacyMUC):
322 await recipient.echo(msg, None)
323 else:
324 self.__ack(msg)
326 with self.xmpp.store.session() as orm:
327 multi = self.xmpp.store.id_map.get_xmpp(
328 orm, recipient.stored.id, legacy_id, recipient.is_group
329 )
330 if not multi:
331 return
332 multi = [m for m in multi if react_to != m]
334 if isinstance(recipient, LegacyMUC):
335 for xmpp_id in multi:
336 mc = copy(msg)
337 mc["reactions"]["id"] = xmpp_id
338 await recipient.echo(mc)
339 elif isinstance(recipient, LegacyContact):
340 for xmpp_id in multi:
341 recipient.react(legacy_id, emojis, xmpp_id=xmpp_id, carbon=True)
343 def __ack(self, msg: Message) -> None:
344 if not self.xmpp.PROPER_RECEIPTS:
345 self.xmpp.delivery_receipt.ack(msg)
347 async def __get_reply(
348 self, msg: Message, session: BaseSession, recipient: Recipient
349 ) -> tuple[
350 str, str | int | None, LegacyContact | LegacyParticipant | None, str | None
351 ]:
352 try:
353 reply_to_msg_id = self._xmpp_msg_id_to_legacy(
354 session, msg["reply"]["id"], recipient
355 )
356 except XMPPError:
357 session.log.debug(
358 "Could not determine reply-to legacy msg ID, sending quote instead."
359 )
360 return msg["body"], None, None, None
362 reply_to_jid = JID(msg["reply"]["to"])
363 reply_to = None
364 if msg["type"] == "chat":
365 if reply_to_jid.bare != session.user_jid.bare:
366 try:
367 reply_to = await session.contacts.by_jid(reply_to_jid)
368 except XMPPError:
369 pass
370 elif msg["type"] == "groupchat":
371 nick = reply_to_jid.resource
372 try:
373 muc = await session.bookmarks.by_jid(reply_to_jid)
374 except XMPPError:
375 pass
376 else:
377 if nick != muc.user_nick:
378 reply_to = await muc.get_participant(
379 reply_to_jid.resource, store=False
380 )
382 if msg.get_plugin("fallback", check=True) and (
383 isinstance(recipient, LegacyMUC) or recipient.REPLIES
384 ):
385 text = msg["fallback"].get_stripped_body(self.xmpp["xep_0461"].namespace)
386 try:
387 reply_fallback = msg["reply"].get_fallback_body()
388 except AttributeError:
389 reply_fallback = None
390 else:
391 text = msg["body"]
392 reply_fallback = None
394 return text, reply_to_msg_id, reply_to, reply_fallback
396 async def __send_url(
397 self, url: str, session: BaseSession, recipient: Recipient, **kwargs
398 ) -> int | str | None:
399 async with self.xmpp.http.get(url) as response:
400 if response.status >= 400:
401 session.log.warning(
402 "OOB url cannot be downloaded: %s, sending the URL as text"
403 " instead.",
404 response,
405 )
406 return await session.on_text(recipient, url, **kwargs)
408 return await session.on_file(
409 recipient, url, http_response=response, **kwargs
410 )
412 async def __send_bob(
413 self, from_: JID, cid: str, session: BaseSession, recipient: Recipient, **kwargs
414 ) -> int | str | None:
415 with self.xmpp.store.session() as orm:
416 sticker = self.xmpp.store.bob.get_sticker(orm, cid)
417 if sticker is None:
418 await self.xmpp.plugin["xep_0231"].get_bob(from_, cid)
419 with self.xmpp.store.session() as orm:
420 sticker = self.xmpp.store.bob.get_sticker(orm, cid)
421 assert sticker is not None
422 return await session.on_sticker(recipient, sticker, **kwargs)
425log = logging.getLogger(__name__)