Coverage for slidge/core/dispatcher/message/chat_state.py: 75%

32 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-07 05:11 +0000

1from slixmpp import Message 

2from slixmpp.xmlstream import StanzaBase 

3 

4from ..util import DispatcherMixin, exceptions_to_xmpp_errors 

5 

6 

7class ChatStateMixin(DispatcherMixin): 

8 def __init__(self, xmpp) -> None: 

9 super().__init__(xmpp) 

10 xmpp.add_event_handler("chatstate_active", self.on_chatstate_active) 

11 xmpp.add_event_handler("chatstate_inactive", self.on_chatstate_inactive) 

12 xmpp.add_event_handler("chatstate_composing", self.on_chatstate_composing) 

13 xmpp.add_event_handler("chatstate_paused", self.on_chatstate_paused) 

14 

15 @exceptions_to_xmpp_errors 

16 async def on_chatstate_active(self, msg: StanzaBase) -> None: 

17 assert isinstance(msg, Message) 

18 if msg["body"]: 

19 # if there is a body, it's handled in on_legacy_message() 

20 return 

21 session, entity, thread = await self._get_session_entity_thread(msg) 

22 await session.on_active(entity, thread) 

23 

24 @exceptions_to_xmpp_errors 

25 async def on_chatstate_inactive(self, msg: StanzaBase) -> None: 

26 assert isinstance(msg, Message) 

27 session, entity, thread = await self._get_session_entity_thread(msg) 

28 await session.on_inactive(entity, thread) 

29 

30 @exceptions_to_xmpp_errors 

31 async def on_chatstate_composing(self, msg: StanzaBase) -> None: 

32 assert isinstance(msg, Message) 

33 session, entity, thread = await self._get_session_entity_thread(msg) 

34 await session.on_composing(entity, thread) 

35 

36 @exceptions_to_xmpp_errors 

37 async def on_chatstate_paused(self, msg: StanzaBase) -> None: 

38 assert isinstance(msg, Message) 

39 session, entity, thread = await self._get_session_entity_thread(msg) 

40 await session.on_paused(entity, thread)