Coverage for slidge/core/pubsub.py: 83%
207 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
1import logging
2from copy import copy
3from pathlib import Path
4from typing import TYPE_CHECKING, Optional, Union
6from slixmpp import (
7 JID,
8 CoroutineCallback,
9 Iq,
10 Presence,
11 StanzaPath,
12 register_stanza_plugin,
13)
14from slixmpp.exceptions import IqError, IqTimeout, XMPPError
15from slixmpp.plugins.base import BasePlugin, register_plugin
16from slixmpp.plugins.xep_0060.stanza import Event, EventItem, EventItems, Item
17from slixmpp.plugins.xep_0084 import Data as AvatarData
18from slixmpp.plugins.xep_0084 import MetaData as AvatarMetadata
19from slixmpp.plugins.xep_0172 import UserNick
20from slixmpp.plugins.xep_0292.stanza import VCard4
21from slixmpp.types import JidStr, OptJidStr
23from ..db.avatar import CachedAvatar, avatar_cache
24from ..db.models import GatewayUser
25from ..util.lock import NamedLockMixin
27if TYPE_CHECKING:
28 from slidge.core.gateway import BaseGateway
30 from ..contact.contact import LegacyContact
# Name of the vcard4 pubsub node: matched by the vcard IQ "get" handler and
# used as the node of the vcard events broadcast by this module.
VCARD4_NAMESPACE = "urn:xmpp:vcard4"
class PepAvatar:
    """
    Holder for the avatar pieces served over pubsub (slixmpp XEP-0084 stanzas).

    A fresh instance is "empty": ``metadata``, ``id`` and ``data`` are all
    ``None`` until :meth:`set_avatar_from_cache` is called.
    """

    def __init__(self) -> None:
        self.metadata: Optional[AvatarMetadata] = None
        self.id: Optional[str] = None
        # Only the path is kept; the image bytes are read on demand by ``data``.
        self._avatar_data_path: Optional[Path] = None

    @property
    def data(self) -> Optional[AvatarData]:
        """
        Build the avatar data stanza from the file at the stored path.

        :return: A new ``AvatarData`` whose value is the file content, or
            ``None`` if no avatar has been set.
        """
        source = self._avatar_data_path
        if source is not None:
            payload = AvatarData()
            payload.set_value(source.read_bytes())
            return payload
        return None

    def set_avatar_from_cache(self, cached_avatar: CachedAvatar) -> None:
        """
        Fill this object from a cached avatar.

        The metadata advertises the cached file's hash as id, its size on
        disk, its height and width (as strings) and the "image/png" type.

        :param cached_avatar: Cache entry providing ``hash``, ``path``,
            ``height`` and ``width``.
        """
        info = AvatarMetadata()
        self.id = cached_avatar.hash
        description = dict(
            id=cached_avatar.hash,
            itype="image/png",
            ibytes=cached_avatar.path.stat().st_size,
            height=str(cached_avatar.height),
            width=str(cached_avatar.width),
        )
        info.add_info(**description)
        self.metadata = info
        self._avatar_data_path = cached_avatar.path
