Coverage for slidge / core / pubsub.py: 84%
242 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
1import logging
2from copy import copy
3from pathlib import Path
4from typing import ClassVar
6from slixmpp import (
7 JID,
8 CoroutineCallback,
9 ElementBase,
10 Iq,
11 Presence,
12 StanzaPath,
13 register_stanza_plugin,
14)
15from slixmpp.exceptions import IqError, IqTimeout, XMPPError
16from slixmpp.plugins.base import BasePlugin, register_plugin
17from slixmpp.plugins.xep_0060.stanza import Event, EventItem, EventItems, Item
18from slixmpp.plugins.xep_0084.stanza import Data as AvatarData
19from slixmpp.plugins.xep_0084.stanza import MetaData as AvatarMetadata
20from slixmpp.plugins.xep_0172.stanza import UserNick
21from slixmpp.plugins.xep_0292.stanza import VCard4
22from slixmpp.plugins.xep_0402.stanza import Conference
23from slixmpp.types import JidStr, OptJidStr
25from ..contact import LegacyContact
26from ..db.avatar import CachedAvatar, avatar_cache
27from ..db.models import GatewayUser, Space
28from ..util.lock import NamedLockMixin
29from ..util.types import AnyGateway, AnySession
# XML namespace of vCard4; this module also uses it as the pubsub node name
# when serving and broadcasting vCard items.
31VCARD4_NAMESPACE = "urn:xmpp:vcard4"
class PepAvatar:
    """
    Container for the avatar items (metadata and image data) of one entity.

    A freshly created instance represents "no avatar": ``metadata``, ``id``
    and ``data`` are all ``None`` until :meth:`set_avatar_from_cache` is
    called.
    """

    def __init__(self) -> None:
        self.id: str | None = None
        self.metadata: AvatarMetadata | None = None
        # Only the path is kept; the image bytes are read on demand.
        self._avatar_data_path: Path | None = None

    @property
    def data(self) -> AvatarData | None:
        """
        Avatar data stanza built from the file on disk.

        :return: A new ``AvatarData`` holding the bytes of the avatar file,
            or ``None`` if no avatar has been set. The file is read again on
            every access.
        """
        path = self._avatar_data_path
        if path is not None:
            payload = AvatarData()
            payload.set_value(path.read_bytes())
            return payload
        return None

    def set_avatar_from_cache(self, cached_avatar: CachedAvatar) -> None:
        """
        Populate this object from an entry of the avatar cache.

        :param cached_avatar: Cache entry providing the hash (used as the
            item id), the path of the image file and its dimensions.
        """
        info = AvatarMetadata()
        avatar_hash = cached_avatar.hash
        self.id = avatar_hash
        avatar_file = cached_avatar.path
        info.add_info(
            id=avatar_hash,
            itype="image/png",
            ibytes=avatar_file.stat().st_size,
            height=str(cached_avatar.height),
            width=str(cached_avatar.width),
        )
        self.metadata = info
        self._avatar_data_path = avatar_file
