Coverage for slidge/core/dispatcher/vcard.py: 89%
101 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
1from copy import copy
3from slixmpp import CoroutineCallback, Iq, StanzaPath, register_stanza_plugin
4from slixmpp.exceptions import XMPPError
5from slixmpp.plugins.xep_0084 import MetaData
6from slixmpp.plugins.xep_0292.stanza import NS as VCard4NS
8from ...contact import LegacyContact
9from ...core.session import BaseSession
10from ...group import LegacyParticipant
11from ...util.types import Avatar
12from .util import DispatcherMixin, exceptions_to_xmpp_errors
15class VCardMixin(DispatcherMixin):
16 __slots__: list[str] = []
18 def __init__(self, xmpp) -> None:
19 super().__init__(xmpp)
20 xmpp.register_handler(
21 CoroutineCallback(
22 "get_vcard", StanzaPath("iq@type=get/vcard"), self.on_get_vcard
23 )
24 )
25 xmpp.remove_handler("VCardTemp")
26 xmpp.register_handler(
27 CoroutineCallback(
28 "VCardTemp",
29 StanzaPath("iq/vcard_temp"),
30 self.__vcard_temp_handler,
31 )
32 )
33 # TODO: MR to slixmpp adding this to XEP-0084
34 register_stanza_plugin(
35 self.xmpp.plugin["xep_0060"].stanza.Item,
36 MetaData,
37 )
39 @exceptions_to_xmpp_errors
40 async def on_get_vcard(self, iq: Iq) -> None:
41 session = await self._get_session(iq, logged=True)
42 contact = await session.contacts.by_jid(iq.get_to())
43 vcard = await contact.get_vcard()
44 reply = iq.reply()
45 if vcard:
46 reply.append(vcard)
47 else:
48 reply.enable("vcard")
49 reply.send()
51 @exceptions_to_xmpp_errors
52 async def __vcard_temp_handler(self, iq: Iq):
53 if iq["type"] == "get":
54 return await self.__handle_get_vcard_temp(iq)
56 if iq["type"] == "set":
57 return await self.__handle_set_vcard_temp(iq)
59 async def __fetch_user_avatar(self, session: BaseSession):
60 hash_ = session.user.avatar_hash
61 if not hash_:
62 raise XMPPError(
63 "item-not-found", "The slidge user does not have any avatar set"
64 )
65 meta_iq = await self.xmpp.plugin["xep_0060"].get_item(
66 session.user_jid,
67 MetaData.namespace,
68 hash_,
69 ifrom=self.xmpp.boundjid.bare,
70 )
71 info = meta_iq["pubsub"]["items"]["item"]["avatar_metadata"]["info"]
72 type_ = info["type"]
73 data_iq = await self.xmpp.plugin["xep_0084"].retrieve_avatar(
74 session.user_jid, hash_, ifrom=self.xmpp.boundjid.bare
75 )
76 bytes_ = data_iq["pubsub"]["items"]["item"]["avatar_data"]["value"]
77 return bytes_, type_
79 async def __handle_get_vcard_temp(self, iq: Iq):
80 session = self.xmpp.get_session_from_stanza(iq)
81 entity = await session.get_contact_or_group_or_participant(iq.get_to(), False)
82 if not entity:
83 raise XMPPError("item-not-found")
85 bytes_ = None
86 if isinstance(entity, LegacyParticipant):
87 if entity.is_user:
88 bytes_, type_ = await self.__fetch_user_avatar(session)
89 if not bytes_:
90 raise XMPPError(
91 "internal-server-error",
92 "Could not fetch the slidge user's avatar",
93 )
94 avatar = None
95 vcard = None
96 elif not (contact := entity.contact):
97 raise XMPPError("item-not-found", "This participant has no contact")
98 else:
99 vcard = await contact.get_vcard()
100 avatar = contact.get_avatar()
101 type_ = "image/png"
102 else:
103 avatar = entity.get_avatar()
104 type_ = "image/png"
105 if isinstance(entity, LegacyContact):
106 vcard = await entity.get_vcard(fetch=False)
107 else:
108 vcard = None
109 v = self.xmpp.plugin["xep_0054"].make_vcard()
110 if avatar is not None and avatar.data:
111 bytes_ = avatar.data.get_value()
112 if bytes_:
113 v["PHOTO"]["BINVAL"] = bytes_
114 v["PHOTO"]["TYPE"] = type_
115 if vcard:
116 for el in vcard.xml:
117 new = copy(el)
118 new.tag = el.tag.replace(f"{{{VCard4NS}}}", "")
119 v.append(new)
120 reply = iq.reply()
121 reply.append(v)
122 reply.send()
124 async def __handle_set_vcard_temp(self, iq: Iq):
125 muc = await self.get_muc_from_stanza(iq)
126 to = iq.get_to()
128 if to.resource:
129 raise XMPPError("bad-request", "You cannot set participants avatars")
131 data = iq["vcard_temp"]["PHOTO"]["BINVAL"] or None
132 try:
133 legacy_id = await muc.on_avatar(
134 data, iq["vcard_temp"]["PHOTO"]["TYPE"] or None
135 )
136 except XMPPError:
137 raise
138 except Exception as e:
139 raise XMPPError("internal-server-error", str(e))
140 reply = iq.reply(clear=True)
141 reply.enable("vcard_temp")
142 reply.send()
144 if not data:
145 await muc.set_avatar(None)
146 return
148 if legacy_id:
149 await muc.set_avatar(Avatar(data=data, unique_id=legacy_id))