Coverage for slidge / core / dispatcher / vcard.py: 89%

101 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 05:07 +0000

1from copy import copy 

2from typing import TYPE_CHECKING 

3 

4from slixmpp import CoroutineCallback, Iq, StanzaPath, register_stanza_plugin 

5from slixmpp.exceptions import XMPPError 

6from slixmpp.plugins.xep_0084.stanza import MetaData 

7from slixmpp.plugins.xep_0292.stanza import NS as VCard4NS 

8 

9from ...contact import LegacyContact 

10from ...group import LegacyParticipant 

11from ...util.types import AnySession, Avatar 

12from .util import DispatcherMixin, exceptions_to_xmpp_errors 

13 

14if TYPE_CHECKING: 

15 from slidge.core.gateway import BaseGateway 

16 

17 

18class VCardMixin(DispatcherMixin): 

19 __slots__: list[str] = [] 

20 

def __init__(self, xmpp: "BaseGateway") -> None:
    """
    Hook the vCard-related IQ handlers into the gateway.

    Registers a handler for ``vcard`` get IQs, replaces the handler
    named ``"VCardTemp"`` with this mixin's own, and registers
    ``MetaData`` as a stanza plugin of the XEP-0060 ``Item`` stanza.

    :param xmpp: Gateway on which the handlers are registered.
    """
    super().__init__(xmpp)

    get_vcard_callback = CoroutineCallback(
        "get_vcard", StanzaPath("iq@type=get/vcard"), self.on_get_vcard
    )
    xmpp.register_handler(get_vcard_callback)

    # Remove the handler currently known as "VCardTemp" before
    # registering ours under that same name.
    xmpp.remove_handler("VCardTemp")
    vcard_temp_callback = CoroutineCallback(
        "VCardTemp",
        StanzaPath("iq/vcard_temp"),
        self.__vcard_temp_handler,
    )
    xmpp.register_handler(vcard_temp_callback)

    # TODO: MR to slixmpp adding this to XEP-0084
    pubsub_item_stanza = self.xmpp.plugin["xep_0060"].stanza.Item
    register_stanza_plugin(pubsub_item_stanza, MetaData)

41 

@exceptions_to_xmpp_errors
async def on_get_vcard(self, iq: Iq) -> None:
    """
    Answer a ``vcard`` get IQ with the vCard of the addressed contact.

    The reply carries whatever ``contact.get_vcard()`` returned when
    that value is truthy, and an empty ``vcard`` element otherwise.

    :param iq: Incoming IQ; its recipient JID is used to look up the
        contact in the sender's session.
    """
    session = await self._get_session(iq, logged=True)
    recipient = iq.get_to()
    contact = await session.contacts.by_jid(recipient)
    card = await contact.get_vcard()

    response = iq.reply()
    if not card:
        # No vCard available: still reply, with an empty element.
        response.enable("vcard")
    else:
        response.append(card)
    response.send()

53 

@exceptions_to_xmpp_errors
async def __vcard_temp_handler(self, iq: Iq) -> None:
    """
    Dispatch a ``vcard_temp`` IQ according to its type.

    ``get`` and ``set`` IQs are forwarded to their dedicated handlers;
    any other type is ignored.

    :param iq: Incoming IQ carrying a ``vcard_temp`` payload.
    """
    iq_type = iq["type"]
    if iq_type == "get":
        await self.__handle_get_vcard_temp(iq)
    elif iq_type == "set":
        await self.__handle_set_vcard_temp(iq)

61 

async def __fetch_user_avatar(self, session: AnySession) -> tuple[bytes, str]:
    """
    Fetch the avatar of the session's user via pubsub.

    The avatar metadata item is requested first (to read its ``type``),
    then the avatar data itself; both requests are sent from the
    gateway's bare JID to the user's JID.

    :param session: Session whose user's avatar is requested.
    :return: A ``(data, type)`` pair, taken from the avatar data item
        and the avatar metadata ``info`` element respectively.
    :raises XMPPError: ``item-not-found`` when the user has no avatar
        hash.
    """
    avatar_hash = session.user.avatar_hash
    if not avatar_hash:
        raise XMPPError(
            "item-not-found", "The slidge user does not have any avatar set"
        )

    metadata_reply = await self.xmpp.plugin["xep_0060"].get_item(
        session.user_jid,
        MetaData.namespace,
        avatar_hash,
        ifrom=self.xmpp.boundjid.bare,
    )
    metadata_item = metadata_reply["pubsub"]["items"]["item"]
    avatar_type = metadata_item["avatar_metadata"]["info"]["type"]

    data_reply = await self.xmpp.plugin["xep_0084"].retrieve_avatar(
        session.user_jid, avatar_hash, ifrom=self.xmpp.boundjid.bare
    )
    data_item = data_reply["pubsub"]["items"]["item"]
    avatar_bytes = data_item["avatar_data"]["value"]
    return avatar_bytes, avatar_type

