Coverage for slidge / group / bookmarks.py: 91%
170 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-20 19:56 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-20 19:56 +0000
1import abc
2import logging
3from collections.abc import Iterator
4from typing import Any, Generic, Literal, overload
6from slixmpp import JID
7from slixmpp.exceptions import XMPPError
8from sqlalchemy.orm import Session as OrmSession
10from ..db.models import Contact, Room, Space
11from ..util import SubclassableOnce
12from ..util.jid_escaping import ESCAPE_TABLE, unescape_node
13from ..util.lock import NamedLockMixin
14from ..util.types import (
15 AnyContact,
16 AnyMUC,
17 AnySession,
18 AnySpaceMetadata,
19 LegacyGroupIdType,
20 LegacyMUCType,
21 LegacyUserIdType,
22 SpaceMetadata,
23)
26class LegacyBookmarks(
27 Generic[LegacyGroupIdType, LegacyMUCType, LegacyUserIdType],
28 NamedLockMixin,
29 SubclassableOnce,
30):
31 """
32 This is instantiated once per :class:`~slidge.BaseSession`
33 """
35 _muc_cls: type[LegacyMUCType]
37 def __init__(self, session: AnySession) -> None:
38 self.session = session
39 self.xmpp = session.xmpp
40 self.user_jid = session.user_jid
42 self._user_nick: str = self.session.user_jid.node
44 super().__init__()
45 self.log = logging.getLogger(f"{self.user_jid.bare}:bookmarks")
46 self.ready = self.session.xmpp.loop.create_future()
47 if not self.xmpp.GROUPS:
48 self.ready.set_result(True)
50 @property
51 def user_nick(self) -> str:
52 return self._user_nick
54 @user_nick.setter
55 def user_nick(self, nick: str) -> None:
56 self._user_nick = nick
58 def orm(
59 self,
60 **kwargs: Any, # noqa:ANN401
61 ) -> OrmSession:
62 return self.session.xmpp.store.session(**kwargs)
64 def from_store(self, stored: Room) -> LegacyMUCType:
65 return self._muc_cls(self.session, stored)
67 def __iter__(self) -> Iterator[LegacyMUCType]:
68 with self.xmpp.store.session() as orm:
69 for stored in orm.query(Room).filter_by(user=self.session.user).all():
70 if stored.updated:
71 yield self.from_store(stored)
73 def __repr__(self) -> str:
74 return f"<Bookmarks of {self.user_jid}>"
76 async def legacy_id_to_jid_local_part(self, legacy_id: LegacyGroupIdType) -> str:
77 return await self.legacy_id_to_jid_username(legacy_id)
79 async def jid_local_part_to_legacy_id(self, local_part: str) -> LegacyGroupIdType:
80 return await self.jid_username_to_legacy_id(local_part)
82 async def legacy_id_to_jid_username(self, legacy_id: LegacyGroupIdType) -> str:
83 """
84 The default implementation calls ``str()`` on the legacy_id and
85 escape characters according to :xep:`0106`.
87 You can override this class and implement a more subtle logic to raise
88 an :class:`~slixmpp.exceptions.XMPPError` early
90 :param legacy_id:
91 :return:
92 """
93 return str(legacy_id).translate(ESCAPE_TABLE)
95 async def jid_username_to_legacy_id(self, username: str) -> LegacyGroupIdType:
96 """
98 :param username:
99 :return:
100 """
101 return unescape_node(username) # type:ignore[no-any-return]
103 async def by_jid(self, jid: JID) -> LegacyMUCType:
104 if jid.resource:
105 jid = JID(jid.bare)
106 async with self.lock(("bare", jid.bare)):
107 legacy_id = await self.jid_local_part_to_legacy_id(jid.node)
108 if self.get_lock(("legacy_id", legacy_id)):
109 self.session.log.debug("Already updating %s via by_legacy_id()", jid)
110 return await self.by_legacy_id(legacy_id)
112 with self.session.xmpp.store.session() as orm:
113 stored = (
114 orm.query(Room)
115 .filter_by(user_account_id=self.session.user_pk, jid=jid)
116 .one_or_none()
117 )
118 if stored is None:
119 stored = Room(
120 user_account_id=self.session.user_pk,
121 jid=jid,
122 legacy_id=legacy_id,
123 )
124 return await self.__update_if_needed(stored)
126 def by_jid_only_if_exists(self, jid: JID) -> LegacyMUCType | None:
127 with self.xmpp.store.session(expire_on_commit=False) as orm:
128 stored = (
129 orm.query(Room).filter_by(user=self.session.user, jid=jid).one_or_none()
130 )
131 if stored is not None and stored.updated:
132 return self.from_store(stored)
133 return None
135 @overload
136 async def by_legacy_id(self, legacy_id: LegacyGroupIdType) -> "LegacyMUCType": ...
138 @overload
139 async def by_legacy_id(
140 self, legacy_id: LegacyGroupIdType, create: Literal[False]
141 ) -> "LegacyMUCType | None": ...
143 @overload
144 async def by_legacy_id(
145 self, legacy_id: LegacyGroupIdType, create: Literal[True]
146 ) -> "LegacyMUCType": ...
148 async def by_legacy_id(
149 self, legacy_id: LegacyGroupIdType, create: bool = False
150 ) -> LegacyMUCType | None:
151 async with self.lock(("legacy_id", legacy_id)):
152 local = await self.legacy_id_to_jid_local_part(legacy_id)
153 jid = JID(f"{local}@{self.xmpp.boundjid}")
154 if self.get_lock(("bare", jid.bare)):
155 self.session.log.debug("Already updating %s via by_jid()", jid)
156 if create:
157 return await self.by_jid(jid)
158 else:
159 return self.by_jid_only_if_exists(jid)
161 with self.xmpp.store.session() as orm:
162 stored = (
163 orm.query(Room)
164 .filter_by(
165 user_account_id=self.session.user_pk,
166 legacy_id=str(legacy_id),
167 )
168 .one_or_none()
169 )
170 if stored is None:
171 stored = Room(
172 user_account_id=self.session.user_pk,
173 jid=jid,
174 legacy_id=str(legacy_id),
175 )
176 return await self.__update_if_needed(stored)
178 async def __update_if_needed(self, stored: Room) -> LegacyMUCType:
179 muc = self.from_store(stored)
180 if muc.stored.updated:
181 return muc
183 with muc.updating_info():
184 try:
185 await muc.update_info()
186 except NotImplementedError:
187 pass
188 except XMPPError:
189 raise
190 except Exception as e:
191 raise XMPPError("internal-server-error", str(e))
192 muc.archive.room = muc.stored
193 if self.ready.done() and muc.stored.space_id:
194 with self.orm() as orm:
195 orm.add(muc.stored)
196 self.xmpp.pubsub.broadcast_space(self.session, muc.stored.space)
197 return muc
199 async def fill(self) -> None:
200 """
201 Establish a user's known groups.
203 This has to be overridden in plugins with group support and at the
204 minimum, this should ``await self.by_legacy_id(group_id)`` for all
205 the groups a user is part of.
207 Slidge internals will call this on successful :meth:`BaseSession.login`
209 """
210 if self.xmpp.GROUPS:
211 raise NotImplementedError(
212 "The plugin advertised support for groups but"
213 " LegacyBookmarks.fill() was not overridden."
214 )
216 async def remove(
217 self,
218 muc: AnyMUC,
219 reason: str = "You left this group from the official client.",
220 kick: bool = True,
221 ) -> None:
222 """
223 Delete everything about a specific group.
225 This should be called when the user leaves the group from the official
226 app.
228 :param muc: The MUC to remove.
229 :param reason: Optionally, a reason why this group was removed.
230 :param kick: Whether the user should be kicked from this group. Set this
231 to False in case you do this somewhere else in your code, eg, on
232 receiving the confirmation that the group was deleted.
233 """
234 if kick:
235 user_participant = await muc.get_user_participant()
236 user_participant.kick(reason)
237 with self.xmpp.store.session() as orm:
238 orm.add(muc.stored)
239 orm.refresh(muc.stored)
240 orm.delete(muc.stored)
241 orm.commit()
243 async def update_space_if_needed(self, space: Space) -> Space:
244 if space.updated:
245 return space
246 meta: AnySpaceMetadata = await self.fetch_space_metadata(
247 self.xmpp.LEGACY_ROOM_ID_TYPE(space.legacy_id)
248 )
249 return await self.__update_space_metadata(space, meta)
251 async def __update_space_metadata(
252 self, space: Space, meta: SpaceMetadata[LegacyGroupIdType]
253 ) -> Space:
254 creator = (
255 await self.__get_stored_contact(str(meta.creator_legacy_id))
256 if meta.creator_legacy_id
257 else None
258 )
259 owners: list[Contact] = []
260 for legacy_id in set(meta.owner_legacy_ids):
261 if legacy_id == meta.creator_legacy_id:
262 # We don't want to fetch any contact twice here to avoid:
263 # Can't attach instance <Contact at xxx>; another instance [...] is already present in this session.
264 continue
265 owner = await self.__get_stored_contact(str(legacy_id))
266 if owner is not None:
267 owners.append(owner)
268 if (
269 creator is not None
270 and meta.creator_legacy_id
271 and meta.creator_legacy_id in meta.owner_legacy_ids
272 ):
273 owners.append(creator)
274 with self.orm(expire_on_commit=False) as orm:
275 if creator is not None:
276 creator = orm.merge(creator)
277 owners = [orm.merge(owner) for owner in owners]
278 space = orm.merge(space)
279 space.name = meta.name or space.legacy_id
280 space.creator = creator
281 space.owners = owners
282 space.description = meta.description
283 space.member_count = meta.member_count
284 space.updated = True
285 orm.commit()
286 return space
288 async def __get_stored_contact(self, legacy_id_str: str) -> Contact | None:
289 try:
290 contact: AnyContact = await self.session.contacts.by_legacy_id(
291 self.xmpp.LEGACY_CONTACT_ID_TYPE(legacy_id_str)
292 )
293 except Exception as e:
294 self.log.warning("Could not get contact: %r", e)
295 return None
296 return contact.stored
298 @abc.abstractmethod
299 async def fetch_space_metadata(
300 self, legacy_id: LegacyGroupIdType
301 ) -> SpaceMetadata[LegacyUserIdType]:
302 """
303 Fetch metadata associated to a space.
305 This is called once per slidge runtime. It should return metadata
306 associated to the space identified by its ``legacy_id``.
307 If there are updates to this metata, they should be communicated to
308 slidge by calling :ref:`LegacyBookmarks.update_space_metadata`.
310 :param legacy_id: Identifier of the space.
312 :return: Metadata associated to the space.
313 """
314 raise NotImplementedError
316 async def update_spaces_info(self) -> None:
317 with self.orm() as orm:
318 spaces = self.session.xmpp.store.spaces.get_unupdated(
319 orm, self.session.user_pk
320 )
321 for space in spaces:
322 await self.update_space_if_needed(space)
324 async def space_legacy_id_to_node(self, legacy_id: LegacyGroupIdType) -> str:
325 return str(legacy_id)
327 async def space_node_to_legacy_id(self, node: str) -> LegacyGroupIdType:
328 return self.xmpp.LEGACY_ROOM_ID_TYPE(node) # type:ignore[no-any-return]
330 async def update_space_metadata(
331 self,
332 legacy_id: LegacyGroupIdType,
333 metadata: SpaceMetadata[LegacyGroupIdType],
334 ) -> None:
335 """
336 Updates metadata associated to a space.
338 :param legacy_id: Identifier of the space.
339 :param name: Metadata associated to this space.
340 """
341 with self.orm(expire_on_commit=False) as orm:
342 space = self.session.xmpp.store.spaces.add_or_get(
343 orm,
344 self.session.user_pk,
345 str(legacy_id),
346 )
347 space = await self.__update_space_metadata(space, metadata)
348 self.xmpp.pubsub.broadcast_space_metadata(
349 self.session, space, await self.space_legacy_id_to_node(legacy_id)
350 )