Coverage for slidge / core / dispatcher / message / marker.py: 91%
45 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
1from typing import TYPE_CHECKING, Any
3from slixmpp import JID, Message
4from slixmpp.xmlstream import StanzaBase
6from ....contact import LegacyContact
7from ....group.room import LegacyMUC
8from ....util.types import AnyRecipient
9from ..util import DispatcherMixin, exceptions_to_xmpp_errors, get_recipient
11if TYPE_CHECKING:
12 from slidge.core.gateway import BaseGateway
class MarkerMixin(DispatcherMixin):
    """Dispatcher mixin relaying XMPP "displayed" notifications to sessions.

    Two gateway events are handled: ``marker_displayed`` and
    ``message_displayed_synchronization_publish``. Both end up calling
    ``session.on_displayed`` once per XMPP message ID selected by
    ``__to_mark``.
    """

    __slots__: list[str] = []

    def __init__(self, xmpp: "BaseGateway") -> None:
        """Initialise the parent mixin and register both event handlers.

        :param xmpp: The gateway on which the event handlers are registered.
        """
        super().__init__(xmpp)
        handlers = (
            ("marker_displayed", self.on_marker_displayed),
            (
                "message_displayed_synchronization_publish",
                self.on_message_displayed_synchronization_publish,
            ),
        )
        for event_name, callback in handlers:
            xmpp.add_event_handler(event_name, callback)

    @exceptions_to_xmpp_errors
    async def on_marker_displayed(self, msg: StanzaBase) -> None:
        """Forward a "displayed" marker to the sender's session.

        :param msg: The stanza carrying the marker; it must be a ``Message``.
        """
        assert isinstance(msg, Message)
        session = await self._get_session(msg)

        recipient: AnyRecipient = await get_recipient(session, msg)
        thread = await self._xmpp_to_legacy_thread(session, msg, recipient)
        displayed_id = msg["displayed"]["id"]
        for xmpp_id in self.__to_mark(recipient, displayed_id):
            legacy_id = self._xmpp_msg_id_to_legacy(session, xmpp_id, recipient)
            await session.on_displayed(recipient, legacy_id, thread)
            # NOTE(review): the reviewed text had its indentation flattened;
            # the echo is assumed to belong to the loop body. Confirm against
            # the original file.
            if isinstance(recipient, LegacyMUC):
                await recipient.echo(msg, None)

    def __to_mark(
        self, chat: LegacyContact[Any] | LegacyMUC[Any, Any, Any, Any], msg_id: str
    ) -> list[str]:
        """Return the XMPP message IDs to report as displayed.

        :param chat: The contact or room the displayed message belongs to.
        :param msg_id: XMPP ID of the message referenced by the notification.
        :return: ``[msg_id]`` when the gateway's ``MARK_ALL_MESSAGES`` is
            falsy, otherwise whatever ``chat.pop_unread_xmpp_ids_up_to``
            returns for ``msg_id``.
        """
        if not self.xmpp.MARK_ALL_MESSAGES:
            return [msg_id]
        return chat.pop_unread_xmpp_ids_up_to(msg_id)

    @exceptions_to_xmpp_errors
    async def on_message_displayed_synchronization_publish(
        self, msg: StanzaBase
    ) -> None:
        """Forward a message-displayed-synchronization event for a group chat.

        The pubsub item ID is parsed as the chat JID. The event is dropped
        when that JID's server part differs from the gateway's bare JID, when
        the JID equals the gateway's bare JID, or when the resolved chat is
        not a ``LegacyMUC``.

        :param msg: The stanza carrying the pubsub event; it must be a
            ``Message``.
        """
        assert isinstance(msg, Message)
        item = msg["pubsub_event"]["items"]["item"]
        chat_jid = JID(item["id"])
        gateway_jid = self.xmpp.boundjid.bare
        if chat_jid.server != gateway_jid:
            return

        session = await self._get_session(msg, timeout=None)

        if chat_jid == self.xmpp.boundjid.bare:
            return

        chat = await session.get_contact_or_group_or_participant(chat_jid)
        if not isinstance(chat, LegacyMUC):
            session.log.debug("Ignoring non-groupchat MDS event")
            return

        stanza_id = item["displayed"]["stanza_id"]["id"]
        for xmpp_id in self.__to_mark(chat, stanza_id):
            legacy_id = self._xmpp_msg_id_to_legacy(session, xmpp_id, chat)
            await session.on_displayed(chat, legacy_id)