Coverage for slidge / core / dispatcher / message / marker.py: 91%
44 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
1from typing import TYPE_CHECKING
3from slixmpp import JID, Message
4from slixmpp.xmlstream import StanzaBase
6from ....group.room import LegacyMUC
7from ....util.types import AnyRecipient
8from ..util import DispatcherMixin, exceptions_to_xmpp_errors, get_recipient
10if TYPE_CHECKING:
11 from slidge.util.types import AnyGateway, AnyRecipient
class MarkerMixin(DispatcherMixin):
    """Dispatcher mixin relaying "displayed" markers received over XMPP.

    Two events are handled: ``marker_displayed`` and
    ``message_displayed_synchronization_publish``. Both end up calling the
    recipient's ``on_displayed`` once per message id selected by
    ``__to_mark``.
    """

    __slots__: list[str] = []

    def __init__(self, xmpp: "AnyGateway") -> None:
        """Initialise the parent mixin and register both marker handlers.

        :param xmpp: Gateway object on which the event handlers are added.
        """
        super().__init__(xmpp)
        handlers = (
            ("marker_displayed", self.on_marker_displayed),
            (
                "message_displayed_synchronization_publish",
                self.on_message_displayed_synchronization_publish,
            ),
        )
        for event_name, callback in handlers:
            xmpp.add_event_handler(event_name, callback)

    @exceptions_to_xmpp_errors
    async def on_marker_displayed(self, msg: StanzaBase) -> None:
        """Handle a ``marker_displayed`` event.

        Resolves the session, recipient and legacy thread for ``msg``, then
        calls ``recipient.on_displayed`` for every id returned by
        ``__to_mark``. For a :class:`LegacyMUC` recipient, ``echo(msg, None)``
        is awaited as well.

        :param msg: The incoming stanza; must be a ``Message``.
        """
        assert isinstance(msg, Message)
        session = await self._get_session(msg)

        recipient = await get_recipient(session, msg)
        legacy_thread = await self._xmpp_to_legacy_thread(session, msg, recipient)
        displayed_id = msg["displayed"]["id"]
        for xmpp_id in self.__to_mark(recipient, displayed_id):
            legacy_id = self._xmpp_msg_id_to_legacy(xmpp_id, recipient)
            await recipient.on_displayed(legacy_id, legacy_thread)
            # NOTE(review): nesting reconstructed from a flattened listing;
            # the echo is assumed to run once per marked id -- confirm.
            if isinstance(recipient, LegacyMUC):
                await recipient.echo(msg, None)

    def __to_mark(self, recipient: AnyRecipient, msg_id: str) -> list[str]:
        """Return the XMPP message ids that should be marked as displayed.

        :param recipient: Recipient the marker applies to.
        :param msg_id: XMPP id carried by the marker.
        :return: ``[msg_id]`` alone unless the gateway's
            ``MARK_ALL_MESSAGES`` is truthy, in which case the result of
            ``recipient.pop_unread_xmpp_ids_up_to(msg_id)``.
        """
        if not self.xmpp.MARK_ALL_MESSAGES:
            return [msg_id]
        return recipient.pop_unread_xmpp_ids_up_to(msg_id)

    @exceptions_to_xmpp_errors
    async def on_message_displayed_synchronization_publish(
        self, msg: StanzaBase
    ) -> None:
        """Handle a ``message_displayed_synchronization_publish`` event.

        The pubsub item id is parsed as the chat JID. The event is dropped
        silently when that JID's server part differs from the gateway's bare
        JID, when it equals the gateway's bare JID, or (with a debug log)
        when the resolved recipient is not a :class:`LegacyMUC`.

        :param msg: The incoming stanza; must be a ``Message``.
        """
        assert isinstance(msg, Message)
        chat_jid = JID(msg["pubsub_event"]["items"]["item"]["id"])
        if chat_jid.server != self.xmpp.boundjid.bare:
            return

        # The session is fetched before the second guard, as in the
        # original statement order.
        session = await self._get_session(msg, timeout=None)

        if chat_jid == self.xmpp.boundjid.bare:
            return

        recipient = await session.get_contact_or_group_or_participant(chat_jid)
        if not isinstance(recipient, LegacyMUC):
            session.log.debug("Ignoring non-groupchat MDS event")
            return

        stanza_id = msg["pubsub_event"]["items"]["item"]["displayed"]["stanza_id"]["id"]
        for xmpp_id in self.__to_mark(recipient, stanza_id):
            legacy_id = self._xmpp_msg_id_to_legacy(xmpp_id, recipient)
            await recipient.on_displayed(legacy_id, None)