81 

async def __handle_get_vcard_temp(self, iq: Iq) -> None:
    """
    Reply to a ``vcard_temp`` get IQ for a contact, group or participant.

    The reply is a vcard-temp stanza holding, when available, a
    ``PHOTO`` (binary value and type) and a copy of every child
    element of the entity's vCard, with the ``VCard4NS`` namespace
    prefix removed from each copied element's tag.

    :param iq: Incoming IQ; its recipient JID designates the entity.
    :raises XMPPError: ``item-not-found`` when no entity matches the
        recipient or when a participant has no contact;
        ``internal-server-error`` when the user's own avatar could not
        be fetched.
    """
    session = self.xmpp.get_session_from_stanza(iq)
    target = await session.get_contact_or_group_or_participant(iq.get_to(), False)
    if not target:
        raise XMPPError("item-not-found")

    # Defaults; each branch below only overrides what it provides.
    photo = None
    avatar = None
    card = None
    photo_type = "image/png"

    if not isinstance(target, LegacyParticipant):
        avatar = target.get_avatar()
        if isinstance(target, LegacyContact):
            card = await target.get_vcard()
    elif target.is_user:
        # The participant is the user: the avatar comes from the
        # user's own pubsub node rather than from an avatar object.
        photo, photo_type = await self.__fetch_user_avatar(session)
        if not photo:
            raise XMPPError(
                "internal-server-error",
                "Could not fetch the slidge user's avatar",
            )
    else:
        contact = target.contact
        if not contact:
            raise XMPPError("item-not-found", "This participant has no contact")
        card = await contact.get_vcard()
        avatar = contact.get_avatar()

    vcard_temp = self.xmpp.plugin["xep_0054"].make_vcard()

    if avatar is not None and avatar.data:
        photo = avatar.data.get_value()
    if photo:
        vcard_temp["PHOTO"]["BINVAL"] = photo
        vcard_temp["PHOTO"]["TYPE"] = photo_type

    if card:
        namespace_prefix = f"{{{VCard4NS}}}"
        for child in card.xml:
            # Work on a copy so the source vCard element keeps its tag.
            clone = copy(child)
            clone.tag = child.tag.replace(namespace_prefix, "")
            vcard_temp.append(clone)

    response = iq.reply()
    response.append(vcard_temp)
    response.send()

126 

async def __handle_set_vcard_temp(self, iq: Iq) -> None:
    """
    Handle a ``vcard_temp`` set IQ, used to change a MUC's avatar.

    The photo carried by the IQ is passed to ``muc.on_avatar()``; the
    result IQ is sent once that call succeeds, and only then is the
    MUC's avatar updated: it is cleared when the IQ carried no photo
    data, set when ``on_avatar()`` returned a truthy identifier, and
    left untouched otherwise.

    :param iq: Incoming IQ; its recipient must be a bare JID.
    :raises XMPPError: ``bad-request`` when the recipient JID has a
        resource; ``internal-server-error`` when ``on_avatar()`` raises
        anything other than an ``XMPPError`` (which is re-raised as is).
    """
    muc = await self.get_muc_from_stanza(iq)
    to = iq.get_to()

    if to.resource:
        raise XMPPError("bad-request", "You cannot set participants avatars")

    photo = iq["vcard_temp"]["PHOTO"]
    # Empty values are normalised to None.
    data = photo["BINVAL"] or None
    try:
        legacy_id = await muc.on_avatar(data, photo["TYPE"] or None)
    except XMPPError:
        raise
    except Exception as e:
        # Chain explicitly so the original failure stays attached to
        # the error that is reported to the client.
        raise XMPPError("internal-server-error", str(e)) from e

    reply = iq.reply(clear=True)
    reply.enable("vcard_temp")
    reply.send()

    if not data:
        await muc.set_avatar(None)
    elif legacy_id:
        await muc.set_avatar(Avatar(data=data, unique_id=legacy_id))

151 if legacy_id: 

152 await muc.set_avatar(Avatar(data=data, unique_id=legacy_id))