Coverage for slidge / core / pubsub.py: 84%
241 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-20 19:56 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-20 19:56 +0000
1import logging
2from copy import copy
3from pathlib import Path
4from typing import TYPE_CHECKING, ClassVar
6from slixmpp import (
7 JID,
8 CoroutineCallback,
9 ElementBase,
10 Iq,
11 Presence,
12 StanzaPath,
13 register_stanza_plugin,
14)
15from slixmpp.exceptions import IqError, IqTimeout, XMPPError
16from slixmpp.plugins.base import BasePlugin, register_plugin
17from slixmpp.plugins.xep_0060.stanza import Event, EventItem, EventItems, Item
18from slixmpp.plugins.xep_0084.stanza import Data as AvatarData
19from slixmpp.plugins.xep_0084.stanza import MetaData as AvatarMetadata
20from slixmpp.plugins.xep_0172.stanza import UserNick
21from slixmpp.plugins.xep_0292.stanza import VCard4
22from slixmpp.plugins.xep_0402.stanza import Conference
23from slixmpp.types import JidStr, OptJidStr
25from slidge.util.types import AnyContact, AnySession
27from ..db.avatar import CachedAvatar, avatar_cache
28from ..db.models import GatewayUser, Space
29from ..util.lock import NamedLockMixin
31if TYPE_CHECKING:
32 from slidge.util.types import AnyGateway
# XML namespace of vCard4 payloads. In this module it doubles as the pubsub
# node name for vCard items and events.
VCARD4_NAMESPACE = "urn:xmpp:vcard4"
class PepAvatar:
    """
    Holder for the pieces of a PEP avatar: its metadata stanza, its item id
    and the location of the image file on disk.

    A freshly created instance represents "no avatar": every field is
    ``None`` until :meth:`set_avatar_from_cache` is called.
    """

    def __init__(self) -> None:
        self.metadata: AvatarMetadata | None = None
        self.id: str | None = None
        self._avatar_data_path: Path | None = None

    @property
    def data(self) -> AvatarData | None:
        """
        Build the avatar data stanza from the file on disk.

        The file is read on every access. Returns ``None`` when no avatar
        has been set.
        """
        path = self._avatar_data_path
        if path is None:
            return None
        payload = AvatarData()
        payload.set_value(path.read_bytes())
        return payload

    def set_avatar_from_cache(self, cached_avatar: CachedAvatar) -> None:
        """
        Populate this object from a cached avatar.

        The cached avatar's hash is used as the item id; the metadata
        advertises the image as ``image/png`` with the size of the cached
        file and the cached dimensions.

        :param cached_avatar: Entry of the avatar cache to expose.
        """
        info = AvatarMetadata()
        avatar_hash = cached_avatar.hash
        self.id = avatar_hash
        file_path = cached_avatar.path
        info.add_info(
            id=avatar_hash,
            itype="image/png",
            ibytes=file_path.stat().st_size,
            height=str(cached_avatar.height),
            width=str(cached_avatar.width),
        )
        self.metadata = info
        self._avatar_data_path = file_path
