Coverage for slidge / group / bookmarks.py: 91%
171 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
1import abc
2import logging
3from collections.abc import Iterator
4from typing import Any, Generic, Literal, overload
6from slixmpp import JID
7from slixmpp.exceptions import XMPPError
8from sqlalchemy.orm import Session as OrmSession
10from slidge.contact import LegacyContact
12from ..db.models import Contact, Room, Space
13from ..util import SubclassableOnce
14from ..util.jid_escaping import EscapeMixin
15from ..util.lock import NamedLockMixin
16from ..util.types import (
17 AnyMUC,
18 AnySession,
19 LegacyMUCType,
20 SpaceMetadata,
21)
24class LegacyBookmarks(
25 Generic[LegacyMUCType],
26 EscapeMixin,
27 NamedLockMixin,
28 SubclassableOnce,
29):
30 """
31 This is instantiated once per :class:`~slidge.BaseSession`
32 """
34 _muc_cls: type[LegacyMUCType]
def __init__(self, session: AnySession) -> None:
    """
    Bind this bookmarks store to a user session.

    :param session: The session these bookmarks belong to; its ``xmpp``
        component and ``user_jid`` are cached on the instance.
    """
    self.session = session
    self.xmpp = session.xmpp
    self.user_jid = session.user_jid

    # Default nickname: the node part of the user's JID.
    self._user_nick: str = self.session.user_jid.node

    super().__init__()

    logger_name = f"{self.user_jid.bare}:bookmarks"
    self.log = logging.getLogger(logger_name)

    loop = self.session.xmpp.loop
    self.ready = loop.create_future()
    if self.xmpp.GROUPS:
        # Left pending here; nothing in this method resolves it.
        return
    # No group support advertised: there is nothing to wait for.
    self.ready.set_result(True)
@property
def user_nick(self) -> str:
    """The nickname currently stored for the user on this instance."""
    nick = self._user_nick
    return nick

@user_nick.setter
def user_nick(self, nick: str) -> None:
    # Plain assignment: no validation or side effect is attached to it.
    self._user_nick = nick
def orm(
    self,
    **kwargs: Any,  # noqa:ANN401
) -> OrmSession:
    """
    Open an ORM session on the gateway's store.

    :param kwargs: Forwarded unchanged to ``store.session()``.
    :return: Whatever ``store.session()`` returns.
    """
    store = self.session.xmpp.store
    return store.session(**kwargs)
def from_store(self, stored: Room) -> LegacyMUCType:
    """
    Wrap a stored room in this plugin's MUC class.

    :param stored: The database row describing the room.
    :return: ``self._muc_cls`` instantiated with this session and ``stored``.
    """
    muc_cls = self._muc_cls
    return muc_cls(self.session, stored)
def __iter__(self) -> Iterator[LegacyMUCType]:
    """
    Iterate over the user's stored rooms that are flagged as updated.

    Rooms whose ``updated`` flag is falsy are skipped. The ORM session stays
    open for as long as the generator is being consumed.
    """
    with self.xmpp.store.session() as orm:
        rooms = orm.query(Room).filter_by(user=self.session.user).all()
        for room in rooms:
            if not room.updated:
                continue
            yield self.from_store(room)
72 def __repr__(self) -> str:
73 return f"<Bookmarks of {self.user_jid}>"
async def legacy_id_to_jid_local_part(self, legacy_id: str) -> str:
    """
    Convert a legacy group ID into the local part of the room's JID.

    Delegates to ``legacy_id_to_jid_username()``.
    """
    local_part = await self.legacy_id_to_jid_username(legacy_id)
    return local_part
async def jid_local_part_to_legacy_id(self, local_part: str) -> str:
    """
    Convert the local part of a room JID back into a legacy group ID.

    Delegates to ``jid_username_to_legacy_id()``.
    """
    legacy_id = await self.jid_username_to_legacy_id(local_part)
    return legacy_id
async def by_jid(self, jid: JID) -> LegacyMUCType:
    """
    Return the MUC matching a JID, refreshing its info if needed.

    Any resource part of ``jid`` is dropped first. Work is serialised per
    bare JID through a named lock.

    :param jid: JID of the room (bare or full).
    :return: The MUC instance for this room.
    """
    if jid.resource:
        jid = JID(jid.bare)

    async with self.lock(("bare", jid.bare)):
        legacy_id = await self.jid_local_part_to_legacy_id(jid.node)

        # A lock registered under the legacy ID means by_legacy_id() is
        # already handling this room: hand over to it.
        if self.get_lock(("legacy_id", legacy_id)):
            self.session.log.debug("Already updating %s via by_legacy_id()", jid)
            return await self.by_legacy_id(legacy_id)

        user_pk = self.session.user_pk
        with self.session.xmpp.store.session() as orm:
            query = orm.query(Room).filter_by(user_account_id=user_pk, jid=jid)
            stored = query.one_or_none()
            if stored is None:
                # Not known yet: build a fresh row (not added to the ORM
                # session in this method).
                stored = Room(
                    user_account_id=user_pk,
                    jid=jid,
                    legacy_id=legacy_id,
                )
        return await self.__update_if_needed(stored)
