Coverage for slidge/core/pubsub.py: 85%
207 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
1import logging
2from copy import copy
3from pathlib import Path
4from typing import TYPE_CHECKING, Optional, Union
6from slixmpp import (
7 JID,
8 CoroutineCallback,
9 Iq,
10 Presence,
11 StanzaPath,
12 register_stanza_plugin,
13)
14from slixmpp.exceptions import XMPPError
15from slixmpp.plugins.base import BasePlugin, register_plugin
16from slixmpp.plugins.xep_0060.stanza import Event, EventItem, EventItems, Item
17from slixmpp.plugins.xep_0084 import Data as AvatarData
18from slixmpp.plugins.xep_0084 import MetaData as AvatarMetadata
19from slixmpp.plugins.xep_0172 import UserNick
20from slixmpp.plugins.xep_0292.stanza import VCard4
21from slixmpp.types import JidStr, OptJidStr
23from ..db.avatar import CachedAvatar, avatar_cache
24from ..db.store import ContactStore, SlidgeStore
25from .mixins.lock import NamedLockMixin
27if TYPE_CHECKING:
28 from slidge.core.gateway import BaseGateway
30 from ..contact.contact import LegacyContact
32VCARD4_NAMESPACE = "urn:xmpp:vcard4"
35class PepItem:
36 pass
39class PepAvatar(PepItem):
40 store: SlidgeStore
42 def __init__(self):
43 self.metadata: Optional[AvatarMetadata] = None
44 self.id: Optional[str] = None
45 self._avatar_data_path: Optional[Path] = None
47 @property
48 def data(self) -> Optional[AvatarData]:
49 if self._avatar_data_path is None:
50 return None
51 data = AvatarData()
52 data.set_value(self._avatar_data_path.read_bytes())
53 return data
55 def set_avatar_from_cache(self, cached_avatar: CachedAvatar):
56 metadata = AvatarMetadata()
57 self.id = cached_avatar.hash
58 metadata.add_info(
59 id=cached_avatar.hash,
60 itype="image/png",
61 ibytes=cached_avatar.path.stat().st_size,
62 height=str(cached_avatar.height),
63 width=str(cached_avatar.width),
64 )
65 self.metadata = metadata
66 self._avatar_data_path = cached_avatar.path
69class PepNick(PepItem):
70 contact_store: ContactStore
72 def __init__(self, nick: Optional[str] = None):
73 nickname = UserNick()
74 if nick is not None:
75 nickname["nick"] = nick
76 self.nick = nickname
77 self.__nick_str = nick
80class PubSubComponent(NamedLockMixin, BasePlugin):
81 xmpp: "BaseGateway"
83 name = "pubsub"
84 description = "Pubsub component"
85 dependencies = {
86 "xep_0030",
87 "xep_0060",
88 "xep_0115",
89 "xep_0163",
90 }
91 default_config = {"component_name": None}
92 component_name: str
94 def __init__(self, *a, **kw):
95 super(PubSubComponent, self).__init__(*a, **kw)
96 register_stanza_plugin(EventItem, UserNick)
98 def plugin_init(self):
99 self.xmpp.register_handler(
100 CoroutineCallback(
101 "pubsub_get_avatar_data",
102 StanzaPath(f"iq@type=get/pubsub/items@node={AvatarData.namespace}"),
103 self._get_avatar_data, # type:ignore
104 )
105 )
106 self.xmpp.register_handler(
107 CoroutineCallback(
108 "pubsub_get_avatar_metadata",
109 StanzaPath(f"iq@type=get/pubsub/items@node={AvatarMetadata.namespace}"),
110 self._get_avatar_metadata, # type:ignore
111 )
112 )
113 self.xmpp.register_handler(
114 CoroutineCallback(
115 "pubsub_get_vcard",
116 StanzaPath(f"iq@type=get/pubsub/items@node={VCARD4_NAMESPACE}"),
117 self._get_vcard, # type:ignore
118 )
119 )
121 disco = self.xmpp.plugin["xep_0030"]
122 disco.add_identity("pubsub", "pep", self.component_name)
123 disco.add_identity("account", "registered", self.component_name)
124 disco.add_feature("http://jabber.org/protocol/pubsub#event")
125 disco.add_feature("http://jabber.org/protocol/pubsub#retrieve-items")
126 disco.add_feature("http://jabber.org/protocol/pubsub#persistent-items")
128 async def __get_features(self, presence: Presence) -> list[str]:
129 from_ = presence.get_from()
130 ver_string = presence["caps"]["ver"]
131 if ver_string:
132 info = await self.xmpp.plugin["xep_0115"].get_caps(from_)
133 else:
134 info = None
135 if info is None:
136 async with self.lock(from_):
137 iq = await self.xmpp.plugin["xep_0030"].get_info(from_)
138 info = iq["disco_info"]
139 return info["features"]
141 async def on_presence_available(
142 self, p: Presence, contact: Optional["LegacyContact"]
143 ):
144 if p.get_plugin("muc_join", check=True) is not None:
145 log.debug("Ignoring MUC presence here")
146 return
148 to = p.get_to()
149 if to != self.xmpp.boundjid.bare:
150 # we don't want to push anything for contacts that are not in the user's roster
151 if contact is None or not contact.is_friend:
152 return
154 from_ = p.get_from()
155 features = await self.__get_features(p)
157 if AvatarMetadata.namespace + "+notify" in features:
158 try:
159 pep_avatar = await self._get_authorized_avatar(p, contact)
160 except XMPPError:
161 pass
162 else:
163 if pep_avatar.metadata is not None:
164 await self.__broadcast(
165 data=pep_avatar.metadata,
166 from_=p.get_to().bare,
167 to=from_,
168 id=pep_avatar.metadata["info"]["id"],
169 )
170 if UserNick.namespace + "+notify" in features:
171 try:
172 pep_nick = await self._get_authorized_nick(p, contact)
173 except XMPPError:
174 pass
175 else:
176 await self.__broadcast(data=pep_nick.nick, from_=p.get_to(), to=from_)
178 if contact is not None and VCARD4_NAMESPACE + "+notify" in features:
179 await self.broadcast_vcard_event(
180 p.get_to(), from_, await contact.get_vcard()
181 )
183 async def broadcast_vcard_event(self, from_: JID, to: JID, vcard: VCard4 | None):
184 item = Item()
185 item.namespace = VCARD4_NAMESPACE
186 item["id"] = "current"
187 # vcard: VCard4 = await self.xmpp["xep_0292_provider"].get_vcard(from_, to)
188 # The vcard content should NOT be in this event according to the spec:
189 # https://xmpp.org/extensions/xep-0292.html#sect-idm45669698174224
190 # but movim expects it to be here, and I guess it does not hurt
192 log.debug("Broadcast vcard4 event: %s", vcard)
193 await self.__broadcast(
194 data=vcard,
195 from_=JID(from_).bare,
196 to=to,
197 id="current",
198 node=VCARD4_NAMESPACE,
199 )
201 async def __get_contact(self, stanza: Union[Iq, Presence]):
202 session = self.xmpp.get_session_from_stanza(stanza)
203 return await session.contacts.by_jid(stanza.get_to())
205 async def _get_authorized_avatar(
206 self, stanza: Union[Iq, Presence], contact: Optional["LegacyContact"] = None
207 ) -> PepAvatar:
208 if stanza.get_to() == self.xmpp.boundjid.bare:
209 item = PepAvatar()
210 item.set_avatar_from_cache(avatar_cache.get_by_pk(self.xmpp.avatar_pk))
211 return item
213 if contact is None:
214 contact = await self.__get_contact(stanza)
216 item = PepAvatar()
217 if contact.avatar_pk is not None:
218 stored = avatar_cache.get_by_pk(contact.avatar_pk)
219 assert stored is not None
220 item.set_avatar_from_cache(stored)
221 return item
223 async def _get_authorized_nick(
224 self, stanza: Union[Iq, Presence], contact: Optional["LegacyContact"] = None
225 ) -> PepNick:
226 if stanza.get_to() == self.xmpp.boundjid.bare:
227 return PepNick(self.xmpp.COMPONENT_NAME)
229 if contact is None:
230 contact = await self.__get_contact(stanza)
232 if contact.name is not None:
233 return PepNick(contact.name)
234 else:
235 return PepNick()
237 def __reply_with(
238 self, iq: Iq, content: AvatarData | AvatarMetadata | None, item_id: str | None
239 ) -> None:
240 requested_items = iq["pubsub"]["items"]
242 if len(requested_items) == 0:
243 self._reply_with_payload(iq, content, item_id)
244 else:
245 for item in requested_items:
246 if item["id"] == item_id:
247 self._reply_with_payload(iq, content, item_id)
248 return
249 else:
250 raise XMPPError("item-not-found")
252 async def _get_avatar_data(self, iq: Iq):
253 pep_avatar = await self._get_authorized_avatar(iq)
254 self.__reply_with(iq, pep_avatar.data, pep_avatar.id)
256 async def _get_avatar_metadata(self, iq: Iq):
257 pep_avatar = await self._get_authorized_avatar(iq)
258 self.__reply_with(iq, pep_avatar.metadata, pep_avatar.id)
260 async def _get_vcard(self, iq: Iq):
261 # this is not the proper way that clients should retrieve VCards, but
262 # gajim does it this way.
263 # https://xmpp.org/extensions/xep-0292.html#sect-idm45669698174224
264 session = self.xmpp.get_session_from_stanza(iq)
265 contact = await session.contacts.by_jid(iq.get_to())
266 vcard = await contact.get_vcard()
267 if vcard is None:
268 raise XMPPError("item-not-found")
269 self._reply_with_payload(iq, vcard, "current", VCARD4_NAMESPACE)
271 @staticmethod
272 def _reply_with_payload(
273 iq: Iq,
274 payload: Optional[Union[AvatarMetadata, AvatarData, VCard4]],
275 id_: Optional[str],
276 namespace: Optional[str] = None,
277 ):
278 result = iq.reply()
279 item = Item()
280 if payload:
281 item.set_payload(payload.xml)
282 item["id"] = id_
283 result["pubsub"]["items"]["node"] = (
284 namespace if namespace else payload.namespace
285 )
286 result["pubsub"]["items"].append(item)
287 result.send()
289 async def __broadcast(self, data, from_: JidStr, to: OptJidStr = None, **kwargs):
290 from_ = JID(from_)
291 if from_ != self.xmpp.boundjid.bare and to is not None:
292 to = JID(to)
293 session = self.xmpp.get_session_from_jid(to)
294 if session is None:
295 return
296 await session.ready
298 item = EventItem()
299 if data:
300 item.set_payload(data.xml)
301 for k, v in kwargs.items():
302 item[k] = v
304 items = EventItems()
305 items.append(item)
306 items["node"] = kwargs.get("node") or data.namespace
308 event = Event()
309 event.append(items)
311 msg = self.xmpp.Message()
312 msg.set_type("headline")
313 msg.set_from(from_)
314 msg.append(event)
316 if to is None:
317 for u in self.xmpp.store.users.get_all():
318 new_msg = copy(msg)
319 new_msg.set_to(u.jid.bare)
320 new_msg.send()
321 else:
322 msg.set_to(to)
323 msg.send()
325 async def broadcast_avatar(
326 self, from_: JidStr, to: JidStr, cached_avatar: Optional[CachedAvatar]
327 ) -> None:
328 if cached_avatar is None:
329 await self.__broadcast(AvatarMetadata(), from_, to)
330 else:
331 pep_avatar = PepAvatar()
332 pep_avatar.set_avatar_from_cache(cached_avatar)
333 assert pep_avatar.metadata is not None
334 await self.__broadcast(
335 pep_avatar.metadata, from_, to, id=pep_avatar.metadata["info"]["id"]
336 )
338 def broadcast_nick(
339 self,
340 user_jid: JID,
341 jid: JidStr,
342 nick: Optional[str] = None,
343 ):
344 jid = JID(jid)
345 nickname = PepNick(nick)
346 log.debug("New nickname: %s", nickname.nick)
347 self.xmpp.loop.create_task(self.__broadcast(nickname.nick, jid, user_jid.bare))
350log = logging.getLogger(__name__)
351register_plugin(PubSubComponent)