Coverage for slidge/core/dispatcher/message/chat_state.py: 72%

39 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-04 08:17 +0000

1from slixmpp import Message 

2from slixmpp.xmlstream import StanzaBase 

3 

4from ..util import DispatcherMixin, exceptions_to_xmpp_errors 

5 

6 

7class ChatStateMixin(DispatcherMixin): 

8 __slots__: list[str] = [] 

9 

10 def __init__(self, xmpp) -> None: 

11 super().__init__(xmpp) 

12 xmpp.add_event_handler("chatstate_active", self.on_chatstate_active) 

13 xmpp.add_event_handler("chatstate_inactive", self.on_chatstate_inactive) 

14 xmpp.add_event_handler("chatstate_composing", self.on_chatstate_composing) 

15 xmpp.add_event_handler("chatstate_paused", self.on_chatstate_paused) 

16 xmpp.add_event_handler("chatstate_gone", self.on_chatstate_gone) 

17 

18 @exceptions_to_xmpp_errors 

19 async def on_chatstate_active(self, msg: StanzaBase) -> None: 

20 assert isinstance(msg, Message) 

21 if msg["body"]: 

22 # if there is a body, it's handled in on_legacy_message() 

23 return 

24 session, recipient, thread = await self._get_session_recipient_thread(msg) 

25 await session.on_active(recipient, thread) 

26 

27 @exceptions_to_xmpp_errors 

28 async def on_chatstate_inactive(self, msg: StanzaBase) -> None: 

29 assert isinstance(msg, Message) 

30 session, recipient, thread = await self._get_session_recipient_thread(msg) 

31 await session.on_inactive(recipient, thread) 

32 

33 @exceptions_to_xmpp_errors 

34 async def on_chatstate_composing(self, msg: StanzaBase) -> None: 

35 assert isinstance(msg, Message) 

36 session, recipient, thread = await self._get_session_recipient_thread(msg) 

37 await session.on_composing(recipient, thread) 

38 

39 @exceptions_to_xmpp_errors 

40 async def on_chatstate_paused(self, msg: StanzaBase) -> None: 

41 assert isinstance(msg, Message) 

42 session, recipient, thread = await self._get_session_recipient_thread(msg) 

43 await session.on_paused(recipient, thread) 

44 

45 @exceptions_to_xmpp_errors 

46 async def on_chatstate_gone(self, msg: StanzaBase) -> None: 

47 assert isinstance(msg, Message) 

48 session, recipient, thread = await self._get_session_recipient_thread(msg) 

49 await session.on_gone(recipient, thread)