Coverage for slidge/contact/roster.py: 75%
119 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
1import asyncio
2import logging
3import warnings
4from typing import TYPE_CHECKING, AsyncIterator, Generic, Iterator, Optional, Type
6from slixmpp import JID
7from slixmpp.exceptions import IqError, IqTimeout, XMPPError
8from slixmpp.jid import JID_UNESCAPE_TRANSFORMATIONS, _unescape_node
10from ..core.mixins.lock import NamedLockMixin
11from ..db.models import Contact
12from ..db.store import ContactStore
13from ..util import SubclassableOnce
14from ..util.types import LegacyContactType, LegacyUserIdType
15from .contact import LegacyContact
17if TYPE_CHECKING:
18 from ..core.session import BaseSession
21class ContactIsUser(Exception):
22 pass
25class LegacyRoster(
26 Generic[LegacyUserIdType, LegacyContactType],
27 NamedLockMixin,
28 metaclass=SubclassableOnce,
29):
30 """
31 Virtual roster of a gateway user, that allows to represent all
32 of their contacts as singleton instances (if used properly and not too bugged).
34 Every :class:`.BaseSession` instance will have its own :class:`.LegacyRoster` instance
35 accessible via the :attr:`.BaseSession.contacts` attribute.
37 Typically, you will mostly use the :meth:`.LegacyRoster.by_legacy_id` function to
38 retrieve a contact instance.
40 You might need to override :meth:`.LegacyRoster.legacy_id_to_jid_username` and/or
41 :meth:`.LegacyRoster.jid_username_to_legacy_id` to incorporate some custom logic
42 if you need some characters when translation JID user parts and legacy IDs.
43 """
45 def __init__(self, session: "BaseSession"):
46 self._contact_cls: Type[LegacyContactType] = (
47 LegacyContact.get_self_or_unique_subclass()
48 )
49 self._contact_cls.xmpp = session.xmpp
50 self.__store: ContactStore = session.xmpp.store.contacts
52 self.session = session
53 self.log = logging.getLogger(f"{self.session.user_jid.bare}:roster")
54 self.user_legacy_id: Optional[LegacyUserIdType] = None
55 self.ready: asyncio.Future[bool] = self.session.xmpp.loop.create_future()
56 self.__filling = False
57 super().__init__()
59 def __repr__(self):
60 return f"<Roster of {self.session.user_jid}>"
62 def __iter__(self) -> Iterator[LegacyContactType]:
63 with self.__store.session():
64 for stored in self.__store.get_all(user_pk=self.session.user_pk):
65 yield self._contact_cls.from_store(self.session, stored)
67 def known_contacts(self, only_friends=True) -> dict[str, LegacyContactType]:
68 if only_friends:
69 return {c.jid.bare: c for c in self if c.is_friend}
70 return {c.jid.bare: c for c in self}
72 async def by_jid(self, contact_jid: JID) -> LegacyContactType:
73 # """
74 # Retrieve a contact by their JID
75 #
76 # If the contact was not instantiated before, it will be created
77 # using :meth:`slidge.LegacyRoster.jid_username_to_legacy_id` to infer their
78 # legacy user ID.
79 #
80 # :param contact_jid:
81 # :return:
82 # """
83 username = contact_jid.node
84 async with self.lock(("username", username)):
85 legacy_id = await self.jid_username_to_legacy_id(username)
86 log.debug("Contact %s not found", contact_jid)
87 if self.get_lock(("legacy_id", legacy_id)):
88 log.debug("Already updating %s", contact_jid)
89 return await self.by_legacy_id(legacy_id)
91 with self.__store.session():
92 stored = self.__store.get_by_jid(self.session.user_pk, contact_jid)
93 return await self.__update_contact(stored, legacy_id, username)
95 def by_jid_only_if_exists(self, contact_jid: JID) -> LegacyContactType | None:
96 with self.__store.session():
97 stored = self.__store.get_by_jid(self.session.user_pk, contact_jid)
98 if stored is not None and stored.updated:
99 return self._contact_cls.from_store(self.session, stored)
100 return None
102 async def by_legacy_id(
103 self, legacy_id: LegacyUserIdType, *args, **kwargs
104 ) -> LegacyContactType:
105 """
106 Retrieve a contact by their legacy_id
108 If the contact was not instantiated before, it will be created
109 using :meth:`slidge.LegacyRoster.legacy_id_to_jid_username` to infer their
110 legacy user ID.
112 :param legacy_id:
113 :param args: arbitrary additional positional arguments passed to the contact constructor.
114 Requires subclassing LegacyContact.__init__ to accept those.
115 This is useful for networks where you fetch the contact list and information
116 about these contacts in a single request
117 :param kwargs: arbitrary keyword arguments passed to the contact constructor
118 :return:
119 """
120 if legacy_id == self.user_legacy_id:
121 raise ContactIsUser
122 async with self.lock(("legacy_id", legacy_id)):
123 username = await self.legacy_id_to_jid_username(legacy_id)
124 if self.get_lock(("username", username)):
125 log.debug("Already updating %s", username)
126 jid = JID()
127 jid.node = username
128 jid.domain = self.session.xmpp.boundjid.bare
129 return await self.by_jid(jid)
131 with self.__store.session():
132 stored = self.__store.get_by_legacy_id(
133 self.session.user_pk, str(legacy_id)
134 )
135 return await self.__update_contact(
136 stored, legacy_id, username, *args, **kwargs
137 )
139 async def __update_contact(
140 self,
141 stored: Contact | None,
142 legacy_id: LegacyUserIdType,
143 username: str,
144 *a,
145 **kw,
146 ) -> LegacyContactType:
147 if stored is None:
148 contact = self._contact_cls(self.session, legacy_id, username, *a, **kw)
149 else:
150 contact = self._contact_cls.from_store(self.session, stored, *a, **kw)
151 if stored.updated:
152 return contact
154 try:
155 with contact.updating_info():
156 await contact.avatar_wrap_update_info()
157 except XMPPError:
158 raise
159 except Exception as e:
160 raise XMPPError("internal-server-error", str(e))
161 contact._caps_ver = await contact.get_caps_ver(contact.jid)
162 contact.contact_pk = self.__store.update(contact, commit=not self.__filling)
163 return contact
165 async def by_stanza(self, s) -> LegacyContact:
166 # """
167 # Retrieve a contact by the destination of a stanza
168 #
169 # See :meth:`slidge.Roster.by_legacy_id` for more info.
170 #
171 # :param s:
172 # :return:
173 # """
174 return await self.by_jid(s.get_to())
176 async def legacy_id_to_jid_username(self, legacy_id: LegacyUserIdType) -> str:
177 """
178 Convert a legacy ID to a valid 'user' part of a JID
180 Should be overridden for cases where the str conversion of
181 the legacy_id is not enough, e.g., if it is case-sensitive or contains
182 forbidden characters not covered by :xep:`0106`.
184 :param legacy_id:
185 """
186 return str(legacy_id).translate(ESCAPE_TABLE)
188 async def jid_username_to_legacy_id(self, jid_username: str) -> LegacyUserIdType:
189 """
190 Convert a JID user part to a legacy ID.
192 Should be overridden in case legacy IDs are not strings, or more generally
193 for any case where the username part of a JID (unescaped with to the mapping
194 defined by :xep:`0106`) is not enough to identify a contact on the legacy network.
196 Default implementation is an identity operation
198 :param jid_username: User part of a JID, ie "user" in "user@example.com"
199 :return: An identifier for the user on the legacy network.
200 """
201 return _unescape_node(jid_username)
203 async def _fill(self):
204 try:
205 if hasattr(self.session.xmpp, "TEST_MODE"):
206 # dirty hack to avoid mocking xmpp server replies to this
207 # during tests
208 raise PermissionError
209 iq = await self.session.xmpp["xep_0356"].get_roster(
210 self.session.user_jid.bare
211 )
212 user_roster = iq["roster"]["items"]
213 except (PermissionError, IqError, IqTimeout):
214 user_roster = None
216 with self.__store.session() as orm:
217 self.__filling = True
218 async for contact in self.fill():
219 if user_roster is None:
220 continue
221 item = contact.get_roster_item()
222 old = user_roster.get(contact.jid.bare)
223 if old is not None and all(
224 old[k] == item[contact.jid.bare].get(k)
225 for k in ("subscription", "groups", "name")
226 ):
227 self.log.debug("No need to update roster")
228 continue
229 self.log.debug("Updating roster")
230 try:
231 await self.session.xmpp["xep_0356"].set_roster(
232 self.session.user_jid.bare,
233 item,
234 )
235 except (PermissionError, IqError, IqTimeout) as e:
236 warnings.warn(f"Could not add to roster: {e}")
237 else:
238 contact._added_to_roster = True
239 orm.commit()
240 self.__filling = False
242 async def fill(self) -> AsyncIterator[LegacyContact]:
243 """
244 Populate slidge's "virtual roster".
246 This should yield contacts that are meant to be added to the user's
247 roster, typically by using ``await self.by_legacy_id(contact_id)``.
248 Setting the contact nicknames, avatar, etc. should be in
249 :meth:`LegacyContact.update_info()`
251 It's not mandatory to override this method, but it is recommended way
252 to populate "friends" of the user. Calling
253 ``await (await self.by_legacy_id(contact_id)).add_to_roster()``
254 accomplishes the same thing, but doing it in here allows to batch
255 DB queries and is better performance-wise.
257 """
258 return
259 yield
262ESCAPE_TABLE = "".maketrans({v: k for k, v in JID_UNESCAPE_TRANSFORMATIONS.items()})
263log = logging.getLogger(__name__)