Coverage for slidge / slixfix / __init__.py: 88%
60 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-03-13 22:59 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-03-13 22:59 +0000
1# This module contains patches for slixmpp; some have pending requests upstream
2# and should be removed on the next slixmpp release.
3import uuid
5# ruff: noqa: F401
6import slixmpp.plugins
7import slixmpp.stanza.roster
8from slixmpp import Iq, Message
9from slixmpp.exceptions import IqError
10from slixmpp.plugins.xep_0050 import XEP_0050, Command
11from slixmpp.plugins.xep_0356.permissions import IqPermission
12from slixmpp.plugins.xep_0356.privilege import XEP_0356, PrivilegedIqError
13from slixmpp.plugins.xep_0469.stanza import NS as PINNED_NS
14from slixmpp.plugins.xep_0469.stanza import Pinned
15from slixmpp.xmlstream import StanzaBase
17from ..util.archive_msg import set_client_namespace
18from . import (
19 xep_0077,
20 xep_0100,
21 xep_0153,
22 xep_0292,
23)
def set_pinned(self, val: bool) -> None:
    """
    Setter for the ``pinned`` flag, applied to the object returned by
    ``self.parent()``.

    A truthy ``val`` calls ``enable("pinned")`` on that parent; a falsy one
    removes its ``{PINNED_NS}pinned`` sub-element.
    """
    parent_stanza = self.parent()
    if not val:
        parent_stanza._del_sub(f"{{{PINNED_NS}}}pinned")
        return
    parent_stanza.enable("pinned")


# Install the setter above on slixmpp's Pinned stanza class.
Pinned.set_pinned = set_pinned
def session_bind(self, jid) -> None:
    """
    Replacement for ``XEP_0050.session_bind``.

    Adds the ad-hoc commands namespace as a feature through the
    ``xep_0030`` plugin and does nothing else; ``jid`` is unused.
    """
    disco = self.xmpp["xep_0030"]
    disco.add_feature(Command.namespace)
    # HACK: awful hack for the disco items, the call below has to stay
    # commented out.
    # related issue: https://todo.sr.ht/~nicoco/slidge/131
    # self.xmpp['xep_0030'].set_items(node=Command.namespace, items=tuple())


XEP_0050.session_bind = session_bind  # type:ignore
def reply(self, body=None, clear: bool = True):
    """
    Replacement for slixmpp's ``Message.reply()``.

    The stock implementation strips the sender's resource for
    mtype=groupchat. We do not want that: when we raise an XMPPError, the
    resource must be preserved
    (this is called in RootStanza.exception() to handle XMPPErrors).

    :param body: Optional body to set on the reply; left unset when ``None``.
    :param clear: Forwarded to ``StanzaBase.reply()``.
    :return: The reply stanza built by ``StanzaBase.reply()``.
    """
    answer = StanzaBase.reply(self, clear)

    # Carry the threading information over from the message being answered.
    for key in ("thread", "parent_thread"):
        answer[key] = self[key]

    # Drop whatever ID the reply has; a fresh one is assigned only when the
    # stream is configured to use message IDs.
    del answer["id"]
    stream = self.stream
    if stream is not None and stream.use_message_ids:
        answer["id"] = stream.new_id()

    if body is None:
        return answer
    answer["body"] = body
    return answer
async def send_privileged_iq(self, encapsulated_iq: Iq, iq_id: str | None = None) -> Iq:
    """
    Send an IQ on behalf of a user

    If the namespace of ``encapsulated_iq`` is not ``jabber:client``, it is
    passed through ``set_client_namespace`` before being wrapped.

    :param encapsulated_iq: The IQ to send on behalf of its ``from`` JID.
        Its ``id`` is overwritten with ``iq_id``.
    :param iq_id: ID used for both the encapsulated IQ and the wrapping IQ.
        A random UUID4 string is generated when it is omitted (or falsy).
    :return: The IQ found under ``privilege/forwarded`` in the response.
    :raises PermissionError: If the domain of the encapsulated IQ's ``from``
        JID has granted no privileges, or not the IQ privilege required for
        one of the namespaces of the encapsulated IQ's plugins.
    :raises PrivilegedIqError: If sending the wrapping IQ raises
        :class:`IqError`; the original error is kept as ``__cause__``.
    """
    iq_id = iq_id or str(uuid.uuid4())
    encapsulated_iq["id"] = iq_id
    if encapsulated_iq.namespace != "jabber:client":
        set_client_namespace(encapsulated_iq)

    # Privileges are looked up by the domain of the user we act on behalf of.
    server = encapsulated_iq.get_from().domain
    perms = self.granted_privileges.get(server)
    if not perms:
        raise PermissionError(f"{server} has not granted us any privilege")

    # Every payload namespace must be allowed for this IQ type.
    itype = encapsulated_iq["type"]
    for ns in encapsulated_iq.plugins.values():
        type_ = perms.iq[ns.namespace]
        if type_ == IqPermission.NONE:
            raise PermissionError(
                f"{server} has not granted any IQ privilege for namespace {ns.namespace}"
            )
        elif type_ == IqPermission.BOTH:
            pass
        elif type_ != itype:
            raise PermissionError(
                f"{server} has not granted IQ {itype} privilege for namespace {ns.namespace}"
            )

    # The wrapping IQ goes from our bound bare JID to the user's own JID.
    iq = self.xmpp.make_iq(
        itype=itype,
        ifrom=self.xmpp.boundjid.bare,
        ito=encapsulated_iq.get_from(),
        id=iq_id,
    )
    iq["privileged_iq"].append(encapsulated_iq)

    try:
        resp = await iq.send()
    except IqError as exc:
        # Chain explicitly so the original IqError stays attached as the cause.
        raise PrivilegedIqError(exc.iq) from exc

    return resp["privilege"]["forwarded"]["iq"]


XEP_0356.send_privileged_iq = send_privileged_iq  # type:ignore
# Replace slixmpp's Message.reply with the `reply` function defined in this module.
Message.reply = reply  # type: ignore

# Add the extra plugin name to slixmpp.plugins.PLUGINS.
slixmpp.plugins.PLUGINS.extend(("xep_0292_provider",))