Coverage for slidge/core/dispatcher/muc/ping.py: 71%
35 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
1from typing import TYPE_CHECKING
3from slixmpp import CoroutineCallback, Iq, StanzaPath
4from slixmpp.exceptions import XMPPError
6from ....group import LegacyMUC
7from ..util import DispatcherMixin, exceptions_to_xmpp_errors
9if TYPE_CHECKING:
10 from slidge.core.gateway import BaseGateway
class PingMixin(DispatcherMixin):
    """Dispatcher mixin that answers ``urn:xmpp:ping`` IQ requests.

    On construction it replaces the handler named "Ping" on the gateway
    with its own coroutine handler, which distinguishes between pings
    addressed to the gateway itself, to a MUC, and to a contact.
    """

    def __init__(self, xmpp: "BaseGateway"):
        """Install the ping handler and advertise the ping feature.

        Args:
            xmpp: The gateway on which the handler is registered.
        """
        super().__init__(xmpp)

        # Drop the handler previously registered under the same name so
        # that only the handler below processes ping requests.
        xmpp.remove_handler("Ping")
        xmpp.register_handler(
            CoroutineCallback(
                "Ping",
                StanzaPath("iq@type=get/ping"),
                self.__handle_ping,
            )
        )
        xmpp.plugin["xep_0030"].add_feature("urn:xmpp:ping")

    # NOTE(review): the decorator is defined elsewhere; presumably it turns
    # exceptions raised here into XMPP error replies -- confirm in ..util.
    @exceptions_to_xmpp_errors
    async def __handle_ping(self, iq: Iq) -> None:
        """Reply to a ping according to what its recipient JID designates.

        The recipient is checked, in order, against the gateway's own bare
        JID, the MUCs of the sender's session, and the contacts of the
        sender's session.

        Args:
            iq: The incoming ping request.

        Raises:
            XMPPError: "item-not-found" when the recipient JID matches
                neither the gateway, a MUC, nor a contact; also whatever
                ``__handle_muc_ping`` raises for a MUC recipient.
        """
        ito = iq.get_to()

        if ito == self.xmpp.boundjid.bare:
            iq.reply().send()
            # An IQ request must be answered exactly once. Without this
            # return, execution would continue into the lookups below and
            # end up sending a second reply or raising "item-not-found"
            # for a request that has already been answered.
            return

        session = await self._get_session(iq)

        try:
            muc = await session.bookmarks.by_jid(ito)
        except XMPPError:
            # Not a MUC of this session: fall through to the contact lookup.
            pass
        else:
            # Called outside the ``try`` body so that an XMPPError raised
            # by the MUC ping logic is not swallowed by the ``except`` above.
            self.__handle_muc_ping(muc, iq)
            return

        try:
            await session.contacts.by_jid(ito)
        except XMPPError:
            # Not a contact either: report the failure below.
            pass
        else:
            iq.reply().send()
            return

        raise XMPPError(
            "item-not-found", f"This JID does not match anything slidge knows: {ito}"
        )

    @staticmethod
    def __handle_muc_ping(muc: LegacyMUC, iq: Iq) -> None:
        """Answer a ping addressed to a MUC.

        Args:
            muc: The MUC designated by the recipient JID of the ping.
            iq: The incoming ping request.

        Raises:
            XMPPError: "not-acceptable" (type "cancel", attributed to the
                MUC's JID) when the sender's resource is not among the
                resources returned by ``muc.get_user_resources()``.
        """
        if iq.get_from().resource in muc.get_user_resources():
            iq.reply().send()
        else:
            raise XMPPError("not-acceptable", etype="cancel", by=muc.jid)