Coverage for slidge/core/dispatcher/vcard.py: 89%
99 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
1from copy import copy
3from slixmpp import CoroutineCallback, Iq, StanzaPath, register_stanza_plugin
4from slixmpp.exceptions import XMPPError
5from slixmpp.plugins.xep_0084 import MetaData
6from slixmpp.plugins.xep_0292.stanza import NS as VCard4NS
8from ...contact import LegacyContact
9from ...core.session import BaseSession
10from ...group import LegacyParticipant
11from .util import DispatcherMixin, exceptions_to_xmpp_errors
class VCardMixin(DispatcherMixin):
    """Dispatcher mixin that answers vCard (``vcard``) and ``vcard_temp`` IQs.

    On construction it registers two coroutine handlers on the XMPP
    component: one for ``iq@type=get/vcard`` stanzas and one, named
    ``"VCardTemp"``, for any ``iq/vcard_temp`` stanza.
    """

    def __init__(self, xmpp):
        """Register the vCard handlers on *xmpp*.

        :param xmpp: the component on which handlers are registered; it is
            also forwarded to the parent ``__init__``.
        """
        super().__init__(xmpp)
        xmpp.register_handler(
            CoroutineCallback(
                "get_vcard", StanzaPath("iq@type=get/vcard"), self.on_get_vcard
            )
        )
        # Drop any handler already registered under the name "VCardTemp"
        # (presumably the one installed by slixmpp's XEP-0054 plugin -- TODO
        # confirm) so that ours, registered under the same name, takes over.
        xmpp.remove_handler("VCardTemp")
        xmpp.register_handler(
            CoroutineCallback(
                "VCardTemp",
                StanzaPath("iq/vcard_temp"),
                self.__vcard_temp_handler,
            )
        )
        # TODO: MR to slixmpp adding this to XEP-0084
        register_stanza_plugin(
            self.xmpp.plugin["xep_0060"].stanza.Item,
            MetaData,
        )

    @exceptions_to_xmpp_errors
    async def on_get_vcard(self, iq: Iq):
        """Reply to a ``vcard`` get IQ with the vCard of the targeted contact.

        The contact is looked up from the IQ's ``to`` JID in the session
        obtained for the IQ. When the contact has no vCard, the reply only
        has the ``vcard`` plugin enabled instead of carrying a vCard payload.

        :param iq: the incoming request.
        """
        session = await self._get_session(iq, logged=True)
        contact = await session.contacts.by_jid(iq.get_to())
        vcard = await contact.get_vcard()
        reply = iq.reply()
        if vcard:
            reply.append(vcard)
        else:
            reply.enable("vcard")
        reply.send()

    @exceptions_to_xmpp_errors
    async def __vcard_temp_handler(self, iq: Iq):
        """Route a ``vcard_temp`` IQ to the get or set handler by its type.

        IQs whose type is neither ``"get"`` nor ``"set"`` are ignored.

        :param iq: the incoming ``vcard_temp`` IQ.
        """
        if iq["type"] == "get":
            return await self.__handle_get_vcard_temp(iq)

        if iq["type"] == "set":
            return await self.__handle_set_vcard_temp(iq)

    async def __fetch_user_avatar(self, session: BaseSession):
        """Fetch the avatar of the user owning *session* over pubsub.

        The avatar metadata item identified by the user's avatar hash is
        requested first (to learn the media type), then the avatar data.
        Both requests are sent from the component's bare JID.

        :param session: session whose ``user`` and ``user_jid`` are queried.
        :return: a ``(data, type)`` tuple: the ``value`` of the avatar data
            item and the ``type`` found in the metadata ``info`` element.
        :raises XMPPError: ``item-not-found`` when the user has no avatar
            hash.
        """
        hash_ = session.user.avatar_hash
        if not hash_:
            raise XMPPError(
                "item-not-found", "The slidge user does not have any avatar set"
            )
        meta_iq = await self.xmpp.plugin["xep_0060"].get_item(
            session.user_jid,
            MetaData.namespace,
            hash_,
            ifrom=self.xmpp.boundjid.bare,
        )
        info = meta_iq["pubsub"]["items"]["item"]["avatar_metadata"]["info"]
        type_ = info["type"]
        data_iq = await self.xmpp.plugin["xep_0084"].retrieve_avatar(
            session.user_jid, hash_, ifrom=self.xmpp.boundjid.bare
        )
        bytes_ = data_iq["pubsub"]["items"]["item"]["avatar_data"]["value"]
        return bytes_, type_

    async def __handle_get_vcard_temp(self, iq: Iq):
        """Answer a ``vcard_temp`` get IQ for a contact, group or participant.

        The reply carries a vcard-temp element built by the XEP-0054 plugin,
        filled with:

        - a ``PHOTO`` (``BINVAL`` + ``TYPE``) when avatar bytes are available;
        - a copy of every child of the entity's vCard, if it has one, with
          the vCard4 namespace prefix stripped from the child's tag.

        :param iq: the incoming request.
        :raises XMPPError: ``item-not-found`` when no entity matches the
            ``to`` JID or when a non-user participant has no contact;
            ``internal-server-error`` when the user's own avatar could not
            be fetched.
        """
        session = self.xmpp.get_session_from_stanza(iq)
        entity = await session.get_contact_or_group_or_participant(iq.get_to(), False)
        if not entity:
            raise XMPPError("item-not-found")

        bytes_ = None
        if isinstance(entity, LegacyParticipant):
            if entity.is_user:
                # The participant is the user themselves: their avatar is
                # fetched over pubsub rather than read from a legacy entity.
                bytes_, type_ = await self.__fetch_user_avatar(session)
                if not bytes_:
                    raise XMPPError(
                        "internal-server-error",
                        "Could not fetch the slidge user's avatar",
                    )
                avatar = None
                vcard = None
            elif not (contact := entity.contact):
                raise XMPPError("item-not-found", "This participant has no contact")
            else:
                vcard = await contact.get_vcard()
                avatar = contact.get_avatar()
                type_ = "image/png"
        else:
            avatar = entity.get_avatar()
            type_ = "image/png"
            if isinstance(entity, LegacyContact):
                vcard = await entity.get_vcard(fetch=False)
            else:
                vcard = None

        v = self.xmpp.plugin["xep_0054"].make_vcard()
        if avatar is not None and avatar.data:
            bytes_ = avatar.data.get_value()
        if bytes_:
            v["PHOTO"]["BINVAL"] = bytes_
            v["PHOTO"]["TYPE"] = type_
        if vcard:
            for el in vcard.xml:
                # Copy so the source vCard is left untouched, then drop the
                # vCard4 namespace from the copied element's tag.
                new = copy(el)
                new.tag = el.tag.replace(f"{{{VCard4NS}}}", "")
                v.append(new)
        reply = iq.reply()
        reply.append(v)
        reply.send()

    async def __handle_set_vcard_temp(self, iq: Iq):
        """Handle a ``vcard_temp`` set IQ used to change a MUC's avatar.

        The photo payload (or ``None`` when empty) is passed to the MUC's
        ``on_avatar``; the IQ result is sent once that call succeeds, and
        only then is the avatar updated on the MUC object.

        :param iq: the incoming request.
        :raises XMPPError: ``bad-request`` when the ``to`` JID has a resource
            (i.e. targets a participant); any ``XMPPError`` raised by
            ``on_avatar`` is propagated unchanged; any other exception from
            ``on_avatar`` is converted to ``internal-server-error`` with the
            original exception chained as its cause.
        """
        muc = await self.get_muc_from_stanza(iq)
        to = iq.get_to()

        if to.resource:
            raise XMPPError("bad-request", "You cannot set participants avatars")

        data = iq["vcard_temp"]["PHOTO"]["BINVAL"] or None
        try:
            legacy_id = await muc.on_avatar(
                data, iq["vcard_temp"]["PHOTO"]["TYPE"] or None
            )
        except XMPPError:
            raise
        except Exception as e:
            # Chain the cause so the original traceback is preserved.
            raise XMPPError("internal-server-error", str(e)) from e
        reply = iq.reply(clear=True)
        reply.enable("vcard_temp")
        reply.send()

        if not data:
            # An empty photo means the avatar is being removed.
            await muc.set_avatar(None, blocking=True)
            return

        if legacy_id:
            await muc.set_avatar(data, legacy_id, blocking=True)