Coverage for slidge/slixfix/__init__.py: 87%

60 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-26 19:34 +0000

1# This module contains patches for slixmpp; some have pending requests upstream 

2# and should be removed on the next slixmpp release. 

3import uuid 

4 

5# ruff: noqa: F401 

6import slixmpp.plugins 

7import slixmpp.stanza.roster 

8from slixmpp import Iq, Message 

9from slixmpp.exceptions import IqError 

10from slixmpp.plugins.xep_0050 import XEP_0050, Command 

11from slixmpp.plugins.xep_0356.permissions import IqPermission 

12from slixmpp.plugins.xep_0356.privilege import XEP_0356, PrivilegedIqError 

13from slixmpp.plugins.xep_0469.stanza import NS as PINNED_NS 

14from slixmpp.plugins.xep_0469.stanza import Pinned 

15from slixmpp.xmlstream import StanzaBase 

16 

17from ..util.archive_msg import set_client_namespace 

18from . import ( 

19 link_preview, 

20 xep_0077, 

21 xep_0100, 

22 xep_0153, 

23 xep_0292, 

24) 

25 

26 

27def set_pinned(self, val: bool) -> None: 

28 extensions = self.parent() 

29 if val: 

30 extensions.enable("pinned") 

31 else: 

32 extensions._del_sub(f"{{{PINNED_NS}}}pinned") 

33 

34 

35Pinned.set_pinned = set_pinned 

36 

37 

38def session_bind(self, jid) -> None: 

39 self.xmpp["xep_0030"].add_feature(Command.namespace) 

40 # awful hack to for the disco items: we need to comment this line 

41 # related issue: https://todo.sr.ht/~nicoco/slidge/131 

42 # self.xmpp['xep_0030'].set_items(node=Command.namespace, items=tuple()) 

43 

44 

45XEP_0050.session_bind = session_bind # type:ignore 

46 

47 

48def reply(self, body=None, clear: bool = True): 

49 """ 

50 Overrides slixmpp's Message.reply(), since it strips to sender's resource 

51 for mtype=groupchat, and we do not want that, because when we raise an XMPPError, 

52 we actually want to preserve the resource. 

53 (this is called in RootStanza.exception() to handle XMPPErrors) 

54 """ 

55 new_message = StanzaBase.reply(self, clear) 

56 new_message["thread"] = self["thread"] 

57 new_message["parent_thread"] = self["parent_thread"] 

58 

59 del new_message["id"] 

60 if self.stream is not None and self.stream.use_message_ids: 

61 new_message["id"] = self.stream.new_id() 

62 

63 if body is not None: 

64 new_message["body"] = body 

65 return new_message 

66 

67 

68async def send_privileged_iq(self, encapsulated_iq: Iq, iq_id: str | None = None) -> Iq: 

69 """ 

70 Send an IQ on behalf of a user 

71 

72 Caution: the IQ *must* have the jabber:client namespace 

73 

74 Raises :class:`PrivilegedIqError` on failure. 

75 """ 

76 iq_id = iq_id or str(uuid.uuid4()) 

77 encapsulated_iq["id"] = iq_id 

78 if encapsulated_iq.namespace != "jabber:client": 

79 set_client_namespace(encapsulated_iq) 

80 server = encapsulated_iq.get_from().domain 

81 perms = self.granted_privileges.get(server) 

82 if not perms: 

83 raise PermissionError(f"{server} has not granted us any privilege") 

84 itype = encapsulated_iq["type"] 

85 for ns in encapsulated_iq.plugins.values(): 

86 type_ = perms.iq[ns.namespace] 

87 if type_ == IqPermission.NONE: 

88 raise PermissionError( 

89 f"{server} has not granted any IQ privilege for namespace {ns.namespace}" 

90 ) 

91 elif type_ == IqPermission.BOTH: 

92 pass 

93 elif type_ != itype: 

94 raise PermissionError( 

95 f"{server} has not granted IQ {itype} privilege for namespace {ns.namespace}" 

96 ) 

97 iq = self.xmpp.make_iq( 

98 itype=itype, 

99 ifrom=self.xmpp.boundjid.bare, 

100 ito=encapsulated_iq.get_from(), 

101 id=iq_id, 

102 ) 

103 iq["privileged_iq"].append(encapsulated_iq) 

104 

105 try: 

106 resp = await iq.send() 

107 except IqError as exc: 

108 raise PrivilegedIqError(exc.iq) 

109 

110 return resp["privilege"]["forwarded"]["iq"] 

111 

112 

113XEP_0356.send_privileged_iq = send_privileged_iq # type:ignore 

114Message.reply = reply # type: ignore 

115 

116slixmpp.plugins.PLUGINS.extend( 

117 [ 

118 "link_preview", 

119 "xep_0292_provider", 

120 ] 

121)