Coverage for slidge / core / dispatcher / message / marker.py: 91%

45 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-06 15:18 +0000

1from typing import Any 

2 

3from slixmpp import JID, Message 

4from slixmpp.xmlstream import StanzaBase 

5 

6from ....contact import LegacyContact 

7from ....group.room import LegacyMUC 

8from ....util.types import Recipient 

9from ..util import DispatcherMixin, exceptions_to_xmpp_errors, get_recipient 

10 

11 

12class MarkerMixin(DispatcherMixin): 

13 __slots__: list[str] = [] 

14 

15 def __init__(self, xmpp) -> None: 

16 super().__init__(xmpp) 

17 xmpp.add_event_handler("marker_displayed", self.on_marker_displayed) 

18 xmpp.add_event_handler( 

19 "message_displayed_synchronization_publish", 

20 self.on_message_displayed_synchronization_publish, 

21 ) 

22 

23 @exceptions_to_xmpp_errors 

24 async def on_marker_displayed(self, msg: StanzaBase) -> None: 

25 assert isinstance(msg, Message) 

26 session = await self._get_session(msg) 

27 

28 e: Recipient = await get_recipient(session, msg) 

29 legacy_thread = await self._xmpp_to_legacy_thread(session, msg, e) 

30 to_mark = self.__to_mark(e, msg["displayed"]["id"]) 

31 for xmpp_id in to_mark: 

32 await session.on_displayed( 

33 e, self._xmpp_msg_id_to_legacy(session, xmpp_id, e), legacy_thread 

34 ) 

35 if isinstance(e, LegacyMUC): 

36 await e.echo(msg, None) 

37 

38 def __to_mark( 

39 self, chat: LegacyContact[Any] | LegacyMUC[Any, Any, Any, Any], msg_id: str 

40 ) -> list[str]: 

41 if self.xmpp.MARK_ALL_MESSAGES: 

42 return chat.pop_unread_xmpp_ids_up_to(msg_id) 

43 else: 

44 return [msg_id] 

45 

46 @exceptions_to_xmpp_errors 

47 async def on_message_displayed_synchronization_publish( 

48 self, msg: StanzaBase 

49 ) -> None: 

50 assert isinstance(msg, Message) 

51 chat_jid = JID(msg["pubsub_event"]["items"]["item"]["id"]) 

52 if chat_jid.server != self.xmpp.boundjid.bare: 

53 return 

54 

55 session = await self._get_session(msg, timeout=None) 

56 

57 if chat_jid == self.xmpp.boundjid.bare: 

58 return 

59 

60 chat = await session.get_contact_or_group_or_participant(chat_jid) 

61 if not isinstance(chat, LegacyMUC): 

62 session.log.debug("Ignoring non-groupchat MDS event") 

63 return 

64 

65 stanza_id = msg["pubsub_event"]["items"]["item"]["displayed"]["stanza_id"]["id"] 

66 to_mark = self.__to_mark(chat, stanza_id) 

67 for xmpp_id in to_mark: 

68 await session.on_displayed( 

69 chat, self._xmpp_msg_id_to_legacy(session, xmpp_id, chat) 

70 )