class PubSubComponent(NamedLockMixin, BasePlugin):
    """
    Slixmpp plugin serving pubsub items and pushing pubsub events.

    It answers IQ "get" requests on the avatar data, avatar metadata and
    vcard4 nodes, and sends "headline" messages carrying pubsub events
    (avatar metadata, nickname, vcard4) to gateway users.
    """

    xmpp: "BaseGateway"

    name = "pubsub"
    description = "Pubsub component"
    dependencies = {
        "xep_0030",
        "xep_0060",
        "xep_0115",
        "xep_0163",
    }
    default_config = {"component_name": None}
    component_name: str

    def __init__(self, *a, **kw) -> None:
        """Initialise the plugin and allow nickname payloads in event items."""
        super().__init__(*a, **kw)
        register_stanza_plugin(EventItem, UserNick)
        # Strong references to fire-and-forget tasks: the event loop itself
        # only keeps weak references to the tasks it runs.
        self._background_tasks: set = set()

    def plugin_init(self) -> None:
        """Register the IQ "get" handlers and advertise pubsub via disco."""
        handlers = (
            ("pubsub_get_avatar_data", AvatarData.namespace, self._get_avatar_data),
            (
                "pubsub_get_avatar_metadata",
                AvatarMetadata.namespace,
                self._get_avatar_metadata,
            ),
            ("pubsub_get_vcard", VCARD4_NAMESPACE, self._get_vcard),
        )
        for handler_name, node, callback in handlers:
            self.xmpp.register_handler(
                CoroutineCallback(
                    handler_name,
                    StanzaPath(f"iq@type=get/pubsub/items@node={node}"),
                    callback,  # type:ignore
                )
            )

        disco = self.xmpp.plugin["xep_0030"]
        disco.add_identity("pubsub", "pep", self.component_name)
        disco.add_identity("account", "registered", self.component_name)
        disco.add_feature("http://jabber.org/protocol/pubsub#event")
        disco.add_feature("http://jabber.org/protocol/pubsub#retrieve-items")
        disco.add_feature("http://jabber.org/protocol/pubsub#persistent-items")

    async def __get_features(self, presence: Presence) -> list[str]:
        """
        Return the features of the entity that sent ``presence``.

        If the presence carries a caps "ver" string, the xep_0115 plugin is
        asked first. When that yields nothing, a disco#info query is sent
        while holding the lock named after the sender.

        :param presence: The received presence.
        :return: The "features" of the info that was found, or an empty list
            if the disco#info query failed or timed out.
        """
        from_ = presence.get_from()
        ver_string = presence["caps"]["ver"]
        if ver_string:
            info = await self.xmpp.plugin["xep_0115"].get_caps(from_)
        else:
            info = None
        if info is None:
            async with self.lock(from_):
                try:
                    iq = await self.xmpp.plugin["xep_0030"].get_info(from_)
                except (IqError, IqTimeout):
                    log.debug("Could not get disco#info of %s, ignoring", from_)
                    return []
                info = iq["disco_info"]
        return info["features"]

    async def on_presence_available(
        self, p: Presence, contact: Optional["LegacyContact"]
    ) -> None:
        """
        Push the events the sender of an available presence subscribed to.

        For each of avatar metadata, nickname and vcard4, an event is sent
        back to the sender if its features contain the matching "+notify"
        entry. MUC join presences are ignored.

        :param p: The received presence.
        :param contact: The contact the presence is addressed to, if any.
        """
        if p.get_plugin("muc_join", check=True) is not None:
            log.debug("Ignoring MUC presence here")
            return

        to = p.get_to()
        # we don't want to push anything for contacts that are not in the user's roster
        if to != self.xmpp.boundjid.bare and (
            contact is None or not contact.is_friend
        ):
            return

        from_ = p.get_from()
        features = await self.__get_features(p)

        if AvatarMetadata.namespace + "+notify" in features:
            try:
                pep_avatar = await self._get_authorized_avatar(p, contact)
            except XMPPError as e:
                # Best effort: a failure here must not block the other events.
                log.debug("Not pushing avatar metadata to %s: %s", from_, e)
            else:
                if pep_avatar.metadata is not None:
                    await self.__broadcast(
                        data=pep_avatar.metadata,
                        from_=to.bare,
                        to=from_,
                        id=pep_avatar.metadata["info"]["id"],
                    )
        if UserNick.namespace + "+notify" in features:
            try:
                pep_nick = await self._get_authorized_nick(p, contact)
            except XMPPError as e:
                log.debug("Not pushing nickname to %s: %s", from_, e)
            else:
                await self.__broadcast(data=pep_nick, from_=to, to=from_)

        if contact is not None and VCARD4_NAMESPACE + "+notify" in features:
            await self.broadcast_vcard_event(to, from_, await contact.get_vcard())

    async def broadcast_vcard_event(
        self, from_: JID, to: JID, vcard: VCard4 | None
    ) -> None:
        """
        Send a vcard4 event with item id "current".

        :param from_: Entity the vcard belongs to; its bare JID is the sender.
        :param to: Recipient of the event.
        :param vcard: Payload of the event item; may be ``None``.
        """
        # The vcard content should NOT be in this event according to the spec:
        # https://xmpp.org/extensions/xep-0292.html#sect-idm45669698174224
        # but movim expects it to be here, and I guess it does not hurt
        log.debug("Broadcast vcard4 event: %s", vcard)
        await self.__broadcast(
            data=vcard,
            from_=JID(from_).bare,
            to=to,
            id="current",
            node=VCARD4_NAMESPACE,
        )

    async def __get_contact(self, stanza: Union[Iq, Presence]) -> "LegacyContact":
        """Return the contact ``stanza`` is addressed to, in the sender's session."""
        session = self.xmpp.get_session_from_stanza(stanza)
        return await session.contacts.by_jid(stanza.get_to())

    async def _get_authorized_avatar(
        self, stanza: Union[Iq, Presence], contact: Optional["LegacyContact"] = None
    ) -> PepAvatar:
        """
        Return the avatar of the entity ``stanza`` is addressed to.

        :param stanza: The incoming stanza.
        :param contact: The addressed contact, if already known; otherwise it
            is looked up from the stanza.
        :return: A ``PepAvatar``, left empty when the entity has no avatar.
        :raises XMPPError: "item-not-found" if the contact references an
            avatar that the avatar cache does not have.
        """
        item = PepAvatar()
        if stanza.get_to() == self.xmpp.boundjid.bare:
            # Addressed to the component itself: serve its own avatar.
            if self.xmpp.avatar is not None:
                item.set_avatar_from_cache(self.xmpp.avatar)
            return item

        if contact is None:
            contact = await self.__get_contact(stanza)

        if contact.stored.avatar is not None:
            stored = avatar_cache.get(contact.stored.avatar)
            if stored is None:
                # Raised explicitly rather than asserted, so the check is
                # still performed when python runs with -O.
                log.warning("Avatar of %s is missing from the cache", stanza.get_to())
                raise XMPPError("item-not-found")
            item.set_avatar_from_cache(stored)
        return item

    async def _get_authorized_nick(
        self, stanza: Union[Iq, Presence], contact: Optional["LegacyContact"] = None
    ) -> UserNick:
        """
        Return the nickname stanza of the entity ``stanza`` is addressed to.

        :param stanza: The incoming stanza.
        :param contact: The addressed contact, if already known; otherwise it
            is looked up from the stanza.
        :return: A ``UserNick`` holding the component name or the contact
            name; an empty ``UserNick`` if the contact has no name.
        """
        if stanza.get_to() == self.xmpp.boundjid.bare:
            return get_user_nick(self.xmpp.COMPONENT_NAME)

        if contact is None:
            contact = await self.__get_contact(stanza)

        if contact.name is not None:
            return get_user_nick(contact.name)
        return UserNick()

    def __reply_with(
        self, iq: Iq, content: AvatarData | AvatarMetadata | None, item_id: str | None
    ) -> None:
        """
        Answer ``iq`` with ``content`` if the request is compatible with it.

        :param iq: The pubsub "get" request.
        :param content: Payload to reply with.
        :param item_id: Id of the only item that can be served.
        :raises XMPPError: "item-not-found" if specific items were requested
            and none of them has ``item_id`` as id.
        """
        requested_items = iq["pubsub"]["items"]

        # len() is used on purpose: the truth value of a stanza object is not
        # a reliable emptiness test.
        # NOTE(review): the copy of this file under review had lost its
        # indentation; for/else semantics (raise only if no requested item
        # matches) are assumed here -- confirm against the repository.
        if len(requested_items) == 0 or any(
            item["id"] == item_id for item in requested_items
        ):
            self._reply_with_payload(iq, content, item_id)
            return
        raise XMPPError("item-not-found")

    async def _get_avatar_data(self, iq: Iq) -> None:
        """Handle an IQ "get" on the avatar data node."""
        pep_avatar = await self._get_authorized_avatar(iq)
        self.__reply_with(iq, pep_avatar.data, pep_avatar.id)

    async def _get_avatar_metadata(self, iq: Iq) -> None:
        """Handle an IQ "get" on the avatar metadata node."""
        pep_avatar = await self._get_authorized_avatar(iq)
        self.__reply_with(iq, pep_avatar.metadata, pep_avatar.id)

    async def _get_vcard(self, iq: Iq) -> None:
        """
        Handle an IQ "get" on the vcard4 node.

        :raises XMPPError: "item-not-found" if the contact has no vcard.
        """
        # this is not the proper way that clients should retrieve VCards, but
        # gajim does it this way.
        # https://xmpp.org/extensions/xep-0292.html#sect-idm45669698174224
        session = self.xmpp.get_session_from_stanza(iq)
        contact = await session.contacts.by_jid(iq.get_to())
        vcard = await contact.get_vcard()
        if vcard is None:
            raise XMPPError("item-not-found")
        self._reply_with_payload(iq, vcard, "current", VCARD4_NAMESPACE)

    @staticmethod
    def _reply_with_payload(
        iq: Iq,
        payload: Optional[Union[AvatarMetadata, AvatarData, VCard4]],
        id_: Optional[str],
        namespace: Optional[str] = None,
    ) -> None:
        """
        Send the result of ``iq``, with a single item wrapping ``payload``.

        :param iq: The request being answered.
        :param payload: Content of the item. If falsy, the reply is sent
            without any item.
        :param id_: Id of the item.
        :param namespace: Node of the reply; defaults to the payload namespace.
        """
        result = iq.reply()
        # NOTE(review): the copy of this file under review had lost its
        # indentation; the item is assumed to be attached only when there is
        # a payload (any other reading fails on ``payload.namespace`` for a
        # ``None`` payload) -- confirm against the repository.
        if payload:
            item = Item()
            item.set_payload(payload.xml)
            item["id"] = id_
            items = result["pubsub"]["items"]
            items["node"] = namespace if namespace else payload.namespace
            items.append(item)
        result.send()

    async def __broadcast(
        self, data, from_: JidStr, to: OptJidStr = None, **kwargs
    ) -> None:
        """
        Send a pubsub event wrapped in a "headline" message.

        When ``from_`` is not the component's bare JID and ``to`` is given,
        nothing is sent if ``to`` has no session; otherwise the session's
        ``ready`` is awaited before sending.

        :param data: Payload of the event item (skipped if falsy). Its
            namespace is the node of the event unless ``node`` is in
            ``kwargs``.
        :param from_: Sender of the message.
        :param to: Recipient. If ``None``, a copy of the message is sent to
            the bare JID of every ``GatewayUser`` in the store.
        :param kwargs: Set as keys on the event item (e.g. ``id``, ``node``).
        """
        from_ = JID(from_)
        if from_ != self.xmpp.boundjid.bare and to is not None:
            to = JID(to)
            session = self.xmpp.get_session_from_jid(to)
            if session is None:
                return
            await session.ready

        item = EventItem()
        if data:
            item.set_payload(data.xml)
        for k, v in kwargs.items():
            item[k] = v

        items = EventItems()
        items.append(item)
        items["node"] = kwargs.get("node") or data.namespace

        event = Event()
        event.append(items)

        msg = self.xmpp.Message()
        msg.set_type("headline")
        msg.set_from(from_)
        msg.append(event)

        if to is None:
            with self.xmpp.store.session() as orm:
                for u in orm.query(GatewayUser).all():
                    new_msg = copy(msg)
                    new_msg.set_to(u.jid.bare)
                    new_msg.send()
        else:
            msg.set_to(to)
            msg.send()

    async def broadcast_avatar(
        self, from_: JidStr, to: JidStr, cached_avatar: Optional[CachedAvatar]
    ) -> None:
        """
        Send an avatar metadata event.

        :param from_: Entity whose avatar changed.
        :param to: Recipient of the event.
        :param cached_avatar: The new avatar; if ``None``, an empty metadata
            element is sent instead.
        """
        if cached_avatar is None:
            await self.__broadcast(AvatarMetadata(), from_, to)
            return

        pep_avatar = PepAvatar()
        pep_avatar.set_avatar_from_cache(cached_avatar)
        assert pep_avatar.metadata is not None
        await self.__broadcast(
            pep_avatar.metadata, from_, to, id=pep_avatar.metadata["info"]["id"]
        )

    def broadcast_nick(
        self,
        user_jid: JID,
        jid: JidStr,
        nick: Optional[str] = None,
    ) -> None:
        """
        Schedule the sending of a nickname event, without waiting for it.

        :param user_jid: The user to notify; the event goes to its bare JID.
        :param jid: Entity whose nickname is announced (sender of the event).
        :param nick: The nickname; if ``None``, an empty nick element is sent.
        """
        jid = JID(jid)
        nickname = get_user_nick(nick)
        log.debug("New nickname: %s", nickname)
        task = self.xmpp.loop.create_task(
            self.__broadcast(nickname, jid, user_jid.bare)
        )
        # Keep the task referenced until it is done so that it cannot be
        # garbage collected while still pending.
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
def get_user_nick(nick: Optional[str] = None) -> UserNick:
    """
    Wrap a nickname in a ``UserNick`` stanza.

    :param nick: The nickname to store under the "nick" key.
    :return: A new ``UserNick``; left untouched when ``nick`` is ``None``.
    """
    stanza = UserNick()
    if nick is None:
        return stanza
    stanza["nick"] = nick
    return stanza
# Module-level logger, named after this module; it is referenced by the code
# above only at call time, hence its definition at the bottom of the file.
log = logging.getLogger(__name__)
# Hand the plugin class over to slixmpp's register_plugin().
register_plugin(PubSubComponent)