Coverage for slidge / core / pubsub.py: 82%
210 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
1import logging
2from copy import copy
3from pathlib import Path
4from typing import TYPE_CHECKING, ClassVar
6from slixmpp import (
7 JID,
8 CoroutineCallback,
9 ElementBase,
10 Iq,
11 Presence,
12 StanzaPath,
13 register_stanza_plugin,
14)
15from slixmpp.exceptions import IqError, IqTimeout, XMPPError
16from slixmpp.plugins.base import BasePlugin, register_plugin
17from slixmpp.plugins.xep_0060.stanza import Event, EventItem, EventItems, Item
18from slixmpp.plugins.xep_0084.stanza import Data as AvatarData
19from slixmpp.plugins.xep_0084.stanza import MetaData as AvatarMetadata
20from slixmpp.plugins.xep_0172.stanza import UserNick
21from slixmpp.plugins.xep_0292.stanza import VCard4
22from slixmpp.types import JidStr, OptJidStr
24from slidge.util.types import AnyContact
26from ..db.avatar import CachedAvatar, avatar_cache
27from ..db.models import GatewayUser
28from ..util.lock import NamedLockMixin
30if TYPE_CHECKING:
31 from slidge.core.gateway import BaseGateway
# Pubsub node name (and XML namespace) under which vcard4 items are requested
# and published in this module.
VCARD4_NAMESPACE = "urn:xmpp:vcard4"
class PepAvatar:
    """Holder for the avatar items (metadata, id, data) served for one entity.

    All fields start out as ``None`` and are filled in by
    :meth:`set_avatar_from_cache`. The image bytes are not kept in memory:
    only the path of the cached file is stored, and the file is read each
    time :attr:`data` is accessed.
    """

    def __init__(self) -> None:
        self._avatar_data_path: Path | None = None
        self.id: str | None = None
        self.metadata: AvatarMetadata | None = None

    @property
    def data(self) -> AvatarData | None:
        """Return a new avatar data element, or ``None`` when no avatar is set."""
        path = self._avatar_data_path
        if path is not None:
            element = AvatarData()
            element.set_value(path.read_bytes())
            return element
        return None

    def set_avatar_from_cache(self, cached_avatar: CachedAvatar) -> None:
        """Fill in the id, metadata and data path from *cached_avatar*.

        The metadata advertises the image as ``image/png`` with the size of
        the cached file on disk and the cached height/width.
        """
        avatar_hash = cached_avatar.hash
        # The id is assigned before the file is stat()ed, as it always was.
        self.id = avatar_hash
        info = AvatarMetadata()
        info.add_info(
            id=avatar_hash,
            itype="image/png",
            ibytes=cached_avatar.path.stat().st_size,
            height=str(cached_avatar.height),
            width=str(cached_avatar.width),
        )
        self.metadata = info
        self._avatar_data_path = cached_avatar.path
