Coverage for slidge/core/dispatcher/message/chat_state.py: 72%
39 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
1from slixmpp import Message
2from slixmpp.xmlstream import StanzaBase
4from ..util import DispatcherMixin, exceptions_to_xmpp_errors
7class ChatStateMixin(DispatcherMixin):
8 __slots__: list[str] = []
10 def __init__(self, xmpp) -> None:
11 super().__init__(xmpp)
12 xmpp.add_event_handler("chatstate_active", self.on_chatstate_active)
13 xmpp.add_event_handler("chatstate_inactive", self.on_chatstate_inactive)
14 xmpp.add_event_handler("chatstate_composing", self.on_chatstate_composing)
15 xmpp.add_event_handler("chatstate_paused", self.on_chatstate_paused)
16 xmpp.add_event_handler("chatstate_gone", self.on_chatstate_gone)
18 @exceptions_to_xmpp_errors
19 async def on_chatstate_active(self, msg: StanzaBase) -> None:
20 assert isinstance(msg, Message)
21 if msg["body"]:
22 # if there is a body, it's handled in on_legacy_message()
23 return
24 session, recipient, thread = await self._get_session_recipient_thread(msg)
25 await session.on_active(recipient, thread)
27 @exceptions_to_xmpp_errors
28 async def on_chatstate_inactive(self, msg: StanzaBase) -> None:
29 assert isinstance(msg, Message)
30 session, recipient, thread = await self._get_session_recipient_thread(msg)
31 await session.on_inactive(recipient, thread)
33 @exceptions_to_xmpp_errors
34 async def on_chatstate_composing(self, msg: StanzaBase) -> None:
35 assert isinstance(msg, Message)
36 session, recipient, thread = await self._get_session_recipient_thread(msg)
37 await session.on_composing(recipient, thread)
39 @exceptions_to_xmpp_errors
40 async def on_chatstate_paused(self, msg: StanzaBase) -> None:
41 assert isinstance(msg, Message)
42 session, recipient, thread = await self._get_session_recipient_thread(msg)
43 await session.on_paused(recipient, thread)
45 @exceptions_to_xmpp_errors
46 async def on_chatstate_gone(self, msg: StanzaBase) -> None:
47 assert isinstance(msg, Message)
48 session, recipient, thread = await self._get_session_recipient_thread(msg)
49 await session.on_gone(recipient, thread)