Coverage for slidge/slixfix/xep_0356_old/privilege.py: 79%

58 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-07 05:11 +0000

1import logging 

2import typing 

3from collections import defaultdict 

4 

5from slixmpp import JID, Iq, Message 

6from slixmpp.plugins.base import BasePlugin 

7from slixmpp.plugins.xep_0356.permissions import ( 

8 MessagePermission, 

9 Permissions, 

10 RosterAccess, 

11) 

12from slixmpp.types import JidStr 

13from slixmpp.xmlstream import StanzaBase 

14from slixmpp.xmlstream.handler import Callback 

15from slixmpp.xmlstream.matcher import StanzaPath 

16 

17from . import stanza 

18 

19log = logging.getLogger(__name__) 

20 

21 

22# noinspection PyPep8Naming 

class XEP_0356_OLD(BasePlugin):
    """
    XEP-0356: Privileged Entity (old namespace)

    Events:

    ::

        privileges_advertised_old  -- Received message/privilege_old from the server
    """

    name = "xep_0356_old"
    description = "XEP-0356: Privileged Entity (slidge - old namespace)"
    dependencies = {"xep_0297"}
    stanza = stanza

    # Permissions advertised so far, keyed by the sender of the advertisement.
    # NOTE(review): this is a class attribute that is only ever mutated in place,
    # so the mapping is shared by every instance of this plugin. Kept unchanged
    # for backward compatibility.
    granted_privileges: defaultdict[JidStr, Permissions] = defaultdict(Permissions)

    def plugin_init(self):
        """
        Call ``stanza.register()`` and install the handler for incoming
        ``message/privilege_old`` stanzas.

        If the XMPP entity is not a component, an error is logged and nothing
        is registered.
        """
        if not self.xmpp.is_component:
            log.error("XEP 0356 is only available for components")
            return

        stanza.register()

        self.xmpp.register_handler(
            Callback(
                "Privileges_old",
                StanzaPath("message/privilege_old"),
                self._handle_privilege,
            )
        )

    def plugin_end(self):
        """Remove the ``Privileges_old`` handler."""
        self.xmpp.remove_handler("Privileges_old")

    def _handle_privilege(self, msg: StanzaBase):
        """
        Called when the XMPP server advertises the component's privileges.

        Stores the privileges in the granted_privileges attribute (a dict keyed
        by the sender of ``msg``) and raises the privileges_advertised_old event.

        :param msg: the received message carrying a ``privilege_old`` element
        """
        sender = msg.get_from()
        for perm in msg["privilege_old"]["perms"]:
            # The entry is looked up inside the loop on purpose: a message
            # without any perm must not create an entry in the defaultdict.
            setattr(self.granted_privileges[sender], perm["access"], perm["type"])
        log.debug(f"Privileges (old): {self.granted_privileges}")
        self.xmpp.event("privileges_advertised_old")

    def send_privileged_message(self, msg: Message):
        """
        Wrap ``msg`` in a privileged message and send it.

        Raises PermissionError if the permission stored for the domain of the
        sender of ``msg`` is not MessagePermission.OUTGOING.

        :param msg: the message to send on behalf of its sender
        """
        server = msg.get_from().domain
        if self.granted_privileges[server].message != MessagePermission.OUTGOING:
            raise PermissionError(
                "The server hasn't authorized us to send messages on behalf of other users"
            )
        self._make_privileged_message(msg).send()

    def _make_privileged_message(self, msg: Message):
        """
        Build the wrapper message for ``msg``.

        The wrapper is addressed to the domain of the sender of ``msg``, sent
        from the bare bound JID of the component, and ``msg`` is appended to its
        ``privilege_old``/``forwarded`` element.

        :param msg: the message to wrap
        :return: the wrapper message (not sent)
        """
        server = msg.get_from().domain
        wrapped = self.xmpp.make_message(mto=server, mfrom=self.xmpp.boundjid.bare)
        wrapped["privilege_old"]["forwarded"].append(msg)
        return wrapped

    def _make_get_roster(self, jid: typing.Union[JID, str], **iq_kwargs):
        """
        Build a ``jabber:iq:roster`` get query addressed to ``jid``, sent from
        the bare bound JID of the component.

        :param jid: recipient of the query
        :param iq_kwargs: extra keyword arguments forwarded to ``make_iq_get``
        :return: the iq stanza (not sent)
        """
        return self.xmpp.make_iq_get(
            queryxmlns="jabber:iq:roster",
            ifrom=self.xmpp.boundjid.bare,
            ito=jid,
            **iq_kwargs,
        )

    def _make_set_roster(
        self,
        jid: typing.Union[JID, str],
        roster_items: dict,
        **iq_kwargs,
    ):
        """
        Build an iq set addressed to ``jid``, sent from the bare bound JID of
        the component, whose ``roster``/``items`` is ``roster_items``.

        :param jid: recipient of the query
        :param roster_items: value assigned to ``iq["roster"]["items"]``
        :param iq_kwargs: extra keyword arguments forwarded to ``make_iq_set``
        :return: the iq stanza (not sent)
        """
        iq = self.xmpp.make_iq_set(
            ifrom=self.xmpp.boundjid.bare,
            ito=jid,
            **iq_kwargs,
        )
        iq["roster"]["items"] = roster_items
        return iq

    async def get_roster(self, jid: typing.Union[JID, str], **send_kwargs) -> Iq:
        """
        Return the roster of user on the server the component has privileged access to.

        Raises PermissionError if the server did not advertise the corresponding privileges

        :param jid: user we want to fetch the roster from
        :param send_kwargs: extra keyword arguments forwarded to the iq's ``send``
        """
        if isinstance(jid, str):
            jid = JID(jid)
        if self.granted_privileges[jid.domain].roster not in (
            RosterAccess.GET,
            RosterAccess.BOTH,
        ):
            raise PermissionError(
                "The server did not grant us privileges to get rosters"
            )
        return await self._make_get_roster(jid).send(**send_kwargs)

    async def set_roster(
        self, jid: typing.Union[JID, str], roster_items: dict, **send_kwargs
    ) -> Iq:
        """
        Add or modify roster items of a user on the server the component has
        privileged access to, and return the result of sending the query.

        Raises PermissionError if the server did not advertise the corresponding privileges

        :param jid: user we want to add or modify roster items
        :param roster_items: a dict containing the roster items' JIDs as keys and
            nested dicts containing names, subscriptions and groups.
            Example:
            {
                "friend1@example.com": {
                    "name": "Friend 1",
                    "subscription": "both",
                    "groups": ["group1", "group2"],
                },
                "friend2@example.com": {
                    "name": "Friend 2",
                    "subscription": "from",
                    "groups": ["group3"],
                },
            }
        :param send_kwargs: extra keyword arguments forwarded to the iq's ``send``
        """
        if isinstance(jid, str):
            jid = JID(jid)
        # Writing a roster requires the "set" (or "both") access type; a
        # read-only "get" grant is not sufficient.
        if self.granted_privileges[jid.domain].roster not in (
            RosterAccess.SET,
            RosterAccess.BOTH,
        ):
            raise PermissionError(
                "The server did not grant us privileges to set rosters"
            )
        return await self._make_set_roster(jid, roster_items).send(**send_kwargs)