Coverage for slidge/core/dispatcher/vcard.py: 89%

101 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-04 08:17 +0000

1from copy import copy 

2 

3from slixmpp import CoroutineCallback, Iq, StanzaPath, register_stanza_plugin 

4from slixmpp.exceptions import XMPPError 

5from slixmpp.plugins.xep_0084 import MetaData 

6from slixmpp.plugins.xep_0292.stanza import NS as VCard4NS 

7 

8from ...contact import LegacyContact 

9from ...core.session import BaseSession 

10from ...group import LegacyParticipant 

11from ...util.types import Avatar 

12from .util import DispatcherMixin, exceptions_to_xmpp_errors 

13 

14 

class VCardMixin(DispatcherMixin):
    """Dispatcher mixin answering vCard IQs addressed to the gateway.

    Two stanza paths are handled:

    - ``iq@type=get/vcard``, served by :meth:`on_get_vcard`
      (presumably XEP-0292 vCard4 -- confirm against the slixmpp plugin);
    - ``iq/vcard_temp``, served by a private handler that dispatches
      on the IQ type to a "get" or a "set" implementation.
    """

    __slots__: list[str] = []

    def __init__(self, xmpp) -> None:
        """Register the vCard IQ handlers on ``xmpp``.

        :param xmpp: Object exposing ``register_handler``,
            ``remove_handler`` and ``plugin``; it is also forwarded to the
            parent initializer.
        """
        super().__init__(xmpp)
        xmpp.register_handler(
            CoroutineCallback(
                "get_vcard", StanzaPath("iq@type=get/vcard"), self.on_get_vcard
            )
        )
        # Drop any handler already registered under the name "VCardTemp"
        # (presumably the one installed by slixmpp's XEP-0054 plugin --
        # TODO confirm) before registering ours under the same name.
        xmpp.remove_handler("VCardTemp")
        xmpp.register_handler(
            CoroutineCallback(
                "VCardTemp",
                StanzaPath("iq/vcard_temp"),
                self.__vcard_temp_handler,
            )
        )
        # TODO: MR to slixmpp adding this to XEP-0084
        register_stanza_plugin(
            self.xmpp.plugin["xep_0060"].stanza.Item,
            MetaData,
        )

    @exceptions_to_xmpp_errors
    async def on_get_vcard(self, iq: Iq) -> None:
        """Reply to a ``vcard`` "get" IQ with the vCard of the targeted contact.

        The contact is looked up from the IQ's ``to`` JID in the session
        obtained with ``logged=True``. When the contact has no vCard, the
        reply carries a ``vcard`` payload enabled without any content.

        :param iq: The incoming IQ stanza.
        """
        session = await self._get_session(iq, logged=True)
        contact = await session.contacts.by_jid(iq.get_to())
        vcard = await contact.get_vcard()
        reply = iq.reply()
        if vcard:
            reply.append(vcard)
        else:
            reply.enable("vcard")
        reply.send()

    @exceptions_to_xmpp_errors
    async def __vcard_temp_handler(self, iq: Iq):
        """Dispatch a ``vcard_temp`` IQ according to its type.

        "get" and "set" IQs are forwarded to their dedicated handlers; any
        other IQ type falls through and is ignored.

        :param iq: The incoming IQ stanza.
        """
        if iq["type"] == "get":
            return await self.__handle_get_vcard_temp(iq)

        if iq["type"] == "set":
            return await self.__handle_set_vcard_temp(iq)

    async def __fetch_user_avatar(self, session: BaseSession):
        """Fetch the avatar of the session's user over pubsub.

        The metadata item is requested first (node ``MetaData.namespace``,
        item id = the stored avatar hash) to learn the media type, then the
        avatar data itself is retrieved through the XEP-0084 plugin. Both
        requests are sent from the component's bare JID.

        :param session: Session whose ``user.avatar_hash`` and ``user_jid``
            identify the avatar to fetch.
        :return: A ``(data, media_type)`` tuple taken from the two replies.
        :raises XMPPError: ``item-not-found`` when no avatar hash is stored
            for the user.
        """
        hash_ = session.user.avatar_hash
        if not hash_:
            raise XMPPError(
                "item-not-found", "The slidge user does not have any avatar set"
            )
        meta_iq = await self.xmpp.plugin["xep_0060"].get_item(
            session.user_jid,
            MetaData.namespace,
            hash_,
            ifrom=self.xmpp.boundjid.bare,
        )
        info = meta_iq["pubsub"]["items"]["item"]["avatar_metadata"]["info"]
        type_ = info["type"]
        data_iq = await self.xmpp.plugin["xep_0084"].retrieve_avatar(
            session.user_jid, hash_, ifrom=self.xmpp.boundjid.bare
        )
        bytes_ = data_iq["pubsub"]["items"]["item"]["avatar_data"]["value"]
        return bytes_, type_

    async def __handle_get_vcard_temp(self, iq: Iq):
        """Answer a ``vcard_temp`` "get" IQ for a contact, group or participant.

        The photo and the optional vCard come from different places
        depending on the entity the IQ is addressed to:

        - participant that is the user: photo fetched with
          ``__fetch_user_avatar``, no vCard;
        - other participant: avatar and vCard of its attached contact;
        - anything else: the entity's own avatar, plus its vCard
          (``fetch=False``) when the entity is a ``LegacyContact``.

        :param iq: The incoming IQ stanza.
        :raises XMPPError: ``item-not-found`` when the entity cannot be
            resolved or a participant has no contact;
            ``internal-server-error`` when the user's avatar data comes
            back empty.
        """
        session = self.xmpp.get_session_from_stanza(iq)
        entity = await session.get_contact_or_group_or_participant(iq.get_to(), False)
        if not entity:
            raise XMPPError("item-not-found")

        bytes_ = None
        if isinstance(entity, LegacyParticipant):
            if entity.is_user:
                bytes_, type_ = await self.__fetch_user_avatar(session)
                if not bytes_:
                    raise XMPPError(
                        "internal-server-error",
                        "Could not fetch the slidge user's avatar",
                    )
                avatar = None
                vcard = None
            elif not (contact := entity.contact):
                raise XMPPError("item-not-found", "This participant has no contact")
            else:
                vcard = await contact.get_vcard()
                avatar = contact.get_avatar()
                type_ = "image/png"
        else:
            avatar = entity.get_avatar()
            type_ = "image/png"
            if isinstance(entity, LegacyContact):
                vcard = await entity.get_vcard(fetch=False)
            else:
                vcard = None
        v = self.xmpp.plugin["xep_0054"].make_vcard()
        if avatar is not None and avatar.data:
            bytes_ = avatar.data.get_value()
        if bytes_:
            v["PHOTO"]["BINVAL"] = bytes_
            v["PHOTO"]["TYPE"] = type_
        if vcard:
            for el in vcard.xml:
                # Copy each top-level child and strip the vCard4 namespace
                # from its tag before attaching it to the vcard-temp
                # payload. ``copy`` is shallow and only the copied
                # element's own tag is rewritten.
                new = copy(el)
                new.tag = el.tag.replace(f"{{{VCard4NS}}}", "")
                v.append(new)
        reply = iq.reply()
        reply.append(v)
        reply.send()

    async def __handle_set_vcard_temp(self, iq: Iq):
        """Handle a ``vcard_temp`` "set" IQ changing the avatar of a MUC.

        The photo in the payload is passed to ``muc.on_avatar``; the IQ
        result is sent once that call succeeds, and the MUC's avatar is
        updated afterwards.

        :param iq: The incoming IQ stanza.
        :raises XMPPError: ``bad-request`` when the IQ targets a JID with a
            resource (i.e. a participant); ``internal-server-error`` when
            ``muc.on_avatar`` raises anything other than an ``XMPPError``
            (the original exception is chained as the cause).
        """
        muc = await self.get_muc_from_stanza(iq)
        to = iq.get_to()

        if to.resource:
            raise XMPPError("bad-request", "You cannot set participants avatars")

        # An empty BINVAL is normalised to None, which means "remove avatar"
        # further down.
        data = iq["vcard_temp"]["PHOTO"]["BINVAL"] or None
        try:
            legacy_id = await muc.on_avatar(
                data, iq["vcard_temp"]["PHOTO"]["TYPE"] or None
            )
        except XMPPError:
            raise
        except Exception as e:
            # Chain explicitly so the root cause is kept as ``__cause__``.
            raise XMPPError("internal-server-error", str(e)) from e
        reply = iq.reply(clear=True)
        reply.enable("vcard_temp")
        reply.send()

        if not data:
            await muc.set_avatar(None)
            return

        # When on_avatar returns a falsy id, the avatar is not set here
        # (presumably left to the on_avatar implementation -- TODO confirm).
        if legacy_id:
            await muc.set_avatar(Avatar(data=data, unique_id=legacy_id))

143 

144 if not data: 

145 await muc.set_avatar(None) 

146 return 

147 

148 if legacy_id: 

149 await muc.set_avatar(Avatar(data=data, unique_id=legacy_id))