class PubSubComponent(NamedLockMixin, BasePlugin):
    """Plugin answering pubsub item requests and pushing pubsub events.

    It replies to ``iq`` ``get`` requests for the avatar data, avatar
    metadata and vcard4 nodes, addressed either to the gateway component
    itself or to a contact, and it sends ``headline`` messages carrying
    pubsub events (avatar metadata, nickname, vcard4) to gateway users.
    """

    xmpp: "BaseGateway"

    name = "pubsub"
    description = "Pubsub component"
    dependencies: ClassVar[set[str]] = {
        "xep_0030",
        "xep_0060",
        "xep_0115",
        "xep_0163",
    }
    default_config: ClassVar[dict[str, str | None]] = {"component_name": None}
    component_name: str

    def __init__(self, *a: object, **kw: object) -> None:
        """Initialise the plugin and allow ``UserNick`` inside event items."""
        super().__init__(*a, **kw)
        # Strong references to fire-and-forget tasks: the event loop itself
        # only keeps weak references to the tasks it runs.
        self._background_tasks: set[object] = set()
        register_stanza_plugin(EventItem, UserNick)

    def plugin_init(self) -> None:
        """Register the item-request handlers and advertise disco info."""
        handlers = (
            ("pubsub_get_avatar_data", AvatarData.namespace, self._get_avatar_data),
            (
                "pubsub_get_avatar_metadata",
                AvatarMetadata.namespace,
                self._get_avatar_metadata,
            ),
            ("pubsub_get_vcard", VCARD4_NAMESPACE, self._get_vcard),
        )
        for handler_name, node, callback in handlers:
            self.xmpp.register_handler(
                CoroutineCallback(
                    handler_name,
                    StanzaPath(f"iq@type=get/pubsub/items@node={node}"),
                    callback,  # type:ignore
                )
            )

        disco = self.xmpp.plugin["xep_0030"]
        disco.add_identity("pubsub", "pep", self.component_name)
        disco.add_identity("account", "registered", self.component_name)
        for feature in (
            "http://jabber.org/protocol/pubsub#event",
            "http://jabber.org/protocol/pubsub#retrieve-items",
            "http://jabber.org/protocol/pubsub#persistent-items",
        ):
            disco.add_feature(feature)

    async def __get_features(self, presence: Presence) -> list[str]:
        """Return the disco features of the sender of *presence*.

        The caps plugin is asked first when the presence carries a caps
        ``ver`` string. If there is no ``ver`` or the caps lookup yields
        nothing, a disco#info query is sent to the sender instead.

        :return: the advertised features, or an empty list when the
            disco#info query fails or times out.
        """
        from_ = presence.get_from()
        info = None
        if presence["caps"]["ver"]:
            info = await self.xmpp.plugin["xep_0115"].get_caps(from_)
        if info is None:
            # Named lock keyed on the sender (from NamedLockMixin) --
            # presumably serialises concurrent queries to the same JID.
            async with self.lock(from_):
                try:
                    iq = await self.xmpp.plugin["xep_0030"].get_info(from_)
                except (IqError, IqTimeout):
                    log.debug("Could not get disco#info of %s, ignoring", from_)
                    return []
                info = iq["disco_info"]
        return info["features"]  # type:ignore[no-any-return]

    async def on_presence_available(
        self, p: Presence, contact: AnyContact | None
    ) -> None:
        """Push avatar, nickname and vcard events to a user that came online.

        Only the events whose ``+notify`` feature is advertised by the sender
        of *p* are sent. MUC join presences are ignored, and nothing is sent
        on behalf of a contact that is missing or not a friend.

        :param p: the available presence received from the user.
        :param contact: the contact the presence was addressed to, if any.
        """
        if p.get_plugin("muc_join", check=True) is not None:
            log.debug("Ignoring MUC presence here")
            return

        to = p.get_to()
        if to != self.xmpp.boundjid.bare:
            # we don't want to push anything for contacts that are not in the user's roster
            if contact is None or not contact.is_friend:
                return

        from_ = p.get_from()
        features = await self.__get_features(p)

        if AvatarMetadata.namespace + "+notify" in features:
            try:
                pep_avatar = await self._get_authorized_avatar(p, contact)
            except XMPPError as exc:
                # best effort: a failure here must not prevent the other events
                log.debug("Not pushing the avatar of %s to %s: %s", to, from_, exc)
            else:
                if pep_avatar.metadata is not None:
                    await self.__broadcast(
                        data=pep_avatar.metadata,
                        from_=to.bare,
                        to=from_,
                        id=pep_avatar.metadata["info"]["id"],
                    )
        if UserNick.namespace + "+notify" in features:
            try:
                pep_nick = await self._get_authorized_nick(p, contact)
            except XMPPError as exc:
                log.debug("Not pushing the nick of %s to %s: %s", to, from_, exc)
            else:
                # Sent from the bare JID, like the avatar and vcard events.
                await self.__broadcast(data=pep_nick, from_=to.bare, to=from_)

        if contact is not None and VCARD4_NAMESPACE + "+notify" in features:
            vcard = await contact.get_vcard()
            if vcard is not None:
                await self.broadcast_vcard_event(to, from_, vcard)

    async def broadcast_vcard_event(self, from_: JID, to: JID, vcard: VCard4) -> None:
        """Send a vcard4 pubsub event with item id ``current``.

        :param from_: entity the vcard belongs to; its bare JID is the sender.
        :param to: recipient of the event.
        :param vcard: vcard embedded as the payload of the event item.
        """
        # The vcard content should NOT be in this event according to the spec:
        # https://xmpp.org/extensions/xep-0292.html#sect-idm45669698174224
        # but movim expects it to be here, and I guess it does not hurt
        log.debug("Broadcast vcard4 event: %s", vcard)
        await self.__broadcast(
            data=vcard,
            from_=JID(from_).bare,
            to=to,
            id="current",
            node=VCARD4_NAMESPACE,
        )

    async def __get_contact(self, stanza: Iq | Presence) -> AnyContact:
        """Return the contact *stanza* is addressed to, in the sender's session."""
        session = self.xmpp.get_session_from_stanza(stanza)
        return await session.contacts.by_jid(stanza.get_to())  # type:ignore[no-any-return]

    async def _get_authorized_avatar(
        self, stanza: Iq | Presence, contact: AnyContact | None = None
    ) -> PepAvatar:
        """Return the avatar of the entity *stanza* is addressed to.

        When the stanza targets the component's own bare JID, the component
        avatar is used; otherwise the avatar stored for the contact is used.
        The returned :class:`PepAvatar` is empty if no avatar is set.

        :param contact: the addressed contact, if the caller already has it;
            it is looked up from *stanza* otherwise.
        :raises XMPPError: ``item-not-found`` if the contact references an
            avatar that cannot be found in the avatar cache.
        """
        item = PepAvatar()
        if stanza.get_to() == self.xmpp.boundjid.bare:
            if self.xmpp.avatar is not None:
                item.set_avatar_from_cache(self.xmpp.avatar)
            return item

        if contact is None:
            contact = await self.__get_contact(stanza)

        if contact.stored.avatar is not None:
            stored = avatar_cache.get(contact.stored.avatar)
            if stored is None:
                # Explicit check instead of an ``assert``: asserts vanish
                # under ``python -O`` and an AssertionError is not handled as
                # an XMPP error by the callers.
                log.warning(
                    "The avatar of %s is missing from the avatar cache",
                    stanza.get_to(),
                )
                raise XMPPError("item-not-found")
            item.set_avatar_from_cache(stored)
        return item

    async def _get_authorized_nick(
        self, stanza: Iq | Presence, contact: AnyContact | None = None
    ) -> UserNick:
        """Return the nickname element of the entity *stanza* is addressed to.

        The component name is used when the stanza targets the component's
        own bare JID; otherwise the contact's name is used, and the element
        is left empty when the contact has no name.
        """
        if stanza.get_to() == self.xmpp.boundjid.bare:
            return get_user_nick(self.xmpp.COMPONENT_NAME)

        if contact is None:
            contact = await self.__get_contact(stanza)

        # get_user_nick(None) already yields an empty UserNick
        return get_user_nick(contact.name)

    def __reply_with(
        self, iq: Iq, content: AvatarData | AvatarMetadata | None, item_id: str | None
    ) -> None:
        """Answer *iq* with *content* if the request is compatible with *item_id*.

        A request that lists no item, or that lists an item whose id equals
        *item_id*, gets the payload.

        :raises XMPPError: ``item-not-found`` when items were requested but
            none of them has the id *item_id*.
        """
        requested_items = iq["pubsub"]["items"]

        # NOTE(review): keep the explicit len(); the truthiness of a stanza
        # element presumably does not reflect its number of children.
        if len(requested_items) == 0 or any(
            item["id"] == item_id for item in requested_items
        ):
            self._reply_with_payload(iq, content, item_id)
            return
        raise XMPPError("item-not-found")

    async def _get_avatar_data(self, iq: Iq) -> None:
        """Handle an avatar data item request."""
        pep_avatar = await self._get_authorized_avatar(iq)
        self.__reply_with(iq, pep_avatar.data, pep_avatar.id)

    async def _get_avatar_metadata(self, iq: Iq) -> None:
        """Handle an avatar metadata item request."""
        pep_avatar = await self._get_authorized_avatar(iq)
        self.__reply_with(iq, pep_avatar.metadata, pep_avatar.id)

    async def _get_vcard(self, iq: Iq) -> None:
        """Handle a vcard4 item request.

        :raises XMPPError: ``item-not-found`` if the contact has no vcard.
        """
        # this is not the proper way that clients should retrieve VCards, but
        # gajim does it this way.
        # https://xmpp.org/extensions/xep-0292.html#sect-idm45669698174224
        contact = await self.__get_contact(iq)
        vcard = await contact.get_vcard()
        if vcard is None:
            raise XMPPError("item-not-found")
        self._reply_with_payload(iq, vcard, "current", VCARD4_NAMESPACE)

    @staticmethod
    def _reply_with_payload(
        iq: Iq,
        payload: AvatarMetadata | AvatarData | VCard4 | None,
        id_: str | None,
        namespace: str | None = None,
    ) -> None:
        """Send the result of *iq* with a single item wrapping *payload*.

        Without a payload, the item is left empty and no node is set on the
        reply.

        :param id_: id given to the item when there is a payload.
        :param namespace: node of the reply; defaults to the namespace of
            the payload.
        """
        result = iq.reply()
        item = Item()
        if payload is not None:
            item.set_payload(payload.xml)
            item["id"] = id_
            result["pubsub"]["items"]["node"] = (
                namespace if namespace else payload.namespace
            )
        result["pubsub"]["items"].append(item)
        result.send()

    async def __broadcast(
        self, data: ElementBase, from_: JidStr, to: OptJidStr = None, **kwargs: object
    ) -> None:
        """Send *data* as a pubsub event in a ``headline`` message.

        :param data: payload of the event item.
        :param from_: sender of the message.
        :param to: recipient; when ``None``, a copy is sent to the bare JID
            of every :class:`GatewayUser` in the store.
        :param kwargs: extra keys set on the event item (e.g. ``id``); a
            ``node`` entry also overrides the node of the event, which
            otherwise is the namespace of *data*.
        """
        from_ = JID(from_)
        if from_ != self.xmpp.boundjid.bare and to is not None:
            # Events sent on behalf of a contact need a session for the
            # recipient, and wait until that session is ready.
            to = JID(to)
            session = self.xmpp.get_session_from_jid(to)
            if session is None:
                return
            await session.ready

        item = EventItem()
        if data is not None:
            item.set_payload(data.xml)
        for key, value in kwargs.items():
            item[key] = value

        items = EventItems()
        items.append(item)
        items["node"] = kwargs.get("node") or data.namespace

        event = Event()
        event.append(items)

        msg = self.xmpp.Message()
        msg.set_type("headline")
        msg.set_from(from_)
        msg.append(event)

        if to is not None:
            msg.set_to(to)
            msg.send()
            return

        with self.xmpp.store.session() as orm:
            for user in orm.query(GatewayUser).all():
                new_msg = copy(msg)
                new_msg.set_to(user.jid.bare)
                new_msg.send()

    async def broadcast_avatar(
        self, from_: JidStr, to: JidStr, cached_avatar: CachedAvatar | None
    ) -> None:
        """Send an avatar metadata event from *from_* to *to*.

        With ``cached_avatar=None`` an empty metadata element is sent;
        otherwise the metadata describes *cached_avatar* and the event item
        carries the id found in that metadata.
        """
        if cached_avatar is None:
            await self.__broadcast(AvatarMetadata(), from_, to)
            return

        pep_avatar = PepAvatar()
        pep_avatar.set_avatar_from_cache(cached_avatar)
        metadata = pep_avatar.metadata
        assert metadata is not None  # just set above; narrows the type
        await self.__broadcast(metadata, from_, to, id=metadata["info"]["id"])

    def broadcast_nick(
        self,
        user_jid: JID,
        jid: JidStr,
        nick: str | None = None,
    ) -> None:
        """Schedule a nickname event from *jid* to the bare JID of *user_jid*.

        The event is sent by a task created on the component's event loop;
        this method returns without waiting for it.

        :param nick: the new nickname; ``None`` sends an empty nick element.
        """
        jid = JID(jid)
        nickname = get_user_nick(nick)
        log.debug("New nickname: %s", nickname)
        task = self.xmpp.loop.create_task(
            self.__broadcast(nickname, jid, user_jid.bare)
        )
        # Keep the task referenced until it finishes so that it cannot be
        # garbage-collected mid-execution.
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
def get_user_nick(nick: str | None = None) -> UserNick:
    """Build a ``UserNick`` element, setting its nick only when one is given."""
    element = UserNick()
    if nick is None:
        return element
    element["nick"] = nick
    return element
# Module-level logger. It is bound at the bottom of the module, which is fine
# because the code above only looks ``log`` up when it runs.
log = logging.getLogger(__name__)
# Register the plugin class with slixmpp; this has to come after the class
# definition.
register_plugin(PubSubComponent)