Coverage for slidge/slixfix/xep_0356_old/privilege.py: 79%
58 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
1import logging
2import typing
3from collections import defaultdict
5from slixmpp import JID, Iq, Message
6from slixmpp.plugins.base import BasePlugin
7from slixmpp.plugins.xep_0356.permissions import (
8 MessagePermission,
9 Permissions,
10 RosterAccess,
11)
12from slixmpp.types import JidStr
13from slixmpp.xmlstream import StanzaBase
14from slixmpp.xmlstream.handler import Callback
15from slixmpp.xmlstream.matcher import StanzaPath
17from . import stanza
19log = logging.getLogger(__name__)
22# noinspection PyPep8Naming
23class XEP_0356_OLD(BasePlugin):
24 """
25 XEP-0356: Privileged Entity
27 Events:
29 ::
31 privileges_advertised_old -- Received message/privilege from the server
32 """
34 name = "xep_0356_old"
35 description = "XEP-0356: Privileged Entity (slidge - old namespace)"
36 dependencies = {"xep_0297"}
37 stanza = stanza
39 granted_privileges: defaultdict[JidStr, Permissions] = defaultdict(Permissions)
41 def plugin_init(self):
42 if not self.xmpp.is_component:
43 log.error("XEP 0356 is only available for components")
44 return
46 stanza.register()
48 self.xmpp.register_handler(
49 Callback(
50 "Privileges_old",
51 StanzaPath("message/privilege_old"),
52 self._handle_privilege,
53 )
54 )
56 def plugin_end(self):
57 self.xmpp.remove_handler("Privileges_old")
59 def _handle_privilege(self, msg: StanzaBase):
60 """
61 Called when the XMPP server advertise the component's privileges.
63 Stores the privileges in this instance's granted_privileges attribute (a dict)
64 and raises the privileges_advertised event
65 """
66 for perm in msg["privilege_old"]["perms"]:
67 setattr(
68 self.granted_privileges[msg.get_from()], perm["access"], perm["type"]
69 )
70 log.debug(f"Privileges (old): {self.granted_privileges}")
71 self.xmpp.event("privileges_advertised_old")
73 def send_privileged_message(self, msg: Message):
74 if (
75 self.granted_privileges[msg.get_from().domain].message
76 != MessagePermission.OUTGOING
77 ):
78 raise PermissionError(
79 "The server hasn't authorized us to send messages on behalf of other users"
80 )
81 else:
82 self._make_privileged_message(msg).send()
84 def _make_privileged_message(self, msg: Message):
85 server = msg.get_from().domain
86 wrapped = self.xmpp.make_message(mto=server, mfrom=self.xmpp.boundjid.bare)
87 wrapped["privilege_old"]["forwarded"].append(msg)
88 return wrapped
90 def _make_get_roster(self, jid: typing.Union[JID, str], **iq_kwargs):
91 return self.xmpp.make_iq_get(
92 queryxmlns="jabber:iq:roster",
93 ifrom=self.xmpp.boundjid.bare,
94 ito=jid,
95 **iq_kwargs,
96 )
98 def _make_set_roster(
99 self,
100 jid: typing.Union[JID, str],
101 roster_items: dict,
102 **iq_kwargs,
103 ):
104 iq = self.xmpp.make_iq_set(
105 ifrom=self.xmpp.boundjid.bare,
106 ito=jid,
107 **iq_kwargs,
108 )
109 iq["roster"]["items"] = roster_items
110 return iq
112 async def get_roster(self, jid: typing.Union[JID, str], **send_kwargs) -> Iq:
113 """
114 Return the roster of user on the server the component has privileged access to.
116 Raises ValueError if the server did not advertise the corresponding privileges
118 :param jid: user we want to fetch the roster from
119 """
120 if isinstance(jid, str):
121 jid = JID(jid)
122 if self.granted_privileges[jid.domain].roster not in (
123 RosterAccess.GET,
124 RosterAccess.BOTH,
125 ):
126 raise PermissionError(
127 "The server did not grant us privileges to get rosters"
128 )
129 else:
130 return await self._make_get_roster(jid).send(**send_kwargs)
132 async def set_roster(
133 self, jid: typing.Union[JID, str], roster_items: dict, **send_kwargs
134 ) -> Iq:
135 """
136 Return the roster of user on the server the component has privileged access to.
138 Raises ValueError if the server did not advertise the corresponding privileges
140 :param jid: user we want to add or modify roster items
141 :param roster_items: a dict containing the roster items' JIDs as keys and
142 nested dicts containing names, subscriptions and groups.
143 Example:
144 {
145 "friend1@example.com": {
146 "name": "Friend 1",
147 "subscription": "both",
148 "groups": ["group1", "group2"],
149 },
150 "friend2@example.com": {
151 "name": "Friend 2",
152 "subscription": "from",
153 "groups": ["group3"],
154 },
155 }
156 """
157 if isinstance(jid, str):
158 jid = JID(jid)
159 if self.granted_privileges[jid.domain].roster not in (
160 RosterAccess.GET,
161 RosterAccess.BOTH,
162 ):
163 raise PermissionError(
164 "The server did not grant us privileges to set rosters"
165 )
166 else:
167 return await self._make_set_roster(jid, roster_items).send(**send_kwargs)