Coverage for slidge/slixfix/__init__.py: 87%
60 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
1# This module contains patches for slixmpp; some have pending requests upstream
2# and should be removed on the next slixmpp release.
3import uuid
5# ruff: noqa: F401
6import slixmpp.plugins
7import slixmpp.stanza.roster
8from slixmpp import Iq, Message
9from slixmpp.exceptions import IqError
10from slixmpp.plugins.xep_0050 import XEP_0050, Command
11from slixmpp.plugins.xep_0356.permissions import IqPermission
12from slixmpp.plugins.xep_0356.privilege import XEP_0356, PrivilegedIqError
13from slixmpp.plugins.xep_0469.stanza import NS as PINNED_NS
14from slixmpp.plugins.xep_0469.stanza import Pinned
15from slixmpp.xmlstream import StanzaBase
17from ..util.archive_msg import set_client_namespace
18from . import (
19 link_preview,
20 xep_0077,
21 xep_0100,
22 xep_0153,
23 xep_0292,
24)
27def set_pinned(self, val: bool) -> None:
28 extensions = self.parent()
29 if val:
30 extensions.enable("pinned")
31 else:
32 extensions._del_sub(f"{{{PINNED_NS}}}pinned")
35Pinned.set_pinned = set_pinned
38def session_bind(self, jid) -> None:
39 self.xmpp["xep_0030"].add_feature(Command.namespace)
40 # awful hack to for the disco items: we need to comment this line
41 # related issue: https://todo.sr.ht/~nicoco/slidge/131
42 # self.xmpp['xep_0030'].set_items(node=Command.namespace, items=tuple())
45XEP_0050.session_bind = session_bind # type:ignore
48def reply(self, body=None, clear: bool = True):
49 """
50 Overrides slixmpp's Message.reply(), since it strips to sender's resource
51 for mtype=groupchat, and we do not want that, because when we raise an XMPPError,
52 we actually want to preserve the resource.
53 (this is called in RootStanza.exception() to handle XMPPErrors)
54 """
55 new_message = StanzaBase.reply(self, clear)
56 new_message["thread"] = self["thread"]
57 new_message["parent_thread"] = self["parent_thread"]
59 del new_message["id"]
60 if self.stream is not None and self.stream.use_message_ids:
61 new_message["id"] = self.stream.new_id()
63 if body is not None:
64 new_message["body"] = body
65 return new_message
68async def send_privileged_iq(self, encapsulated_iq: Iq, iq_id: str | None = None) -> Iq:
69 """
70 Send an IQ on behalf of a user
72 Caution: the IQ *must* have the jabber:client namespace
74 Raises :class:`PrivilegedIqError` on failure.
75 """
76 iq_id = iq_id or str(uuid.uuid4())
77 encapsulated_iq["id"] = iq_id
78 if encapsulated_iq.namespace != "jabber:client":
79 set_client_namespace(encapsulated_iq)
80 server = encapsulated_iq.get_from().domain
81 perms = self.granted_privileges.get(server)
82 if not perms:
83 raise PermissionError(f"{server} has not granted us any privilege")
84 itype = encapsulated_iq["type"]
85 for ns in encapsulated_iq.plugins.values():
86 type_ = perms.iq[ns.namespace]
87 if type_ == IqPermission.NONE:
88 raise PermissionError(
89 f"{server} has not granted any IQ privilege for namespace {ns.namespace}"
90 )
91 elif type_ == IqPermission.BOTH:
92 pass
93 elif type_ != itype:
94 raise PermissionError(
95 f"{server} has not granted IQ {itype} privilege for namespace {ns.namespace}"
96 )
97 iq = self.xmpp.make_iq(
98 itype=itype,
99 ifrom=self.xmpp.boundjid.bare,
100 ito=encapsulated_iq.get_from(),
101 id=iq_id,
102 )
103 iq["privileged_iq"].append(encapsulated_iq)
105 try:
106 resp = await iq.send()
107 except IqError as exc:
108 raise PrivilegedIqError(exc.iq)
110 return resp["privilege"]["forwarded"]["iq"]
113XEP_0356.send_privileged_iq = send_privileged_iq # type:ignore
114Message.reply = reply # type: ignore
116slixmpp.plugins.PLUGINS.extend(
117 [
118 "link_preview",
119 "xep_0292_provider",
120 ]
121)