Coverage for slidge / core / dispatcher / message / marker.py: 91%

44 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-13 04:38 +0000

1from typing import TYPE_CHECKING 

2 

3from slixmpp import JID, Message 

4from slixmpp.xmlstream import StanzaBase 

5 

6from ....group.room import LegacyMUC 

7from ....util.types import AnyRecipient 

8from ..util import DispatcherMixin, exceptions_to_xmpp_errors, get_recipient 

9 

10if TYPE_CHECKING: 

11 from slidge.util.types import AnyGateway, AnyRecipient 

12 

13 

class MarkerMixin(DispatcherMixin):
    """Dispatcher mixin that handles "displayed" notifications from XMPP.

    It subscribes to the ``marker_displayed`` and
    ``message_displayed_synchronization_publish`` events of the gateway and
    forwards the affected message IDs to the recipient's ``on_displayed``.
    """

    __slots__: list[str] = []

    def __init__(self, xmpp: "AnyGateway") -> None:
        """Initialise the parent mixin, then register both event handlers.

        :param xmpp: The gateway on which the event handlers are registered.
        """
        super().__init__(xmpp)
        handlers = (
            ("marker_displayed", self.on_marker_displayed),
            (
                "message_displayed_synchronization_publish",
                self.on_message_displayed_synchronization_publish,
            ),
        )
        for event_name, handler in handlers:
            xmpp.add_event_handler(event_name, handler)

    @exceptions_to_xmpp_errors
    async def on_marker_displayed(self, msg: StanzaBase) -> None:
        """Handle a ``marker_displayed`` event.

        Resolves the session, the recipient and the legacy thread for *msg*,
        then calls ``on_displayed`` on the recipient once per ID returned by
        ``__to_mark``. When the recipient is a ``LegacyMUC``, the marker is
        finally passed to its ``echo`` method.

        :param msg: The stanza carrying the ``displayed`` element; it must be
            a ``Message``.
        """
        assert isinstance(msg, Message)
        session = await self._get_session(msg)

        target = await get_recipient(session, msg)
        thread = await self._xmpp_to_legacy_thread(session, msg, target)
        displayed_id = msg["displayed"]["id"]
        for stanza_id in self.__to_mark(target, displayed_id):
            legacy_id = self._xmpp_msg_id_to_legacy(stanza_id, target)
            await target.on_displayed(legacy_id, thread)
        if not isinstance(target, LegacyMUC):
            return
        await target.echo(msg, None)

    def __to_mark(self, recipient: AnyRecipient, msg_id: str) -> list[str]:
        """Return the XMPP message IDs that should be reported as displayed.

        :param recipient: The recipient whose unread IDs may be popped.
        :param msg_id: The ID referenced by the incoming "displayed" event.
        :return: Only ``[msg_id]`` when the gateway's ``MARK_ALL_MESSAGES`` is
            falsy; otherwise whatever
            ``recipient.pop_unread_xmpp_ids_up_to(msg_id)`` returns.
        """
        if not self.xmpp.MARK_ALL_MESSAGES:
            return [msg_id]
        return recipient.pop_unread_xmpp_ids_up_to(msg_id)

    @exceptions_to_xmpp_errors
    async def on_message_displayed_synchronization_publish(
        self, msg: StanzaBase
    ) -> None:
        """Handle a ``message_displayed_synchronization_publish`` event.

        The pubsub item ID is parsed as the JID of the chat. The event is
        dropped when that JID's server part differs from the gateway's bare
        JID, when the JID equals the gateway's bare JID, or when the resolved
        recipient is not a ``LegacyMUC``. Otherwise ``on_displayed`` is called
        on the recipient, without a thread, once per ID returned by
        ``__to_mark``.

        :param msg: The stanza carrying the pubsub event; it must be a
            ``Message``.
        """
        assert isinstance(msg, Message)
        item_id = msg["pubsub_event"]["items"]["item"]["id"]
        chat_jid = JID(item_id)
        if chat_jid.server != self.xmpp.boundjid.bare:
            return

        # The session is looked up before the second JID check, so a failing
        # lookup is still raised for events addressed to the gateway itself.
        session = await self._get_session(msg, timeout=None)

        if chat_jid == self.xmpp.boundjid.bare:
            return

        room = await session.get_contact_or_group_or_participant(chat_jid)
        if isinstance(room, LegacyMUC):
            displayed = msg["pubsub_event"]["items"]["item"]["displayed"]
            last_read_id = displayed["stanza_id"]["id"]
            for stanza_id in self.__to_mark(room, last_read_id):
                legacy_id = self._xmpp_msg_id_to_legacy(stanza_id, room)
                await room.on_displayed(legacy_id, None)
        else:
            session.log.debug("Ignoring non-groupchat MDS event")