Coverage for slidge / core / dispatcher / message / chat_state.py: 100%

16 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-13 04:38 +0000

1from typing import TYPE_CHECKING 

2 

3from slixmpp import Message 

4from slixmpp.xmlstream import StanzaBase 

5 

6from ..util import DispatcherMixin, exceptions_to_xmpp_errors 

7 

8if TYPE_CHECKING: 

9 from slidge.util.types import AnyGateway 

10 

11 

12class ChatStateMixin(DispatcherMixin): 

13 __slots__: list[str] = [] 

14 

15 def __init__(self, xmpp: "AnyGateway") -> None: 

16 super().__init__(xmpp) 

17 xmpp.add_event_handler("chatstate", self.on_chatstate) 

18 

19 @exceptions_to_xmpp_errors 

20 async def on_chatstate(self, msg: StanzaBase) -> None: 

21 assert isinstance(msg, Message) 

22 if msg["body"]: 

23 # if there is a body, it's handled in on_legacy_message() 

24 return 

25 recipient, thread = await self._get_recipient_and_thread(msg) 

26 await recipient.on_chat_state(msg["chat_state"], thread)