class PubSubComponent(NamedLockMixin, BasePlugin):
    """
    slixmpp plugin serving PEP items and pushing pubsub events.

    On the request side it answers ``iq`` *get* requests for avatar data,
    avatar metadata and vCard4 items. On the push side it sends
    ``headline`` messages carrying pubsub events (avatar metadata,
    nickname, vCard4, spaces) to users.
    """

    xmpp: "AnyGateway"

    name = "pubsub"
    description = "Pubsub component"
    dependencies: ClassVar[set[str]] = {
        "xep_0030",
        "xep_0060",
        "xep_0115",
        "xep_0163",
    }
    default_config: ClassVar[dict[str, str | None]] = {"component_name": None}
    component_name: str

    def __init__(self, *a: object, **kw: object) -> None:
        super().__init__(*a, **kw)
        # Strong references to fire-and-forget tasks. The event loop only
        # keeps weak references to tasks, so a task nobody else references
        # may be garbage-collected before it completes.
        self.__pending_tasks: set[object] = set()
        # slixmpp usually does this via self.xmpp["xep_0163"].register_pep(),
        # in the session_bind hook of plugins. However, for components,
        # session_bind is never called.
        register_stanza_plugin(EventItem, UserNick)
        register_stanza_plugin(EventItem, Conference)

    def plugin_init(self) -> None:
        """
        Register the IQ handlers for avatar data, avatar metadata and vCard4
        item requests, and advertise the pubsub identities and features
        through the ``xep_0030`` plugin.
        """
        self.xmpp.register_handler(
            CoroutineCallback(
                "pubsub_get_avatar_data",
                StanzaPath(f"iq@type=get/pubsub/items@node={AvatarData.namespace}"),
                self._get_avatar_data,  # type:ignore
            )
        )
        self.xmpp.register_handler(
            CoroutineCallback(
                "pubsub_get_avatar_metadata",
                StanzaPath(f"iq@type=get/pubsub/items@node={AvatarMetadata.namespace}"),
                self._get_avatar_metadata,  # type:ignore
            )
        )
        self.xmpp.register_handler(
            CoroutineCallback(
                "pubsub_get_vcard",
                StanzaPath(f"iq@type=get/pubsub/items@node={VCARD4_NAMESPACE}"),
                self._get_vcard,  # type:ignore
            )
        )

        disco = self.xmpp.plugin["xep_0030"]
        disco.add_identity("pubsub", "pep", self.component_name)
        disco.add_identity("account", "registered", self.component_name)
        disco.add_feature("http://jabber.org/protocol/pubsub#event")
        disco.add_feature("http://jabber.org/protocol/pubsub#retrieve-items")
        disco.add_feature("http://jabber.org/protocol/pubsub#persistent-items")

    async def __get_features(self, presence: Presence) -> list[str]:
        """
        Return the disco features of the entity that sent ``presence``.

        When the presence carries a caps ``ver`` string, the ``xep_0115``
        plugin is asked first. If there is no ``ver`` string, or that lookup
        yields nothing, a disco#info query is sent instead, under the lock
        named after the sender.

        :param presence: The presence whose sender should be inspected.
        :return: The advertised features, or an empty list when the
            disco#info query fails or times out.
        """
        from_ = presence.get_from()
        ver_string = presence["caps"]["ver"]
        if ver_string:
            info = await self.xmpp.plugin["xep_0115"].get_caps(from_)
        else:
            info = None
        if info is None:
            async with self.lock(from_):
                try:
                    iq = await self.xmpp.plugin["xep_0030"].get_info(from_)
                except (IqError, IqTimeout):
                    log.debug("Could not get disco#info of %s, ignoring", from_)
                    return []
                info = iq["disco_info"]
        return info["features"]  # type:ignore[no-any-return]

    async def on_presence_available(
        self, p: Presence, contact: AnyContact | None
    ) -> None:
        """
        Push avatar, nickname and vCard events to an entity that just sent
        an available presence, according to the ``+notify`` features it
        advertises.

        MUC join presences are ignored. Nothing is pushed either when the
        presence is addressed to something other than the component itself
        and ``contact`` is missing or not a friend.

        :param p: The received presence.
        :param contact: The contact the presence is addressed to, if any.
        """
        if "muc_join" in p:
            log.debug("Ignoring MUC presence here")
            return

        to = p.get_to()
        # we don't want to push anything for contacts that are not in the user's roster
        if to != self.xmpp.boundjid.bare and (contact is None or not contact.is_friend):
            return

        from_ = p.get_from()
        features = await self.__get_features(p)

        if AvatarMetadata.namespace + "+notify" in features:
            try:
                pep_avatar = await self._get_authorized_avatar(p, contact)
            except XMPPError:
                # best effort: no avatar event if the lookup is refused
                pass
            else:
                if pep_avatar.metadata is not None:
                    await self.__broadcast(
                        data=pep_avatar.metadata,
                        from_=to.bare,
                        to=from_,
                        id=pep_avatar.metadata["info"]["id"],
                    )
        if UserNick.namespace + "+notify" in features:
            try:
                pep_nick = await self._get_authorized_nick(p, contact)
            except XMPPError:
                # best effort: no nickname event if the lookup is refused
                pass
            else:
                # NOTE(review): unlike the avatar and vCard events, this one
                # is sent from the full ``to`` JID, not its bare form.
                await self.__broadcast(data=pep_nick, from_=to, to=from_)

        if contact is not None and VCARD4_NAMESPACE + "+notify" in features:
            vcard = await contact.get_vcard()
            if vcard is not None:
                await self.broadcast_vcard_event(to, from_, vcard)

    async def broadcast_vcard_event(self, from_: JID, to: JID, vcard: VCard4) -> None:
        """
        Send a vCard4 pubsub event with item id ``current``.

        :param from_: Entity the vCard belongs to; its bare JID is used as
            the sender of the event.
        :param to: Recipient of the event.
        :param vcard: The vCard embedded in the event.
        """
        # The vcard content should NOT be in this event according to the spec:
        # https://xmpp.org/extensions/xep-0292.html#sect-idm45669698174224
        # but movim expects it to be here, and I guess it does not hurt
        log.debug("Broadcast vcard4 event: %s", vcard)
        await self.__broadcast(
            data=vcard,
            from_=JID(from_).bare,
            to=to,
            id="current",
            node=VCARD4_NAMESPACE,
        )

    async def __get_contact(self, stanza: Iq | Presence) -> AnyContact:
        """
        Resolve the contact a stanza is addressed to, through the session
        of the stanza's sender.
        """
        session = self.xmpp.get_session_from_stanza(stanza)
        return await session.contacts.by_jid(stanza.get_to())  # type:ignore[no-any-return]

    async def _get_authorized_avatar(
        self, stanza: Iq | Presence, contact: AnyContact | None = None
    ) -> PepAvatar:
        """
        Return the avatar of the entity ``stanza`` is addressed to.

        If the stanza targets the component itself, the component's own
        avatar is used. Otherwise the avatar stored for the contact is used;
        the contact is looked up from the stanza when not given.

        :param stanza: The request or presence being processed.
        :param contact: Already-resolved addressee, to skip the lookup.
        :return: A :class:`PepAvatar`, left empty when there is no avatar.
        """
        if stanza.get_to() == self.xmpp.boundjid.bare:
            item = PepAvatar()
            if self.xmpp.avatar is not None:
                item.set_avatar_from_cache(self.xmpp.avatar)
            return item

        if contact is None:
            contact = await self.__get_contact(stanza)

        item = PepAvatar()
        if contact.stored.avatar is not None:
            stored = avatar_cache.get(contact.stored.avatar)
            assert stored is not None
            item.set_avatar_from_cache(stored)
        return item

    async def _get_authorized_nick(
        self, stanza: Iq | Presence, contact: AnyContact | None = None
    ) -> UserNick:
        """
        Return the nickname stanza of the entity ``stanza`` is addressed to.

        The component answers with its ``COMPONENT_NAME``; a contact answers
        with its name, or with an empty nick stanza when it has none.

        :param stanza: The request or presence being processed.
        :param contact: Already-resolved addressee, to skip the lookup.
        """
        if stanza.get_to() == self.xmpp.boundjid.bare:
            return get_user_nick(self.xmpp.COMPONENT_NAME)

        if contact is None:
            contact = await self.__get_contact(stanza)

        if contact.name is not None:
            return get_user_nick(contact.name)
        else:
            return UserNick()

    def __reply_with(
        self, iq: Iq, content: AvatarData | AvatarMetadata | None, item_id: str | None
    ) -> None:
        """
        Answer an items request with ``content``.

        A request that lists no item is answered directly. A request that
        lists items is answered only if one of them has id ``item_id``.

        :raises XMPPError: ``item-not-found`` when items were requested and
            none of them matches ``item_id``.
        """
        requested_items = iq["pubsub"]["items"]

        # Explicit len() on purpose: presumably stanza objects are always
        # truthy in slixmpp - verify before turning this into ``not ...``.
        if len(requested_items) == 0:
            self._reply_with_payload(iq, content, item_id)
        else:
            for item in requested_items:
                if item["id"] == item_id:
                    self._reply_with_payload(iq, content, item_id)
                    return
            else:
                raise XMPPError("item-not-found")

    async def _get_avatar_data(self, iq: Iq) -> None:
        """IQ handler: reply with the avatar data of the addressee."""
        pep_avatar = await self._get_authorized_avatar(iq)
        self.__reply_with(iq, pep_avatar.data, pep_avatar.id)

    async def _get_avatar_metadata(self, iq: Iq) -> None:
        """IQ handler: reply with the avatar metadata of the addressee."""
        pep_avatar = await self._get_authorized_avatar(iq)
        self.__reply_with(iq, pep_avatar.metadata, pep_avatar.id)

    async def _get_vcard(self, iq: Iq) -> None:
        """
        IQ handler: reply with the vCard4 of the addressed contact.

        :raises XMPPError: ``item-not-found`` when the contact has no vCard.
        """
        # this is not the proper way that clients should retrieve VCards, but
        # gajim does it this way.
        # https://xmpp.org/extensions/xep-0292.html#sect-idm45669698174224
        contact = await self.__get_contact(iq)
        vcard = await contact.get_vcard()
        if vcard is None:
            raise XMPPError("item-not-found")
        self._reply_with_payload(iq, vcard, "current", VCARD4_NAMESPACE)

    @staticmethod
    def _reply_with_payload(
        iq: Iq,
        payload: AvatarMetadata | AvatarData | VCard4 | None,
        id_: str | None,
        namespace: str | None = None,
    ) -> None:
        """
        Send the result of an items request.

        With a payload, the result carries one item holding it. Without a
        payload, a bare result is sent.

        :param iq: The request being answered.
        :param payload: Stanza to embed in the item, if any.
        :param id_: Id given to the item.
        :param namespace: Node name to use; defaults to the payload's
            namespace.
        """
        result = iq.reply()
        item = Item()
        if payload:
            item.set_payload(payload.xml)
            item["id"] = id_
            result["pubsub"]["items"]["node"] = (
                namespace if namespace else payload.namespace
            )
            result["pubsub"]["items"].append(item)
        result.send()

    async def __broadcast(
        self,
        data: ElementBase | None,
        from_: JidStr,
        to: OptJidStr = None,
        items: EventItems | None = None,
        **kwargs: object,
    ) -> None:
        """
        Send a pubsub event as a ``headline`` message.

        When the event is not sent by the component itself and a recipient
        is given, nothing is sent if the recipient has no session;
        otherwise the session's ``ready`` awaitable is awaited first.

        :param data: Payload of the single event item. May only be ``None``
            when ``items`` is given.
        :param from_: Sender of the event.
        :param to: Recipient. ``None`` sends one copy to every
            ``GatewayUser`` found in the store.
        :param items: Pre-built items element. When given it is sent as is;
            ``data`` and ``kwargs`` are not added to it.
        :param kwargs: Set as keys on the event item. ``node`` additionally
            overrides the node name, which otherwise is the namespace of
            ``data``.
        """
        from_ = JID(from_)
        if from_ != self.xmpp.boundjid.bare and to is not None:
            to = JID(to)
            session = self.xmpp.get_session_from_jid(to)
            if session is None:
                return
            await session.ready

        item = EventItem()
        if data:
            item.set_payload(data.xml)
        for k, v in kwargs.items():
            item[k] = v

        if items is None:
            items = EventItems()
            items.append(item)
            assert data is not None
            items["node"] = kwargs.get("node") or data.namespace

        event = Event()
        event.append(items)

        msg = self.xmpp.Message()
        msg.set_type("headline")
        msg.set_from(from_)
        msg.append(event)

        if to is None:
            with self.xmpp.store.session() as orm:
                for u in orm.query(GatewayUser).all():
                    # each user gets their own copy of the message
                    new_msg = copy(msg)
                    new_msg.set_to(u.jid.bare)
                    new_msg.send()
        else:
            msg.set_to(to)
            msg.send()

    async def broadcast_avatar(
        self, from_: JidStr, to: JidStr, cached_avatar: CachedAvatar | None
    ) -> None:
        """
        Send an avatar metadata event.

        :param from_: Entity whose avatar changed.
        :param to: Recipient of the event.
        :param cached_avatar: The new avatar; ``None`` sends an empty
            metadata element instead.
        """
        if cached_avatar is None:
            await self.__broadcast(AvatarMetadata(), from_, to)
        else:
            pep_avatar = PepAvatar()
            pep_avatar.set_avatar_from_cache(cached_avatar)
            assert pep_avatar.metadata is not None
            await self.__broadcast(
                pep_avatar.metadata, from_, to, id=pep_avatar.metadata["info"]["id"]
            )

    def broadcast_nick(
        self,
        user_jid: JID,
        jid: JidStr,
        nick: str | None = None,
    ) -> None:
        """
        Schedule a nickname event, without waiting for it to be sent.

        :param user_jid: User to notify; the event goes to their bare JID.
        :param jid: Entity whose nickname changed (sender of the event).
        :param nick: The new nickname; ``None`` sends an empty nick element.
        """
        jid = JID(jid)
        nickname = get_user_nick(nick)
        log.debug("New nickname: %s", nickname)
        task = self.xmpp.loop.create_task(
            self.__broadcast(nickname, jid, user_jid.bare)
        )
        # Keep the task alive until it is done; the loop alone would only
        # hold a weak reference to it.
        self.__pending_tasks.add(task)
        task.add_done_callback(self.__pending_tasks.discard)

    def broadcast_space(self, session: AnySession, space: Space) -> None:
        """
        Schedule an event listing the rooms of ``space``.

        The node is the space's legacy id as a string. Each room becomes an
        item whose id is the room JID, holding a ``conference`` element
        that carries the room name when there is one. The event is sent
        from the component to the session's user, as a task created
        through the session.
        """
        items = EventItems()
        items["node"] = str(space.legacy_id)
        for room in space.rooms:
            item = EventItem()
            item["id"] = room.jid
            item.enable("conference")
            if room.name:
                item["conference"]["name"] = room.name
            items.append(item)
        session.create_task(
            self.__broadcast(
                None,
                self.xmpp.boundjid.bare,
                session.user_jid,
                items=items,
            )
        )

    def broadcast_space_metadata(
        self, session: AnySession, space: Space, node: str
    ) -> None:
        """
        Send the node configuration of a space to the session's user.

        The configuration is a ``result`` data form attached to a pubsub
        configuration event, sent as a ``headline`` message from the
        component.

        :param session: Session of the user to notify.
        :param space: Space whose name is used as the node title.
        :param node: Name of the pubsub node being described.
        """
        msg = self.xmpp.make_message(
            mtype="headline", mto=session.user_jid, mfrom=self.xmpp.boundjid.bare
        )
        msg["pubsub_event"]["configuration"]["node"] = node

        form = msg["pubsub_event"]["configuration"]["form"]
        form["type"] = "result"

        form.add_field(
            var="FORM_TYPE",
            ftype="hidden",
            value="http://jabber.org/protocol/pubsub#node_config",
        )
        form.add_field(var="pubsub#title", value=space.name)

        # MUSTs
        form.add_field(var="pubsub#type", value="urn:xmpp:spaces:0")
        form.add_field(var="pubsub#notify_retract", value="true")
        form.add_field(var="pubsub#persist_items", value="true")
        form.add_field(var="pubsub#purge_offline", value="false")
        # SHOULDs
        form.add_field(var="pubsub#notify_sub", value="true")
        form.add_field(var="pubsub#notify_config", value="true")
        form.add_field(var="pubsub#notify_delete", value="true")
        form.add_field(var="pubsub#publish_model", value="publishers")
        # MUST (private spaces)
        form.add_field(var="pubsub#access_model", value="authorize")

        msg.send()
def get_user_nick(nick: str | None = None) -> UserNick:
    """
    Build a ``UserNick`` stanza.

    :param nick: Nickname to store in the stanza. When ``None``, the
        stanza is returned without its ``nick`` being set.
    :return: The newly created stanza.
    """
    stanza = UserNick()
    if nick is None:
        return stanza
    stanza["nick"] = nick
    return stanza
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Hand the plugin class to slixmpp's register_plugin (imported from
# slixmpp.plugins.base), presumably so it can be loaded by name - confirm.
register_plugin(PubSubComponent)