Coverage for slidge/group/bookmarks.py: 89%
101 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
1import abc
2import logging
3from typing import TYPE_CHECKING, Generic, Iterator, Optional, Type
5from slixmpp import JID
6from slixmpp.exceptions import XMPPError
7from slixmpp.jid import _unescape_node
9from ..contact.roster import ESCAPE_TABLE
10from ..core.mixins.lock import NamedLockMixin
11from ..db.models import Room
12from ..util import SubclassableOnce
13from ..util.types import LegacyGroupIdType, LegacyMUCType
14from .archive import MessageArchive
15from .room import LegacyMUC
17if TYPE_CHECKING:
18 from slidge.core.session import BaseSession
class LegacyBookmarks(
    Generic[LegacyGroupIdType, LegacyMUCType],
    NamedLockMixin,
    metaclass=SubclassableOnce,
):
    """
    This is instantiated once per :class:`~slidge.BaseSession`

    It resolves MUC JIDs and legacy group IDs to MUC instances, using the
    rooms store of the gateway component as a backend, and it is the place
    where plugins declare which groups a user is part of (see :meth:`fill`).
    """

    def __init__(self, session: "BaseSession"):
        """
        :param session: The session these bookmarks belong to.
        """
        self.session = session
        self.xmpp = session.xmpp
        self.user_jid = session.user_jid
        self.__store = self.xmpp.store.rooms

        self._muc_class: Type[LegacyMUCType] = LegacyMUC.get_self_or_unique_subclass()

        # Default nickname of the user in MUCs: the node of their JID.
        self._user_nick: str = self.session.user_jid.node

        super().__init__()
        self.log = logging.getLogger(f"{self.user_jid.bare}:bookmarks")
        self.ready = self.session.xmpp.loop.create_future()
        if not self.xmpp.GROUPS:
            # The gateway does not advertise group support: resolve the
            # future right away.
            self.ready.set_result(True)

    @property
    def user_nick(self) -> str:
        """
        The nickname given to MUCs that do not define one for the user.
        """
        return self._user_nick

    @user_nick.setter
    def user_nick(self, nick: str) -> None:
        self._user_nick = nick

    def __iter__(self) -> Iterator[LegacyMUCType]:
        """
        Iterate over all the stored MUCs of this session's user.
        """
        for stored in self.__store.get_all(user_pk=self.session.user_pk):
            yield self._muc_class.from_store(self.session, stored)

    def __repr__(self) -> str:
        return f"<Bookmarks of {self.user_jid}>"

    async def legacy_id_to_jid_local_part(self, legacy_id: LegacyGroupIdType) -> str:
        """
        Alias of :meth:`legacy_id_to_jid_username`.

        :param legacy_id: The legacy ID of a group.
        :return: The local part of the JID of the corresponding MUC.
        """
        return await self.legacy_id_to_jid_username(legacy_id)

    async def jid_local_part_to_legacy_id(self, local_part: str):
        """
        Alias of :meth:`jid_username_to_legacy_id`.

        :param local_part: The local part of the JID of a MUC.
        :return: The legacy ID of the corresponding group.
        """
        return await self.jid_username_to_legacy_id(local_part)

    async def legacy_id_to_jid_username(self, legacy_id: LegacyGroupIdType) -> str:
        """
        The default implementation calls ``str()`` on the legacy_id and
        escape characters according to :xep:`0106`.

        You can override this class and implement a more subtle logic to raise
        an :class:`~slixmpp.exceptions.XMPPError` early

        :param legacy_id: The legacy ID of a group.
        :return: The local part of the JID of the corresponding MUC.
        """
        return str(legacy_id).translate(ESCAPE_TABLE)

    async def jid_username_to_legacy_id(self, username: str):
        """
        The default implementation unescapes the local part of the JID,
        reverting what :meth:`legacy_id_to_jid_username` does by default.

        :param username: The local part of the JID of a MUC.
        :return: The legacy ID of the corresponding group.
        """
        return _unescape_node(username)

    async def by_jid(self, jid: JID) -> LegacyMUCType:
        """
        Get the MUC matching a JID, creating and/or updating it if needed.

        :param jid: The JID of the MUC. Its resource part, if any, is ignored.
        :return: The MUC.
        :raises XMPPError: If the MUC info could not be updated.
        """
        if jid.resource:
            jid = JID(jid.bare)
        async with self.lock(("bare", jid.bare)):
            assert isinstance(jid.local, str)
            legacy_id = await self.jid_local_part_to_legacy_id(jid.local)
            # A lock on the legacy ID means by_legacy_id() is already at work
            # for the same group: delegate to it instead of instantiating the
            # MUC a second time.
            # NOTE(review): by_legacy_id() makes the mirror-image check on the
            # "bare" lock, which is held right here. Confirm against
            # NamedLockMixin that this cannot bounce back into by_jid() and
            # end up waiting on our own lock.
            if self.get_lock(("legacy_id", legacy_id)):
                self.log.debug("Not instantiating %s after all", jid)
                return await self.by_legacy_id(legacy_id)

            with self.__store.session():
                stored = self.__store.get_by_jid(self.session.user_pk, jid)
                return await self.__update_muc(stored, legacy_id, jid)

    def by_jid_only_if_exists(self, jid: JID) -> Optional[LegacyMUCType]:
        """
        Get the MUC matching a JID, without ever creating or updating it.

        :param jid: The JID of the MUC.
        :return: The MUC if it is in the store and flagged as updated,
            ``None`` otherwise.
        """
        with self.__store.session():
            stored = self.__store.get_by_jid(self.session.user_pk, jid)
            if stored is not None and stored.updated:
                return self._muc_class.from_store(self.session, stored)
        return None

    async def by_legacy_id(self, legacy_id: LegacyGroupIdType) -> LegacyMUCType:
        """
        Get the MUC matching a legacy ID, creating and/or updating it if
        needed.

        :param legacy_id: The legacy ID of the group.
        :return: The MUC.
        :raises XMPPError: If the MUC info could not be updated.
        """
        async with self.lock(("legacy_id", legacy_id)):
            local = await self.legacy_id_to_jid_local_part(legacy_id)
            jid = JID(f"{local}@{self.xmpp.boundjid}")
            # A lock on the bare JID means by_jid() is already at work for the
            # same group: delegate to it instead of instantiating the MUC a
            # second time.
            if self.get_lock(("bare", jid.bare)):
                self.log.debug("Not instantiating %s after all", legacy_id)
                return await self.by_jid(jid)

            with self.__store.session():
                stored = self.__store.get_by_legacy_id(
                    self.session.user_pk, str(legacy_id)
                )
                return await self.__update_muc(stored, legacy_id, jid)

    async def __update_muc(
        self, stored: Room | None, legacy_id: LegacyGroupIdType, jid: JID
    ) -> LegacyMUCType:
        """
        Build a MUC from what the store returned and update its info unless
        the store flags it as already updated.

        :param stored: The row of the store for this MUC, ``None`` if there is
            none.
        :param legacy_id: The legacy ID of the group, used when there is no
            stored row.
        :param jid: The JID of the MUC, used when there is no stored row.
        :return: The MUC.
        :raises XMPPError: If updating the MUC info failed. Exceptions of
            other types are converted to an ``internal-server-error``.
        """
        if stored is None:
            muc = self._muc_class(self.session, legacy_id=legacy_id, jid=jid)
        else:
            muc = self._muc_class.from_store(self.session, stored)
            if stored.updated:
                return muc

        try:
            with muc.updating_info():
                await muc.avatar_wrap_update_info()
        except XMPPError:
            raise
        except Exception as e:
            # Chain explicitly so that the original exception is reported as
            # the cause of the XMPP error and not as an error that happened
            # while handling it.
            raise XMPPError("internal-server-error", str(e)) from e
        if not muc.user_nick:
            muc.user_nick = self._user_nick
        self.log.debug("MUC created: %r", muc)
        muc.pk = self.__store.update(muc)
        muc.archive = MessageArchive(muc.pk, self.xmpp.store.mam)
        return muc

    @abc.abstractmethod
    async def fill(self) -> None:
        """
        Establish a user's known groups.

        This has to be overridden in plugins with group support and at the
        minimum, this should ``await self.by_legacy_id(group_id)`` for all
        the groups a user is part of.

        Slidge internals will call this on successful :meth:`BaseSession.login`

        :raises NotImplementedError: If the gateway advertises group support
            and this default implementation is the one being called.
        """
        if self.xmpp.GROUPS:
            raise NotImplementedError(
                "The plugin advertised support for groups but"
                " LegacyBookmarks.fill() was not overridden."
            )

    async def remove(
        self,
        muc: LegacyMUC,
        reason="You left this group from the official client.",
        kick=True,
    ) -> None:
        """
        Delete everything about a specific group.

        This should be called when the user leaves the group from the official
        app.

        :param muc: The MUC to remove.
        :param reason: Optionally, a reason why this group was removed.
        :param kick: Whether the user should be kicked from this group. Set this
            to False in case you do this somewhere else in your code, eg, on
            receiving the confirmation that the group was deleted.
        """
        assert muc.pk is not None
        if kick:
            user_participant = await muc.get_user_participant()
            user_participant.kick(reason)
        self.__store.delete(muc.pk)