Coverage for slidge / slixfix / __init__.py: 88%

60 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-03-13 22:59 +0000

1# This module contains patches for slixmpp; some have pending requests upstream 

2# and should be removed on the next slixmpp release. 

3import uuid 

4 

5# ruff: noqa: F401 

6import slixmpp.plugins 

7import slixmpp.stanza.roster 

8from slixmpp import Iq, Message 

9from slixmpp.exceptions import IqError 

10from slixmpp.plugins.xep_0050 import XEP_0050, Command 

11from slixmpp.plugins.xep_0356.permissions import IqPermission 

12from slixmpp.plugins.xep_0356.privilege import XEP_0356, PrivilegedIqError 

13from slixmpp.plugins.xep_0469.stanza import NS as PINNED_NS 

14from slixmpp.plugins.xep_0469.stanza import Pinned 

15from slixmpp.xmlstream import StanzaBase 

16 

17from ..util.archive_msg import set_client_namespace 

18from . import ( 

19 xep_0077, 

20 xep_0100, 

21 xep_0153, 

22 xep_0292, 

23) 

24 

25 

26def set_pinned(self, val: bool) -> None: 

27 extensions = self.parent() 

28 if val: 

29 extensions.enable("pinned") 

30 else: 

31 extensions._del_sub(f"{{{PINNED_NS}}}pinned") 

32 

33 

34Pinned.set_pinned = set_pinned 

35 

36 

37def session_bind(self, jid) -> None: 

38 self.xmpp["xep_0030"].add_feature(Command.namespace) 

39 # awful hack to for the disco items: we need to comment this line 

40 # related issue: https://todo.sr.ht/~nicoco/slidge/131 

41 # self.xmpp['xep_0030'].set_items(node=Command.namespace, items=tuple()) 

42 

43 

44XEP_0050.session_bind = session_bind # type:ignore 

45 

46 

47def reply(self, body=None, clear: bool = True): 

48 """ 

49 Overrides slixmpp's Message.reply(), since it strips to sender's resource 

50 for mtype=groupchat, and we do not want that, because when we raise an XMPPError, 

51 we actually want to preserve the resource. 

52 (this is called in RootStanza.exception() to handle XMPPErrors) 

53 """ 

54 new_message = StanzaBase.reply(self, clear) 

55 new_message["thread"] = self["thread"] 

56 new_message["parent_thread"] = self["parent_thread"] 

57 

58 del new_message["id"] 

59 if self.stream is not None and self.stream.use_message_ids: 

60 new_message["id"] = self.stream.new_id() 

61 

62 if body is not None: 

63 new_message["body"] = body 

64 return new_message 

65 

66 

67async def send_privileged_iq(self, encapsulated_iq: Iq, iq_id: str | None = None) -> Iq: 

68 """ 

69 Send an IQ on behalf of a user 

70 

71 Caution: the IQ *must* have the jabber:client namespace 

72 

73 Raises :class:`PrivilegedIqError` on failure. 

74 """ 

75 iq_id = iq_id or str(uuid.uuid4()) 

76 encapsulated_iq["id"] = iq_id 

77 if encapsulated_iq.namespace != "jabber:client": 

78 set_client_namespace(encapsulated_iq) 

79 server = encapsulated_iq.get_from().domain 

80 perms = self.granted_privileges.get(server) 

81 if not perms: 

82 raise PermissionError(f"{server} has not granted us any privilege") 

83 itype = encapsulated_iq["type"] 

84 for ns in encapsulated_iq.plugins.values(): 

85 type_ = perms.iq[ns.namespace] 

86 if type_ == IqPermission.NONE: 

87 raise PermissionError( 

88 f"{server} has not granted any IQ privilege for namespace {ns.namespace}" 

89 ) 

90 elif type_ == IqPermission.BOTH: 

91 pass 

92 elif type_ != itype: 

93 raise PermissionError( 

94 f"{server} has not granted IQ {itype} privilege for namespace {ns.namespace}" 

95 ) 

96 iq = self.xmpp.make_iq( 

97 itype=itype, 

98 ifrom=self.xmpp.boundjid.bare, 

99 ito=encapsulated_iq.get_from(), 

100 id=iq_id, 

101 ) 

102 iq["privileged_iq"].append(encapsulated_iq) 

103 

104 try: 

105 resp = await iq.send() 

106 except IqError as exc: 

107 raise PrivilegedIqError(exc.iq) 

108 

109 return resp["privilege"]["forwarded"]["iq"] 

110 

111 

112XEP_0356.send_privileged_iq = send_privileged_iq # type:ignore 

113Message.reply = reply # type: ignore 

114 

115slixmpp.plugins.PLUGINS.extend( 

116 [ 

117 "xep_0292_provider", 

118 ] 

119)