Coverage for slidge/core/dispatcher/vcard.py: 89%

99 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-07 05:11 +0000

1from copy import copy 

2 

3from slixmpp import CoroutineCallback, Iq, StanzaPath, register_stanza_plugin 

4from slixmpp.exceptions import XMPPError 

5from slixmpp.plugins.xep_0084 import MetaData 

6from slixmpp.plugins.xep_0292.stanza import NS as VCard4NS 

7 

8from ...contact import LegacyContact 

9from ...core.session import BaseSession 

10from ...group import LegacyParticipant 

11from .util import DispatcherMixin, exceptions_to_xmpp_errors 

12 

13 

class VCardMixin(DispatcherMixin):
    """Dispatcher mixin answering vCard4 and vcard-temp IQ requests.

    Two stanza handlers are installed on the component:

    - ``get_vcard``: ``iq`` of type ``get`` carrying a ``vcard`` payload,
      answered by :meth:`on_get_vcard`;
    - ``VCardTemp``: any ``iq`` carrying a ``vcard_temp`` payload, routed to
      the private get/set handlers depending on the IQ type.
    """

    def __init__(self, xmpp):
        """Register the vCard handlers on *xmpp*.

        :param xmpp: The component on which handlers are (un)registered.
        """
        super().__init__(xmpp)
        xmpp.register_handler(
            CoroutineCallback(
                "get_vcard", StanzaPath("iq@type=get/vcard"), self.on_get_vcard
            )
        )
        # Replace whatever handler is currently registered under the
        # "VCardTemp" name (presumably the one installed by slixmpp's
        # XEP-0054 plugin -- TODO confirm) with our own implementation.
        # The removal must happen before the registration below since both
        # use the same handler name.
        xmpp.remove_handler("VCardTemp")
        xmpp.register_handler(
            CoroutineCallback(
                "VCardTemp",
                StanzaPath("iq/vcard_temp"),
                self.__vcard_temp_handler,
            )
        )
        # TODO: MR to slixmpp adding this to XEP-0084
        # Makes ``item["avatar_metadata"]`` available on pubsub items, which
        # __fetch_user_avatar relies on.
        register_stanza_plugin(
            self.xmpp.plugin["xep_0060"].stanza.Item,
            MetaData,
        )

    @exceptions_to_xmpp_errors
    async def on_get_vcard(self, iq: Iq):
        """Reply to a vCard4 ``get`` request addressed to a contact.

        The reply contains the contact's vCard when it has one, and an empty
        ``vcard`` element otherwise.

        :param iq: The incoming request; its ``to`` JID identifies the contact.
        """
        session = await self._get_session(iq, logged=True)
        contact = await session.contacts.by_jid(iq.get_to())
        vcard = await contact.get_vcard()
        reply = iq.reply()
        if vcard:
            reply.append(vcard)
        else:
            # No vCard available: still answer, with an empty payload.
            reply.enable("vcard")
        reply.send()

    @exceptions_to_xmpp_errors
    async def __vcard_temp_handler(self, iq: Iq):
        """Route a vcard-temp IQ to the get or set handler.

        IQs whose type is neither ``get`` nor ``set`` are ignored.

        :param iq: The incoming IQ carrying a ``vcard_temp`` payload.
        """
        if iq["type"] == "get":
            return await self.__handle_get_vcard_temp(iq)

        if iq["type"] == "set":
            return await self.__handle_set_vcard_temp(iq)

    async def __fetch_user_avatar(self, session: BaseSession):
        """Fetch the avatar of the slidge user owning *session* via pubsub.

        The avatar metadata item is requested first to learn the media type,
        then the avatar data itself is retrieved. Both requests are sent from
        the component's bare JID to the user's JID.

        :param session: Session of the user whose avatar is requested.
        :return: A ``(data, media_type)`` tuple, as read from the pubsub
            replies.
        :raises XMPPError: ``item-not-found`` if the user has no avatar hash.
        """
        hash_ = session.user.avatar_hash
        if not hash_:
            raise XMPPError(
                "item-not-found", "The slidge user does not have any avatar set"
            )
        meta_iq = await self.xmpp.plugin["xep_0060"].get_item(
            session.user_jid,
            MetaData.namespace,
            hash_,
            ifrom=self.xmpp.boundjid.bare,
        )
        info = meta_iq["pubsub"]["items"]["item"]["avatar_metadata"]["info"]
        type_ = info["type"]
        data_iq = await self.xmpp.plugin["xep_0084"].retrieve_avatar(
            session.user_jid, hash_, ifrom=self.xmpp.boundjid.bare
        )
        bytes_ = data_iq["pubsub"]["items"]["item"]["avatar_data"]["value"]
        return bytes_, type_

    async def __handle_get_vcard_temp(self, iq: Iq):
        """Build and send the vcard-temp reply for a ``get`` request.

        The target may be a contact, a group or a group participant. The
        reply's ``PHOTO`` is filled from the entity's avatar (or, for the
        participant representing the slidge user, from the user's own avatar
        fetched over pubsub), and any vCard4 elements known for the contact
        are appended with the vCard4 namespace stripped from their tag.

        :param iq: The incoming vcard-temp ``get`` request.
        :raises XMPPError: ``item-not-found`` if the target cannot be resolved
            or if a participant has no associated contact;
            ``internal-server-error`` if the user's avatar came back empty.
        """
        session = self.xmpp.get_session_from_stanza(iq)
        entity = await session.get_contact_or_group_or_participant(iq.get_to(), False)
        if not entity:
            raise XMPPError("item-not-found")

        bytes_ = None
        if isinstance(entity, LegacyParticipant):
            if entity.is_user:
                # The participant is the slidge user themselves: their avatar
                # lives on their own XMPP account, not on the legacy side.
                bytes_, type_ = await self.__fetch_user_avatar(session)
                if not bytes_:
                    raise XMPPError(
                        "internal-server-error",
                        "Could not fetch the slidge user's avatar",
                    )
                avatar = None
                vcard = None
            elif not (contact := entity.contact):
                raise XMPPError("item-not-found", "This participant has no contact")
            else:
                vcard = await contact.get_vcard()
                avatar = contact.get_avatar()
                type_ = "image/png"
        else:
            avatar = entity.get_avatar()
            type_ = "image/png"
            if isinstance(entity, LegacyContact):
                vcard = await entity.get_vcard(fetch=False)
            else:
                vcard = None

        v = self.xmpp.plugin["xep_0054"].make_vcard()
        if avatar is not None and avatar.data:
            bytes_ = avatar.data.get_value()
        if bytes_:
            v["PHOTO"]["BINVAL"] = bytes_
            v["PHOTO"]["TYPE"] = type_
        if vcard:
            for el in vcard.xml:
                # Copy so that the original vCard4 element keeps its tag;
                # only the copy gets the vCard4 namespace removed.
                new = copy(el)
                new.tag = el.tag.replace(f"{{{VCard4NS}}}", "")
                v.append(new)
        reply = iq.reply()
        reply.append(v)
        reply.send()

    async def __handle_set_vcard_temp(self, iq: Iq):
        """Handle a vcard-temp ``set`` request changing a group's avatar.

        The new avatar is first handed to ``muc.on_avatar``; once that
        succeeds the IQ is acknowledged, and only then is the avatar applied
        locally through ``muc.set_avatar``.

        :param iq: The incoming vcard-temp ``set`` request.
        :raises XMPPError: ``bad-request`` if the target JID has a resource
            (i.e. designates a participant); ``internal-server-error`` if
            ``muc.on_avatar`` fails with a non-XMPP exception. XMPPError
            raised by ``muc.on_avatar`` propagates unchanged.
        """
        muc = await self.get_muc_from_stanza(iq)
        to = iq.get_to()

        if to.resource:
            raise XMPPError("bad-request", "You cannot set participants avatars")

        # An empty BINVAL is normalised to None, meaning "remove the avatar".
        data = iq["vcard_temp"]["PHOTO"]["BINVAL"] or None
        try:
            legacy_id = await muc.on_avatar(
                data, iq["vcard_temp"]["PHOTO"]["TYPE"] or None
            )
        except XMPPError:
            raise
        except Exception as e:
            # Chain the original exception so its traceback is preserved as
            # the explicit cause of the stanza error.
            raise XMPPError("internal-server-error", str(e)) from e
        reply = iq.reply(clear=True)
        reply.enable("vcard_temp")
        reply.send()

        if not data:
            await muc.set_avatar(None, blocking=True)
            return

        # NOTE(review): when on_avatar returns a falsy id the avatar is not
        # applied here -- presumably it is expected to arrive through another
        # path; confirm against the on_avatar contract.
        if legacy_id:
            await muc.set_avatar(data, legacy_id, blocking=True)