Coverage for slidge / core / dispatcher / vcard.py: 89%
101 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
1from copy import copy
2from typing import TYPE_CHECKING
4from slixmpp import CoroutineCallback, Iq, StanzaPath, register_stanza_plugin
5from slixmpp.exceptions import XMPPError
6from slixmpp.plugins.xep_0084.stanza import MetaData
7from slixmpp.plugins.xep_0292.stanza import NS as VCard4NS
9from ...contact import LegacyContact
10from ...group import LegacyParticipant
11from ...util.types import AnySession, Avatar
12from .util import DispatcherMixin, exceptions_to_xmpp_errors
14if TYPE_CHECKING:
15 from slidge.core.gateway import BaseGateway
class VCardMixin(DispatcherMixin):
    """Dispatcher mixin answering vCard4 and vcard-temp IQ stanzas.

    On construction it registers two handlers on the gateway: one for
    ``iq@type=get/vcard`` and one for ``iq/vcard_temp`` (both ``get`` and
    ``set`` are routed by the latter).
    """

    __slots__: list[str] = []

    def __init__(self, xmpp: "BaseGateway") -> None:
        """Register the vCard stanza handlers on ``xmpp``.

        :param xmpp: The gateway the handlers are attached to.
        """
        super().__init__(xmpp)
        xmpp.register_handler(
            CoroutineCallback(
                "get_vcard", StanzaPath("iq@type=get/vcard"), self.on_get_vcard
            )
        )
        # Drop whatever handler is currently registered under the name
        # "VCardTemp" so that ours (same name) is the only one left.
        # NOTE(review): presumably the one installed by slixmpp's XEP-0054
        # plugin -- confirm.
        xmpp.remove_handler("VCardTemp")
        xmpp.register_handler(
            CoroutineCallback(
                "VCardTemp",
                StanzaPath("iq/vcard_temp"),
                self.__vcard_temp_handler,
            )
        )
        # TODO: MR to slixmpp adding this to XEP-0084
        register_stanza_plugin(
            self.xmpp.plugin["xep_0060"].stanza.Item,
            MetaData,
        )

    @exceptions_to_xmpp_errors
    async def on_get_vcard(self, iq: Iq) -> None:
        """Reply to a vCard ``get`` with the vCard of the addressed contact.

        The contact is looked up from the ``to`` JID of ``iq`` in the
        requester's session. When the contact has no vCard, the ``vcard``
        plugin is enabled on the reply instead of appending one.

        :param iq: The incoming ``get`` request.
        """
        session = await self._get_session(iq, logged=True)
        contact = await session.contacts.by_jid(iq.get_to())
        vcard = await contact.get_vcard()
        reply = iq.reply()
        if vcard:
            reply.append(vcard)
        else:
            reply.enable("vcard")
        reply.send()

    @exceptions_to_xmpp_errors
    async def __vcard_temp_handler(self, iq: Iq) -> None:
        """Route a vcard-temp IQ to the ``get`` or ``set`` handler.

        Stanzas whose type is neither ``get`` nor ``set`` are ignored.

        :param iq: The incoming vcard-temp stanza.
        """
        if iq["type"] == "get":
            return await self.__handle_get_vcard_temp(iq)

        if iq["type"] == "set":
            return await self.__handle_set_vcard_temp(iq)

    async def __fetch_user_avatar(self, session: AnySession) -> tuple[bytes, str]:
        """Fetch the avatar of the slidge user owning ``session``.

        The avatar metadata item and the avatar data are both requested from
        the user's JID, using the stored avatar hash as the item id and the
        gateway's bare JID as sender.

        :param session: Session whose user's avatar is fetched.
        :return: The avatar data value and the type read from its metadata.
        :raises XMPPError: ``item-not-found`` if the user has no avatar hash.
        """
        hash_ = session.user.avatar_hash
        if not hash_:
            raise XMPPError(
                "item-not-found", "The slidge user does not have any avatar set"
            )
        meta_iq = await self.xmpp.plugin["xep_0060"].get_item(
            session.user_jid,
            MetaData.namespace,
            hash_,
            ifrom=self.xmpp.boundjid.bare,
        )
        info = meta_iq["pubsub"]["items"]["item"]["avatar_metadata"]["info"]
        type_ = info["type"]
        data_iq = await self.xmpp.plugin["xep_0084"].retrieve_avatar(
            session.user_jid, hash_, ifrom=self.xmpp.boundjid.bare
        )
        bytes_ = data_iq["pubsub"]["items"]["item"]["avatar_data"]["value"]
        return bytes_, type_

    async def __handle_get_vcard_temp(self, iq: Iq) -> None:
        """Reply to a vcard-temp ``get`` with a photo and vCard fields.

        The addressed entity may be a contact, a group or a participant. For
        the participant that is the slidge user, the photo is the user's own
        avatar; for any other participant, the data comes from its contact.

        :param iq: The incoming ``get`` request.
        :raises XMPPError: ``item-not-found`` if no entity matches the ``to``
            JID or the participant has no contact; ``internal-server-error``
            if the slidge user's avatar came back empty.
        """
        session = self.xmpp.get_session_from_stanza(iq)
        entity = await session.get_contact_or_group_or_participant(iq.get_to(), False)
        if not entity:
            raise XMPPError("item-not-found")

        bytes_ = None
        if isinstance(entity, LegacyParticipant):
            if entity.is_user:
                bytes_, type_ = await self.__fetch_user_avatar(session)
                if not bytes_:
                    raise XMPPError(
                        "internal-server-error",
                        "Could not fetch the slidge user's avatar",
                    )
                avatar = None
                vcard = None
            elif not (contact := entity.contact):
                raise XMPPError("item-not-found", "This participant has no contact")
            else:
                vcard = await contact.get_vcard()
                avatar = contact.get_avatar()
                # NOTE(review): the type is hard-coded here; presumably stored
                # avatars are always PNG -- confirm.
                type_ = "image/png"
        else:
            avatar = entity.get_avatar()
            type_ = "image/png"
            if isinstance(entity, LegacyContact):
                vcard = await entity.get_vcard()
            else:
                vcard = None
        v = self.xmpp.plugin["xep_0054"].make_vcard()
        if avatar is not None and avatar.data:
            bytes_ = avatar.data.get_value()
        # ``bytes_`` is set either from the user's own avatar (above) or from
        # the entity's avatar data; without it no PHOTO is included.
        if bytes_:
            v["PHOTO"]["BINVAL"] = bytes_
            v["PHOTO"]["TYPE"] = type_
        if vcard:
            # Copy each top-level vCard4 element into the vcard-temp payload,
            # removing the vCard4 namespace prefix from the copied tag.
            for el in vcard.xml:
                new = copy(el)
                new.tag = el.tag.replace(f"{{{VCard4NS}}}", "")
                v.append(new)
        reply = iq.reply()
        reply.append(v)
        reply.send()

    async def __handle_set_vcard_temp(self, iq: Iq) -> None:
        """Handle a vcard-temp ``set`` addressed to a MUC (avatar change).

        The photo is passed to the MUC's ``on_avatar``; the IQ is answered,
        and only then is the avatar updated on the MUC.

        :param iq: The incoming ``set`` request.
        :raises XMPPError: ``bad-request`` if the ``to`` JID has a resource;
            ``internal-server-error`` if ``on_avatar`` raises anything other
            than an :class:`XMPPError` (which is propagated as-is).
        """
        muc = await self.get_muc_from_stanza(iq)
        to = iq.get_to()

        if to.resource:
            raise XMPPError("bad-request", "You cannot set participants avatars")

        data = iq["vcard_temp"]["PHOTO"]["BINVAL"] or None
        try:
            legacy_id = await muc.on_avatar(
                data, iq["vcard_temp"]["PHOTO"]["TYPE"] or None
            )
        except XMPPError:
            raise
        except Exception as e:
            # Chain the original exception so its traceback is kept as the
            # explicit cause of the reported error.
            raise XMPPError("internal-server-error", str(e)) from e
        reply = iq.reply(clear=True)
        reply.enable("vcard_temp")
        reply.send()

        # No photo data in the request: clear the avatar.
        if not data:
            await muc.set_avatar(None)
            return

        # Only set the avatar when ``on_avatar`` returned a truthy identifier.
        if legacy_id:
            await muc.set_avatar(Avatar(data=data, unique_id=legacy_id))