def by_jid_only_if_exists(self, jid: JID) -> LegacyMUCType | None:
    """
    Look up a room by JID without creating or updating anything.

    :param jid: JID of the room.
    :return: The MUC if a row exists for this user and JID and is flagged
        as updated, ``None`` otherwise.
    """
    with self.xmpp.store.session(expire_on_commit=False) as orm:
        query = orm.query(Room).filter_by(user=self.session.user, jid=jid)
        stored = query.one_or_none()
        if stored is None or not stored.updated:
            return None
        return self.from_store(stored)
@overload
async def by_legacy_id(self, legacy_id: str) -> "LegacyMUCType": ...

@overload
async def by_legacy_id(
    self, legacy_id: str, create: Literal[False]
) -> "LegacyMUCType | None": ...

@overload
async def by_legacy_id(
    self, legacy_id: str, create: Literal[True]
) -> "LegacyMUCType": ...

async def by_legacy_id(
    self, legacy_id: str, create: bool = False
) -> LegacyMUCType | None:
    """
    Return the MUC matching a legacy group ID, refreshing its info if needed.

    Work is serialised per legacy ID through a named lock.

    :param legacy_id: Identifier of the group on the legacy network.
    :param create: Only consulted when a lock is already registered for the
        room's bare JID: if true, defer to :meth:`by_jid`; otherwise return
        what :meth:`by_jid_only_if_exists` finds, which may be ``None``. On
        the regular path a row is built whenever none is stored.
    :return: The MUC instance, or ``None`` in the case described above.
    """
    async with self.lock(("legacy_id", legacy_id)):
        local = await self.legacy_id_to_jid_local_part(legacy_id)
        jid = JID(f"{local}@{self.xmpp.boundjid}")

        if self.get_lock(("bare", jid.bare)):
            self.session.log.debug("Already updating %s via by_jid()", jid)
            if not create:
                return self.by_jid_only_if_exists(jid)
            return await self.by_jid(jid)

        user_pk = self.session.user_pk
        with self.xmpp.store.session() as orm:
            query = orm.query(Room).filter_by(
                user_account_id=user_pk,
                legacy_id=str(legacy_id),
            )
            stored = query.one_or_none()
            if stored is None:
                # Not known yet: build a fresh row (not added to the ORM
                # session in this method).
                stored = Room(
                    user_account_id=user_pk,
                    jid=jid,
                    legacy_id=str(legacy_id),
                )
        return await self.__update_if_needed(stored)
async def __update_if_needed(self, stored: Room) -> LegacyMUCType:
    """
    Build the MUC for a stored room and refresh its info if not done yet.

    :param stored: The database row describing the room.
    :return: The MUC wrapping ``stored``.
    :raises XMPPError: Re-raised as is when ``update_info()`` raises one;
        any other exception except ``NotImplementedError`` is converted to
        an ``internal-server-error`` with the original exception chained as
        its cause.
    """
    muc = self.from_store(stored)
    if muc.stored.updated:
        return muc

    with muc.updating_info():
        try:
            await muc.update_info()
        except NotImplementedError:
            # Deliberately ignored. Presumably raised when the plugin does
            # not implement update_info() -- TODO confirm.
            pass
        except XMPPError:
            raise
        except Exception as e:
            # Chain explicitly so the original error is kept as the cause.
            raise XMPPError("internal-server-error", str(e)) from e

    # NOTE(review): this assumes updating_info() may have replaced
    # muc.stored on exit, hence re-pointing the archive -- confirm.
    muc.archive.room = muc.stored
    if self.ready.done() and muc.stored.space_id:
        with self.orm() as orm:
            # Attach the row to a session before reading its ``space``.
            orm.add(muc.stored)
            self.xmpp.pubsub.broadcast_space(self.session, muc.stored.space)
    return muc
async def fill(self) -> None:
    """
    Establish a user's known groups.

    Plugins with group support must override this. At the minimum, the
    override should ``await self.by_legacy_id(group_id)`` for every group
    the user is part of.

    Slidge internals will call this on successful :meth:`BaseSession.login`

    :raises NotImplementedError: If the plugin advertises group support
        (``xmpp.GROUPS`` is truthy) but did not override this method.
    """
    if not self.xmpp.GROUPS:
        # No group support advertised: nothing to fill.
        return
    raise NotImplementedError(
        "The plugin advertised support for groups but"
        " LegacyBookmarks.fill() was not overridden."
    )
async def remove(
    self,
    muc: AnyMUC,
    reason: str = "You left this group from the official client.",
    kick: bool = True,
) -> None:
    """
    Delete everything about a specific group.

    Call this when the user leaves the group from the official app.

    :param muc: The MUC to remove.
    :param reason: Optionally, a reason why this group was removed.
    :param kick: Whether the user should be kicked from this group. Set this
        to False in case you do this somewhere else in your code, eg, on
        receiving the confirmation that the group was deleted.
    """
    if kick:
        participant = await muc.get_user_participant()
        participant.kick(reason)

    with self.xmpp.store.session() as orm:
        # Attach and reload the row before deleting it, then persist.
        orm.add(muc.stored)
        orm.refresh(muc.stored)
        orm.delete(muc.stored)
        orm.commit()
