Coverage for slidge / core / dispatcher / message / chat_state.py: 72%
40 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
1from typing import TYPE_CHECKING
3from slixmpp import Message
4from slixmpp.xmlstream import StanzaBase
6from ..util import DispatcherMixin, exceptions_to_xmpp_errors
8if TYPE_CHECKING:
9 from slidge.core.gateway import BaseGateway
12class ChatStateMixin(DispatcherMixin):
13 __slots__: list[str] = []
15 def __init__(self, xmpp: "BaseGateway") -> None:
16 super().__init__(xmpp)
17 xmpp.add_event_handler("chatstate_active", self.on_chatstate_active)
18 xmpp.add_event_handler("chatstate_inactive", self.on_chatstate_inactive)
19 xmpp.add_event_handler("chatstate_composing", self.on_chatstate_composing)
20 xmpp.add_event_handler("chatstate_paused", self.on_chatstate_paused)
21 xmpp.add_event_handler("chatstate_gone", self.on_chatstate_gone)
23 @exceptions_to_xmpp_errors
24 async def on_chatstate_active(self, msg: StanzaBase) -> None:
25 assert isinstance(msg, Message)
26 if msg["body"]:
27 # if there is a body, it's handled in on_legacy_message()
28 return
29 session, recipient, thread = await self._get_session_recipient_thread(msg)
30 await session.on_active(recipient, thread)
32 @exceptions_to_xmpp_errors
33 async def on_chatstate_inactive(self, msg: StanzaBase) -> None:
34 assert isinstance(msg, Message)
35 session, recipient, thread = await self._get_session_recipient_thread(msg)
36 await session.on_inactive(recipient, thread)
38 @exceptions_to_xmpp_errors
39 async def on_chatstate_composing(self, msg: StanzaBase) -> None:
40 assert isinstance(msg, Message)
41 session, recipient, thread = await self._get_session_recipient_thread(msg)
42 await session.on_composing(recipient, thread)
44 @exceptions_to_xmpp_errors
45 async def on_chatstate_paused(self, msg: StanzaBase) -> None:
46 assert isinstance(msg, Message)
47 session, recipient, thread = await self._get_session_recipient_thread(msg)
48 await session.on_paused(recipient, thread)
50 @exceptions_to_xmpp_errors
51 async def on_chatstate_gone(self, msg: StanzaBase) -> None:
52 assert isinstance(msg, Message)
53 session, recipient, thread = await self._get_session_recipient_thread(msg)
54 await session.on_gone(recipient, thread)