Coverage for slidge / core / dispatcher / message / marker.py: 91%

45 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 05:07 +0000

1from typing import TYPE_CHECKING, Any 

2 

3from slixmpp import JID, Message 

4from slixmpp.xmlstream import StanzaBase 

5 

6from ....contact import LegacyContact 

7from ....group.room import LegacyMUC 

8from ....util.types import AnyRecipient 

9from ..util import DispatcherMixin, exceptions_to_xmpp_errors, get_recipient 

10 

11if TYPE_CHECKING: 

12 from slidge.core.gateway import BaseGateway 

13 

14 

class MarkerMixin(DispatcherMixin):
    """Dispatcher mixin relaying "displayed" notifications to legacy sessions.

    Two gateway events are handled: ``marker_displayed`` and
    ``message_displayed_synchronization_publish`` (MDS). Both end up calling
    ``session.on_displayed`` once per message ID that must be marked.
    """

    __slots__: list[str] = []

    def __init__(self, xmpp: "BaseGateway") -> None:
        """Initialise the parent mixin and register the event handlers.

        :param xmpp: Gateway on which the handlers are registered.
        """
        super().__init__(xmpp)
        handlers = (
            ("marker_displayed", self.on_marker_displayed),
            (
                "message_displayed_synchronization_publish",
                self.on_message_displayed_synchronization_publish,
            ),
        )
        for event_name, handler in handlers:
            xmpp.add_event_handler(event_name, handler)

    @exceptions_to_xmpp_errors
    async def on_marker_displayed(self, msg: StanzaBase) -> None:
        """Handle a ``marker_displayed`` event.

        The session and the recipient are resolved from ``msg``, the thread
        and each message ID to mark are converted to their legacy
        counterparts, and ``session.on_displayed`` is awaited for every one
        of them. When the recipient is a :class:`LegacyMUC`, the stanza is
        additionally passed to its ``echo`` method.

        :param msg: Stanza carrying the marker; must be a ``Message``.
        """
        assert isinstance(msg, Message)
        session = await self._get_session(msg)

        recipient: AnyRecipient = await get_recipient(session, msg)
        legacy_thread = await self._xmpp_to_legacy_thread(session, msg, recipient)
        # The IDs are collected after the thread lookup, as __to_mark may pop
        # entries from the recipient's unread list.
        for xmpp_id in self.__to_mark(recipient, msg["displayed"]["id"]):
            legacy_id = self._xmpp_msg_id_to_legacy(session, xmpp_id, recipient)
            await session.on_displayed(recipient, legacy_id, legacy_thread)
        if isinstance(recipient, LegacyMUC):
            await recipient.echo(msg, None)

    def __to_mark(
        self, chat: LegacyContact[Any] | LegacyMUC[Any, Any, Any, Any], msg_id: str
    ) -> list[str]:
        """Return the XMPP message IDs to report as displayed.

        :param chat: Contact or group chat the marker refers to.
        :param msg_id: XMPP ID of the message referenced by the marker.
        :return: ``[msg_id]`` unless the gateway's ``MARK_ALL_MESSAGES`` is
            truthy, in which case the result of
            ``chat.pop_unread_xmpp_ids_up_to(msg_id)`` is returned.
        """
        if not self.xmpp.MARK_ALL_MESSAGES:
            return [msg_id]
        return chat.pop_unread_xmpp_ids_up_to(msg_id)

    @exceptions_to_xmpp_errors
    async def on_message_displayed_synchronization_publish(
        self, msg: StanzaBase
    ) -> None:
        """Handle a ``message_displayed_synchronization_publish`` event.

        The pubsub item ID is parsed as the JID of the chat. The event is
        ignored when that JID's server part differs from the gateway's bare
        JID, when the JID is the gateway's bare JID itself, or when the
        resolved chat is not a :class:`LegacyMUC`. Otherwise
        ``session.on_displayed`` is awaited for every message ID to mark.

        :param msg: Stanza carrying the pubsub event; must be a ``Message``.
        """
        assert isinstance(msg, Message)
        item = msg["pubsub_event"]["items"]["item"]
        chat_jid = JID(item["id"])
        if chat_jid.server != self.xmpp.boundjid.bare:
            return

        # The session is looked up before the next check, so lookup errors
        # are raised even for items that end up being ignored.
        session = await self._get_session(msg, timeout=None)

        if chat_jid == self.xmpp.boundjid.bare:
            return

        chat = await session.get_contact_or_group_or_participant(chat_jid)
        if not isinstance(chat, LegacyMUC):
            session.log.debug("Ignoring non-groupchat MDS event")
            return

        displayed_id = item["displayed"]["stanza_id"]["id"]
        for xmpp_id in self.__to_mark(chat, displayed_id):
            await session.on_displayed(
                chat, self._xmpp_msg_id_to_legacy(session, xmpp_id, chat)
            )