async def update_space_if_needed(self, space: Space) -> Space:
    """
    Fetch and store a space's metadata unless it is already up to date.

    Work is serialised per space legacy ID through a named lock.

    :param space: The stored space; it is reloaded from the database first.
    :return: ``space`` itself if already flagged as updated, otherwise the
        instance returned after applying the freshly fetched metadata.
    """
    async with self.lock(("space", space.legacy_id)):
        with self.orm() as orm:
            # Reload so the ``updated`` flag reflects the database.
            orm.add(space)
            orm.refresh(space)

        if space.updated:
            return space

        meta: SpaceMetadata = await self.fetch_space_metadata(space.legacy_id)
        return await self.__update_space_metadata(space, meta)
async def __update_space_metadata(self, space: Space, meta: SpaceMetadata) -> Space:
    """
    Copy ``meta`` onto a stored space and commit it.

    Contacts that cannot be resolved are left out of the owners (or leave
    the creator unset).

    :param space: The stored space to update.
    :param meta: The metadata to apply.
    :return: The merged space instance, flagged as updated.
    """
    if meta.creator_legacy_id:
        creator = await self.__get_stored_contact(meta.creator_legacy_id)
    else:
        creator = None

    owners: list[Contact] = []
    for owner_id in set(meta.owner_legacy_ids):
        if owner_id == meta.creator_legacy_id:
            # We don't want to fetch any contact twice here to avoid:
            # Can't attach instance <Contact at xxx>; another instance [...] is already present in this session.
            continue
        resolved = await self.__get_stored_contact(owner_id)
        if resolved is None:
            continue
        owners.append(resolved)

    # The creator was skipped in the loop above; reuse the instance that was
    # already fetched if they are listed among the owners.
    creator_is_owner = (
        creator is not None
        and meta.creator_legacy_id
        and meta.creator_legacy_id in meta.owner_legacy_ids
    )
    if creator_is_owner:
        owners.append(creator)

    with self.orm(expire_on_commit=False) as orm:
        if creator is not None:
            creator = orm.merge(creator)
        owners = [orm.merge(owner) for owner in owners]
        space = orm.merge(space)

        # Fall back to the legacy ID when the metadata has no name.
        space.name = meta.name or space.legacy_id
        space.creator = creator
        space.owners = owners
        space.description = meta.description
        space.member_count = meta.member_count
        space.updated = True
        orm.commit()
    return space
266 async def __get_stored_contact(self, legacy_id_str: str) -> Contact | None:
267 try:
268 contact: LegacyContact = await self.session.contacts.by_legacy_id(
269 legacy_id_str
270 )
271 except Exception as e:
272 self.log.warning("Could not get contact: %r", e)
273 return None
274 return contact.stored
@abc.abstractmethod
async def fetch_space_metadata(self, legacy_id: str) -> SpaceMetadata:
    """
    Fetch the metadata associated with a space.

    Plugins override this. :meth:`update_space_if_needed` calls it for a
    stored space that is not flagged as updated yet. Later changes to the
    metadata should be communicated to slidge by calling
    :meth:`LegacyBookmarks.update_space_metadata`.

    :param legacy_id: Identifier of the space.

    :return: Metadata associated with the space.
    """
    raise NotImplementedError
async def update_spaces_info(self) -> None:
    """
    Refresh every space the store reports as not updated for this user.

    The list comes from ``store.spaces.get_unupdated()``; each entry is then
    passed to :meth:`update_space_if_needed`, one at a time.
    """
    with self.orm() as orm:
        pending = self.session.xmpp.store.spaces.get_unupdated(
            orm, self.session.user_pk
        )
    # Runs after the session above is closed; update_space_if_needed()
    # opens its own.
    for stale_space in pending:
        await self.update_space_if_needed(stale_space)
async def space_legacy_id_to_node(self, legacy_id: str) -> str:
    """
    Map a space's legacy ID to a node name.

    This implementation is the string form of ``legacy_id``.
    """
    node = str(legacy_id)
    return node
async def space_node_to_legacy_id(self, node: str) -> str:
    """
    Map a node name back to a space's legacy ID.

    This implementation returns ``node`` unchanged.
    """
    legacy_id = node
    return legacy_id
async def update_space_metadata(
    self,
    legacy_id: str,
    metadata: SpaceMetadata,
) -> None:
    """
    Update the metadata associated with a space and broadcast the change.

    The space is obtained from the store with ``add_or_get()``, the metadata
    is applied and committed, then the result is broadcast through
    ``pubsub.broadcast_space_metadata()``.

    :param legacy_id: Identifier of the space.
    :param metadata: Metadata associated with this space.
    """
    with self.orm(expire_on_commit=False) as orm:
        space = self.session.xmpp.store.spaces.add_or_get(
            orm,
            self.session.user_pk,
            str(legacy_id),
        )
    space = await self.__update_space_metadata(space, metadata)
    node = await self.space_legacy_id_to_node(legacy_id)
    self.xmpp.pubsub.broadcast_space_metadata(self.session, space, node)