class PubSubComponent(NamedLockMixin, BasePlugin):
    """
    Slixmpp plugin serving pubsub item requests and pushing pubsub events.

    It registers IQ handlers for the avatar data, avatar metadata and vCard4
    nodes, and sends ``headline`` messages carrying pubsub events for
    avatars, nicknames, vCards and spaces.
    """

    xmpp: "AnyGateway"

    name = "pubsub"
    description = "Pubsub component"
    dependencies: ClassVar[set[str]] = {
        "xep_0030",
        "xep_0060",
        "xep_0115",
        "xep_0163",
    }
    default_config: ClassVar[dict[str, str | None]] = {"component_name": None}
    component_name: str

    def __init__(self, *a: object, **kw: object) -> None:
        super().__init__(*a, **kw)
        # slixmpp usually does this via self.xmpp["xep_0163"].register_pep(),
        # in the session_bind hook of plugins. However, for components,
        # session_bind is never called.
        register_stanza_plugin(EventItem, UserNick)
        register_stanza_plugin(EventItem, Conference)

    def plugin_init(self) -> None:
        """Register the IQ handlers and advertise pubsub identities/features."""
        self.xmpp.register_handler(
            CoroutineCallback(
                "pubsub_get_avatar_data",
                StanzaPath(f"iq@type=get/pubsub/items@node={AvatarData.namespace}"),
                self._get_avatar_data,  # type:ignore
            )
        )
        self.xmpp.register_handler(
            CoroutineCallback(
                "pubsub_get_avatar_metadata",
                StanzaPath(f"iq@type=get/pubsub/items@node={AvatarMetadata.namespace}"),
                self._get_avatar_metadata,  # type:ignore
            )
        )
        self.xmpp.register_handler(
            CoroutineCallback(
                "pubsub_get_vcard",
                StanzaPath(f"iq@type=get/pubsub/items@node={VCARD4_NAMESPACE}"),
                self._get_vcard,  # type:ignore
            )
        )

        disco = self.xmpp.plugin["xep_0030"]
        disco.add_identity("pubsub", "pep", self.component_name)
        disco.add_identity("account", "registered", self.component_name)
        disco.add_feature("http://jabber.org/protocol/pubsub#event")
        disco.add_feature("http://jabber.org/protocol/pubsub#retrieve-items")
        disco.add_feature("http://jabber.org/protocol/pubsub#persistent-items")

    async def __get_features(self, presence: Presence) -> list[str]:
        """
        Return the disco features of the entity that sent ``presence``.

        If the presence carries a caps ``ver`` string, the xep_0115 plugin is
        asked first. When that yields nothing (or there is no ``ver``), a
        disco#info request is sent to the sender.

        :param presence: The presence whose sender is inspected.
        :return: The features of the sender, or an empty list if the
            disco#info request fails or times out.
        """
        from_ = presence.get_from()
        ver_string = presence["caps"]["ver"]
        if ver_string:
            info = await self.xmpp.plugin["xep_0115"].get_caps(from_)
        else:
            info = None
        if info is None:
            # The lock is keyed on the sender JID; presumably this serializes
            # concurrent disco#info queries to the same entity.
            async with self.lock(from_):
                try:
                    iq = await self.xmpp.plugin["xep_0030"].get_info(from_)
                except (IqError, IqTimeout):
                    log.debug("Could not get disco#info of %s, ignoring", from_)
                    return []
            info = iq["disco_info"]
        return info["features"]  # type:ignore[no-any-return]

    async def on_presence_available(
        self, p: Presence, contact: LegacyContact | None
    ) -> None:
        """
        Push avatar, nickname and vCard events to an entity that came online.

        Nothing is sent for MUC join presences, nor when the presence targets
        a contact that is unknown or not a friend. Each event is only sent if
        the sender advertises the matching ``+notify`` feature.

        :param p: The available presence that was received.
        :param contact: The contact the presence is addressed to, or ``None``
            (e.g. when it is addressed to the component itself).
        """
        if "muc_join" in p:
            log.debug("Ignoring MUC presence here")
            return

        to = p.get_to()
        # we don't want to push anything for contacts that are not in the user's roster
        if to != self.xmpp.boundjid.bare and (contact is None or not contact.is_friend):
            return

        from_ = p.get_from()
        features = await self.__get_features(p)

        if AvatarMetadata.namespace + "+notify" in features:
            try:
                pep_avatar = await self._get_authorized_avatar(p, contact)
            except XMPPError:
                pass
            else:
                if pep_avatar.metadata is not None:
                    await self.__broadcast(
                        data=pep_avatar.metadata,
                        from_=to.bare,
                        to=from_,
                        id=pep_avatar.metadata["info"]["id"],
                    )
        if UserNick.namespace + "+notify" in features:
            try:
                pep_nick = await self._get_authorized_nick(p, contact)
            except XMPPError:
                pass
            else:
                # Use the bare JID as the event sender, like the avatar and
                # vCard events do.
                await self.__broadcast(data=pep_nick, from_=to.bare, to=from_)

        if contact is not None and VCARD4_NAMESPACE + "+notify" in features:
            vcard = await contact.get_vcard()
            if vcard is not None:
                await self.broadcast_vcard_event(to, from_, vcard)

    async def broadcast_vcard_event(self, from_: JID, to: JID, vcard: VCard4) -> None:
        """
        Send a vCard4 pubsub event, with the vCard as payload.

        :param from_: Entity the vCard belongs to; its bare JID is the sender.
        :param to: Recipient of the event.
        :param vcard: The vCard to include in the event.
        """
        # The vcard content should NOT be in this event according to the spec:
        # https://xmpp.org/extensions/xep-0292.html#sect-idm45669698174224
        # but movim expects it to be here, and I guess it does not hurt

        log.debug("Broadcast vcard4 event: %s", vcard)
        await self.__broadcast(
            data=vcard,
            from_=JID(from_).bare,
            to=to,
            id="current",
            node=VCARD4_NAMESPACE,
        )

    async def __get_contact(self, stanza: Iq | Presence) -> LegacyContact:
        """Return the contact ``stanza`` is addressed to, via the sender's session."""
        session = self.xmpp.get_session_from_stanza(stanza)
        return await session.contacts.by_jid(stanza.get_to())  # type:ignore[no-any-return]

    async def _get_authorized_avatar(
        self, stanza: Iq | Presence, contact: LegacyContact | None = None
    ) -> PepAvatar:
        """
        Build the avatar items of the entity ``stanza`` is addressed to.

        :param stanza: The incoming request or presence.
        :param contact: The addressed contact if already known; looked up
            from the stanza otherwise.
        :return: A ``PepAvatar``, left empty if the entity has no avatar.
        """
        if stanza.get_to() == self.xmpp.boundjid.bare:
            # Addressed to the component itself: use the component's avatar.
            item = PepAvatar()
            if self.xmpp.avatar is not None:
                item.set_avatar_from_cache(self.xmpp.avatar)
            return item

        if contact is None:
            contact = await self.__get_contact(stanza)

        item = PepAvatar()
        if contact.stored.avatar is not None:
            stored = avatar_cache.get(contact.stored.avatar)
            assert stored is not None
            item.set_avatar_from_cache(stored)
        return item

    async def _get_authorized_nick(
        self, stanza: Iq | Presence, contact: LegacyContact | None = None
    ) -> UserNick:
        """
        Build the nickname stanza of the entity ``stanza`` is addressed to.

        :param stanza: The incoming request or presence.
        :param contact: The addressed contact if already known; looked up
            from the stanza otherwise.
        :return: A ``UserNick`` holding the component name or the contact
            name; an empty ``UserNick`` if the contact has no name.
        """
        if stanza.get_to() == self.xmpp.boundjid.bare:
            return get_user_nick(self.xmpp.COMPONENT_NAME)

        if contact is None:
            contact = await self.__get_contact(stanza)

        if contact.name is not None:
            return get_user_nick(contact.name)
        else:
            return UserNick()

    def __reply_with(
        self, iq: Iq, content: AvatarData | AvatarMetadata | None, item_id: str | None
    ) -> None:
        """
        Answer an items request with ``content`` if the requested id matches.

        :param iq: The incoming pubsub items request.
        :param content: Payload to return, if any.
        :param item_id: Id of the item ``content`` corresponds to.
        :raises XMPPError: ``item-not-found`` when specific items were
            requested and none of them has id ``item_id``.
        """
        requested_items = iq["pubsub"]["items"]

        # NOTE(review): the length is compared explicitly; stanza objects are
        # presumably always truthy, so ``not requested_items`` would not work.
        if len(requested_items) == 0:
            self._reply_with_payload(iq, content, item_id)
        else:
            for item in requested_items:
                if item["id"] == item_id:
                    self._reply_with_payload(iq, content, item_id)
                    return
            else:
                raise XMPPError("item-not-found")

    async def _get_avatar_data(self, iq: Iq) -> None:
        """Handle a request for the avatar data node."""
        pep_avatar = await self._get_authorized_avatar(iq)
        self.__reply_with(iq, pep_avatar.data, pep_avatar.id)

    async def _get_avatar_metadata(self, iq: Iq) -> None:
        """Handle a request for the avatar metadata node."""
        pep_avatar = await self._get_authorized_avatar(iq)
        self.__reply_with(iq, pep_avatar.metadata, pep_avatar.id)

    async def _get_vcard(self, iq: Iq) -> None:
        """
        Handle a request for the vCard4 node.

        :raises XMPPError: ``item-not-found`` if the contact has no vCard.
        """
        # this is not the proper way that clients should retrieve VCards, but
        # gajim does it this way.
        # https://xmpp.org/extensions/xep-0292.html#sect-idm45669698174224
        contact = await self.__get_contact(iq)
        vcard = await contact.get_vcard()
        if vcard is None:
            raise XMPPError("item-not-found")
        self._reply_with_payload(iq, vcard, "current", VCARD4_NAMESPACE)

    @staticmethod
    def _reply_with_payload(
        iq: Iq,
        payload: AvatarMetadata | AvatarData | VCard4 | None,
        id_: str | None,
        namespace: str | None = None,
    ) -> None:
        """
        Send the result of an items request, containing a single item.

        :param iq: The request being answered.
        :param payload: Content of the returned item; the item is left
            without payload when this is ``None``.
        :param id_: Id of the returned item.
        :param namespace: Node name to put in the reply. Defaults to the
            payload's namespace, or to the node named in the request when
            there is no payload.
        """
        if namespace:
            node = namespace
        elif payload is not None:
            node = payload.namespace
        else:
            # No payload to derive the node from (e.g. the entity has no
            # avatar): echo the requested node instead of failing.
            node = iq["pubsub"]["items"]["node"]

        result = iq.reply()
        item = Item()
        if payload:
            item.set_payload(payload.xml)
        item["id"] = id_
        result["pubsub"]["items"]["node"] = node
        result["pubsub"]["items"].append(item)
        result.send()

    async def __broadcast(
        self,
        data: ElementBase | None,
        from_: JidStr,
        to: OptJidStr = None,
        items: EventItems | None = None,
        **kwargs: object,
    ) -> None:
        """
        Send a pubsub event as a ``headline`` message.

        :param data: Payload of the single event item. Required unless
            ``items`` is given.
        :param from_: Sender of the event.
        :param to: Recipient. If ``None``, the event is sent to the bare JID
            of every ``GatewayUser`` in the store.
        :param items: Pre-built items element to send as is; when given,
            ``data`` and ``kwargs`` are not used.
        :param kwargs: Set as keys on the event item (e.g. ``id``); ``node``
            also overrides the node name of the items element.
        :raises ValueError: If neither ``data`` nor ``items`` is provided.
        """
        from_ = JID(from_)
        if from_ != self.xmpp.boundjid.bare and to is not None:
            to = JID(to)
            session = self.xmpp.get_session_from_jid(to)
            if session is None:
                return
            # Events from contacts are held back until the session is ready.
            await session.ready

        if items is None:
            if data is None:
                raise ValueError("Either 'data' or 'items' must be provided")
            item = EventItem()
            if data:
                item.set_payload(data.xml)
            for k, v in kwargs.items():
                item[k] = v
            items = EventItems()
            items.append(item)
            items["node"] = kwargs.get("node") or data.namespace

        event = Event()
        event.append(items)

        msg = self.xmpp.Message()
        msg.set_type("headline")
        msg.set_from(from_)
        msg.append(event)

        if to is None:
            with self.xmpp.store.session() as orm:
                for u in orm.query(GatewayUser).all():
                    new_msg = copy(msg)
                    new_msg.set_to(u.jid.bare)
                    new_msg.send()
        else:
            msg.set_to(to)
            msg.send()

    async def broadcast_avatar(
        self, from_: JidStr, to: JidStr, cached_avatar: CachedAvatar | None
    ) -> None:
        """
        Send an avatar metadata event.

        :param from_: Entity whose avatar changed.
        :param to: Recipient of the event.
        :param cached_avatar: The new avatar, or ``None`` to send an empty
            metadata element (no avatar).
        """
        if cached_avatar is None:
            await self.__broadcast(AvatarMetadata(), from_, to)
        else:
            pep_avatar = PepAvatar()
            pep_avatar.set_avatar_from_cache(cached_avatar)
            assert pep_avatar.metadata is not None
            await self.__broadcast(
                pep_avatar.metadata, from_, to, id=pep_avatar.metadata["info"]["id"]
            )

    def broadcast_nick(
        self,
        user_jid: JID,
        jid: JidStr,
        nick: str | None = None,
    ) -> None:
        """
        Schedule the sending of a nickname event on the event loop.

        :param user_jid: The user to notify; the event goes to its bare JID.
        :param jid: Entity whose nickname is announced (sender of the event).
        :param nick: The nickname, or ``None`` for an empty nick element.
        """
        jid = JID(jid)
        nickname = get_user_nick(nick)
        log.debug("New nickname: %s", nickname)
        self.xmpp.loop.create_task(self.__broadcast(nickname, jid, user_jid.bare))

    def broadcast_space(self, session: AnySession, space: Space) -> None:
        """
        Schedule an event listing the rooms of ``space`` for the session's user.

        The node is the space's legacy id; each room becomes one item whose
        id is the room JID, with a conference element carrying the room name
        when it has one.

        :param session: Session of the user to notify.
        :param space: The space whose rooms are announced.
        """
        items = EventItems()
        items["node"] = str(space.legacy_id)
        for room in space.rooms:
            item = EventItem()
            item["id"] = room.jid
            item.enable("conference")
            if room.name:
                item["conference"]["name"] = room.name
            items.append(item)
        session.create_task(
            self.__broadcast(
                None,
                self.xmpp.boundjid.bare,
                session.user_jid,
                items=items,
            )
        )

    def broadcast_space_metadata(
        self, session: AnySession, space: Space, node: str
    ) -> None:
        """
        Send a node configuration event describing ``space`` to the user.

        :param session: Session of the user to notify.
        :param space: The space; its name is used as the node title.
        :param node: Name of the pubsub node the configuration applies to.
        """
        msg = self.xmpp.make_message(
            mtype="headline", mto=session.user_jid, mfrom=self.xmpp.boundjid.bare
        )
        msg["pubsub_event"]["configuration"]["node"] = node

        form = msg["pubsub_event"]["configuration"]["form"]
        form["type"] = "result"

        form.add_field(
            var="FORM_TYPE",
            ftype="hidden",
            value="http://jabber.org/protocol/pubsub#node_config",
        )
        form.add_field(var="pubsub#title", value=space.name)

        # MUSTs
        form.add_field(var="pubsub#type", value="urn:xmpp:spaces:0")
        form.add_field(var="pubsub#notify_retract", value="true")
        form.add_field(var="pubsub#persist_items", value="true")
        form.add_field(var="pubsub#purge_offline", value="false")
        # SHOULDs
        form.add_field(var="pubsub#notify_sub", value="true")
        form.add_field(var="pubsub#notify_config", value="true")
        form.add_field(var="pubsub#notify_delete", value="true")
        form.add_field(var="pubsub#publish_model", value="publishers")
        # MUST (private spaces)
        form.add_field(var="pubsub#access_model", value="authorize")

        msg.send()
def get_user_nick(nick: str | None = None) -> UserNick:
    """
    Build a ``UserNick`` stanza.

    :param nick: Nickname to store in the stanza; when ``None`` the stanza
        is returned without its ``nick`` key being set.
    :return: The new ``UserNick`` stanza.
    """
    stanza = UserNick()
    if nick is None:
        return stanza
    stanza["nick"] = nick
    return stanza
# Module-level logger, named after this module. It is defined at the bottom of
# the file; the functions above only look it up when they are called.
409log = logging.getLogger(__name__)
# Hand the plugin class to slixmpp's register_plugin at import time.
410register_plugin(PubSubComponent)