Coverage for slidge/core/dispatcher/muc/misc.py: 93%
74 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
1import logging
3from slixmpp import (
4 JID,
5 CoroutineCallback,
6 Iq,
7 MatchXMLMask,
8 Message,
9 Presence,
10 StanzaPath,
11)
12from slixmpp.exceptions import XMPPError
14from ..util import DispatcherMixin, exceptions_to_xmpp_errors
class MucMiscMixin(DispatcherMixin):
    """Dispatcher mixin for miscellaneous group chat (MUC) stanzas.

    On construction, it registers handlers on the ``xmpp`` object for:

    - ``/iq/register`` stanzas (leaving a group via in-band registration);
    - the ``groupchat_join``, ``groupchat_direct_invite``,
      ``groupchat_subject`` and ``groupchat_message_error`` events;
    - groupchat messages carrying both a ``<subject/>`` and a ``<thread/>``
      element (thread subject changes).
    """

    __slots__: list[str] = []

    def __init__(self, xmpp) -> None:
        """Register all the handlers of this mixin on ``xmpp``."""
        super().__init__(xmpp)
        xmpp.register_handler(
            CoroutineCallback(
                "ibr_remove", StanzaPath("/iq/register"), self.on_ibr_remove
            )
        )

        xmpp.add_event_handler("groupchat_join", self.on_groupchat_join)
        xmpp.add_event_handler(
            "groupchat_direct_invite", self.on_groupchat_direct_invite
        )
        xmpp.add_event_handler("groupchat_subject", self.on_groupchat_subject)
        xmpp.add_event_handler("groupchat_message_error", self.__on_group_chat_error)
        xmpp.register_handler(
            CoroutineCallback(
                "muc_thread_subject",
                MatchXMLMask(
                    "<message xmlns='jabber:component:accept' type='groupchat'><subject/><thread/></message>"
                ),
                self.on_thread_subject,
            )
        )

    async def __on_group_chat_error(self, msg: Message) -> None:
        """Forget a user resource after a groupchat message bounced.

        Nothing happens unless the error condition of ``msg`` is listed in
        ``KICKABLE_ERRORS``. Failing to resolve the MUC, or a resource that
        is not known to the MUC, is only logged at debug level.

        :param msg: The groupchat message error stanza.
        """
        condition = msg["error"].get_condition()
        if condition not in KICKABLE_ERRORS:
            return

        try:
            muc = await self.get_muc_from_stanza(msg)
        except XMPPError as e:
            log.debug("Not removing resource", exc_info=e)
            return
        mfrom = msg.get_from()
        resource = mfrom.resource
        try:
            muc.remove_user_resource(resource)
        except KeyError:
            # this actually happens quite frequently for both beagle and monal
            # (not sure why?), but is of no consequence
            log.debug("%s was not in the resources of %s", resource, muc)
        else:
            log.debug(
                "Removed %s from the resources of %s because of error", resource, muc
            )

    @exceptions_to_xmpp_errors
    async def on_ibr_remove(self, iq: Iq):
        """Handle a ``/iq/register`` stanza that targets a group.

        A ``set`` IQ containing ``<remove/>`` makes the user leave the group:
        the session's ``on_leave_group`` is awaited, the IQ is answered, then
        the group is removed from the session's bookmarks.

        :param iq: The incoming registration IQ.
        :raises XMPPError: ``feature-not-implemented`` for any other
            registration request addressed to a group.
        """
        # IQs addressed to the component itself are not handled by this
        # method: it returns without replying.
        # NOTE(review): presumably another handler takes care of them - confirm.
        if iq.get_to() == self.xmpp.boundjid.bare:
            return

        if iq["type"] == "set" and iq["register"]["remove"]:
            muc = await self.get_muc_from_stanza(iq)
            await muc.session.on_leave_group(muc.legacy_id)
            # The result is sent before the bookmark removal is awaited.
            iq.reply().send()
            await muc.session.bookmarks.remove(
                muc, "You left this chat from an XMPP client."
            )
            return

        raise XMPPError("feature-not-implemented")

    @exceptions_to_xmpp_errors
    async def on_groupchat_join(self, p: Presence):
        """Handle a presence asking to join a group chat.

        :param p: The join presence.
        :raises XMPPError: ``feature-not-implemented`` when ``xmpp.GROUPS``
            is falsy.
        """
        if not self.xmpp.GROUPS:
            raise XMPPError(
                "feature-not-implemented",
                "This gateway does not implement multi-user chats.",
            )
        muc = await self.get_muc_from_stanza(p)
        await muc.join(p)

    @exceptions_to_xmpp_errors
    async def on_groupchat_direct_invite(self, msg: Message):
        """Handle a direct invitation of a contact to a group.

        The recipient of ``msg`` is resolved as a contact of the session and
        the invitation JID as one of its bookmarked groups; both are handed
        to the session's ``on_invitation`` with the optional reason.

        :param msg: The message carrying the ``groupchat_invite`` payload.
        :raises XMPPError: ``bad-request`` if the room is not hosted on this
            component, or if the invitation carries a password.
        """
        invite = msg["groupchat_invite"]
        jid = JID(invite["jid"])

        # Only rooms served by this component can be the invitation target.
        if jid.domain != self.xmpp.boundjid.bare:
            raise XMPPError(
                "bad-request",
                "Legacy contacts can only be invited to legacy groups, not standard XMPP MUCs.",
            )

        if invite["password"]:
            raise XMPPError(
                "bad-request", "Password-protected groups are not supported"
            )

        session = await self._get_session(msg, logged=True)
        contact = await session.contacts.by_jid(msg.get_to())
        muc = await session.bookmarks.by_jid(jid)

        # An empty reason is passed on as None.
        await session.on_invitation(contact, muc, invite["reason"] or None)

    @exceptions_to_xmpp_errors
    async def on_groupchat_subject(self, msg: Message):
        """Handle a room subject change request.

        :param msg: The groupchat message carrying the new subject.
        :raises XMPPError: ``bad-request`` when the group's ``HAS_SUBJECT``
            is falsy.
        """
        muc = await self.get_muc_from_stanza(msg)
        if not muc.HAS_SUBJECT:
            raise XMPPError(
                "bad-request",
                "There are no room subject in here. "
                "Use the room configuration to update its name or description",
            )
        await muc.on_set_subject(msg["subject"])

    @exceptions_to_xmpp_errors
    async def on_thread_subject(self, msg: Message):
        """Handle a thread subject change request.

        Messages that also carry a body are ignored by this handler.

        :param msg: A groupchat message with ``<subject/>`` and ``<thread/>``.
        :raises XMPPError: ``bad-request`` when no thread could be resolved
            from the stanza.
        """
        if msg["body"]:
            return
        session, muc, thread = await self._get_session_recipient_thread(msg)
        if thread is None:
            # The stanza mask only guarantees that a <thread/> element is
            # present, not that it identifies a thread. Raise explicitly
            # rather than assert: asserts are stripped under ``python -O``.
            # NOTE(review): presumably an empty <thread/> yields None - confirm.
            raise XMPPError(
                "bad-request",
                "Cannot set a thread subject without a thread identifier.",
            )
        await muc.on_set_thread_subject(thread, msg["subject"])  # type:ignore[union-attr]
# Module-level logger used by the handlers above.
log = logging.getLogger(__name__)

# Error conditions of a bounced groupchat message for which the sending
# resource is removed from the MUC's user resources; any other condition
# is ignored by the groupchat error handler.
KICKABLE_ERRORS: set[str] = {
    "gone",
    "internal-server-error",
    "item-not-found",
    "jid-malformed",
    "malformed error",
    "recipient-unavailable",
    "redirect",
    "remote-server-not-found",
    "remote-server-timeout",
    "service-unavailable",
}