Coverage for slidge/contact/roster.py: 76%
127 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
1import asyncio
2import logging
3import warnings
4from typing import TYPE_CHECKING, AsyncIterator, Generic, Iterator, Optional, Type
6from slixmpp import JID
7from slixmpp.exceptions import IqError, IqTimeout, XMPPError
8from sqlalchemy.orm import Session
9from sqlalchemy.orm import Session as OrmSession
11from ..db.models import Contact, GatewayUser
12from ..util import SubclassableOnce
13from ..util.jid_escaping import ESCAPE_TABLE, unescape_node
14from ..util.lock import NamedLockMixin
15from ..util.types import LegacyContactType, LegacyUserIdType
16from ..util.util import timeit
17from .contact import LegacyContact
19if TYPE_CHECKING:
20 from ..core.session import BaseSession
class ContactIsUser(Exception):
    """
    Signal that a contact lookup resolved to the gateway user themselves.

    Raised by the roster lookups when the legacy ID being resolved is equal
    to the roster's ``user_legacy_id``.
    """
class LegacyRoster(
    Generic[LegacyUserIdType, LegacyContactType],
    NamedLockMixin,
    metaclass=SubclassableOnce,
):
    """
    Virtual roster of a gateway user that allows to represent all
    of their contacts as singleton instances (if used properly and not too bugged).

    Every :class:`.BaseSession` instance will have its own :class:`.LegacyRoster` instance
    accessible via the :attr:`.BaseSession.contacts` attribute.

    Typically, you will mostly use the :meth:`.LegacyRoster.by_legacy_id` function to
    retrieve a contact instance.

    You might need to override :meth:`.LegacyRoster.legacy_id_to_jid_username` and/or
    :meth:`.LegacyRoster.jid_username_to_legacy_id` to incorporate some custom logic
    if some characters need special handling when translating between JID user
    parts and legacy IDs.
    """

    # Concrete contact class instantiated by from_store().
    _contact_cls: Type[LegacyContactType]

    def __init__(self, session: "BaseSession") -> None:
        """
        Bind the roster to a session.

        :param session: The session this roster belongs to.
        """
        super().__init__()

        self.log = logging.getLogger(f"{session.user_jid.bare}:roster")
        # Compared against resolved legacy IDs to detect "contact is the user".
        self.user_legacy_id: Optional[LegacyUserIdType] = None
        self.ready: asyncio.Future[bool] = session.xmpp.loop.create_future()

        self.session = session
        # True only while _fill() is iterating over fill().
        self.__filling = False

    @property
    def user(self) -> GatewayUser:
        """The gateway user of the session this roster is bound to."""
        return self.session.user

    def orm(self) -> Session:
        """
        Open a new ORM session from the gateway's store.

        Meant to be used as a context manager: ``with self.orm() as orm: ...``
        """
        return self.session.xmpp.store.session()

    def from_store(self, stored: Contact) -> LegacyContactType:
        """
        Build a contact instance from its stored row.

        :param stored: The stored contact.
        :return: An instance of the roster's contact class wrapping ``stored``.
        """
        return self._contact_cls(self.session, stored=stored)

    def __repr__(self) -> str:
        return f"<Roster of {self.session.user_jid}>"

    def __iter__(self) -> Iterator[LegacyContactType]:
        """Iterate over the stored contacts of this user that are marked as updated."""
        with self.orm() as orm:
            for stored in orm.query(Contact).filter_by(user=self.user).all():
                if stored.updated:
                    yield self.from_store(stored)

    def known_contacts(self, only_friends: bool = True) -> dict[str, LegacyContactType]:
        """
        Map bare JIDs to the contacts yielded when iterating over this roster.

        :param only_friends: If true (the default), only keep contacts whose
            ``is_friend`` attribute is truthy.
        :return: A dict of bare JID (as a string) to contact.
        """
        if only_friends:
            return {c.jid.bare: c for c in self if c.is_friend}
        return {c.jid.bare: c for c in self}

    async def by_jid(self, contact_jid: JID) -> LegacyContactType:
        # Retrieve a contact by their JID.
        #
        # If the contact is not stored yet, a new row is built, using
        # :meth:`slidge.LegacyRoster.jid_username_to_legacy_id` to infer their
        # legacy user ID.
        #
        # Kept as a comment rather than a docstring, as in the original code.
        #
        # :param contact_jid: JID of the contact; only its bare part is used.
        # :return: The contact, after ``update_info()`` has run if it was needed.
        # :raises XMPPError: ("bad-request") if the JID has no local part.
        # :raises ContactIsUser: if the JID resolves to the user's own legacy ID.
        username = contact_jid.node
        if not username:
            raise XMPPError(
                "bad-request", "Contacts must have a local part in their JID"
            )
        contact_jid = JID(contact_jid.bare)
        async with self.lock(("username", username)):
            legacy_id = await self.jid_username_to_legacy_id(username)
            if legacy_id == self.user_legacy_id:
                raise ContactIsUser
            if self.get_lock(("legacy_id", legacy_id)):
                # A by_legacy_id() call for the same contact is in progress:
                # go through it instead of running a second update here.
                self.log.debug("Already updating %s via by_legacy_id()", contact_jid)
                return await self.by_legacy_id(legacy_id)

            with self.orm() as orm:
                stored = (
                    orm.query(Contact)
                    .filter_by(user=self.user, jid=contact_jid)
                    .one_or_none()
                )
                if stored is None:
                    stored = Contact(
                        user_account_id=self.session.user_pk,
                        legacy_id=legacy_id,
                        jid=contact_jid,
                    )
            return await self.__update_if_needed(stored)

    async def __update_if_needed(self, stored: Contact) -> LegacyContactType:
        """
        Wrap ``stored`` in a contact and run ``update_info()`` unless it is
        already marked as updated.

        :param stored: The stored (or freshly built) contact row.
        :return: The contact instance.
        """
        contact = self.from_store(stored)
        if contact.stored.updated:
            return contact

        with contact.updating_info():
            await contact.update_info()

        if contact.cached_presence is not None:
            contact._store_last_presence(contact.cached_presence)
        return contact

    def by_jid_only_if_exists(self, contact_jid: JID) -> LegacyContactType | None:
        """
        Retrieve a contact by their JID, without creating or updating anything.

        :param contact_jid: JID of the contact; only its bare part is used,
            consistently with :meth:`.by_jid`.
        :return: The contact if it is stored and marked as updated, else ``None``.
        """
        # Normalise to the bare JID, as by_jid() does before querying.
        bare_jid = JID(contact_jid.bare)
        with self.orm() as orm:
            stored = (
                orm.query(Contact)
                .filter_by(user=self.user, jid=bare_jid)
                .one_or_none()
            )
            if stored is not None and stored.updated:
                return self.from_store(stored)
        return None

    @timeit
    async def by_legacy_id(self, legacy_id: LegacyUserIdType) -> LegacyContactType:
        """
        Retrieve a contact by their legacy_id

        If the contact was not instantiated before, it will be created
        using :meth:`slidge.LegacyRoster.legacy_id_to_jid_username` to infer the
        username part of their JID.

        :param legacy_id: Identifier of the contact on the legacy network.
        :return: The contact, after ``update_info()`` has run if it was needed.
        :raises ContactIsUser: if ``legacy_id`` is the user's own legacy ID.
        """
        if legacy_id == self.user_legacy_id:
            raise ContactIsUser
        async with self.lock(("legacy_id", legacy_id)):
            username = await self.legacy_id_to_jid_username(legacy_id)
            if self.get_lock(("username", username)):
                # A by_jid() call for the same contact is in progress:
                # go through it instead of running a second update here.
                self.log.debug("Already updating %s via by_jid()", username)

                jid = JID()
                jid.node = username
                jid.domain = self.session.xmpp.boundjid.bare
                return await self.by_jid(jid)

            with self.orm() as orm:
                stored = (
                    orm.query(Contact)
                    .filter_by(user=self.user, legacy_id=str(legacy_id))
                    .one_or_none()
                )
                if stored is None:
                    stored = Contact(
                        user_account_id=self.session.user_pk,
                        legacy_id=str(legacy_id),
                        jid=JID(f"{username}@{self.session.xmpp.boundjid.bare}"),
                    )
            return await self.__update_if_needed(stored)

    async def legacy_id_to_jid_username(self, legacy_id: LegacyUserIdType) -> str:
        """
        Convert a legacy ID to a valid 'user' part of a JID

        Should be overridden for cases where the str conversion of
        the legacy_id is not enough, e.g., if it is case-sensitive or contains
        forbidden characters not covered by :xep:`0106`.

        :param legacy_id: Identifier of the contact on the legacy network.
        :return: ``str(legacy_id)`` translated through the escaping table.
        """
        return str(legacy_id).translate(ESCAPE_TABLE)

    async def jid_username_to_legacy_id(self, jid_username: str) -> LegacyUserIdType:
        """
        Convert a JID user part to a legacy ID.

        Should be overridden in case legacy IDs are not strings, or more generally
        for any case where the username part of a JID (unescaped according to the
        mapping defined by :xep:`0106`) is not enough to identify a contact on the
        legacy network.

        Default implementation only unescapes the username.

        :param jid_username: User part of a JID, ie "user" in "user@example.com"
        :return: An identifier for the user on the legacy network.
        """
        return unescape_node(jid_username)  # type:ignore

    @timeit
    async def _fill(self, orm: OrmSession) -> None:
        """
        Iterate over :meth:`.fill` and sync the yielded contacts to the
        user's XMPP roster.

        The user's current roster is requested through the ``xep_0356`` plugin;
        if that fails, :meth:`.fill` is still exhausted but no roster item is
        pushed. Otherwise, each contact whose roster item differs from the
        existing one (subscription, groups or name) is pushed with
        ``set_roster``.

        :param orm: ORM session, committed once all contacts were processed.
        """
        try:
            if hasattr(self.session.xmpp, "TEST_MODE"):
                # dirty hack to avoid mocking xmpp server replies to this
                # during tests
                raise PermissionError
            iq = await self.session.xmpp["xep_0356"].get_roster(
                self.session.user_jid.bare
            )
            user_roster = iq["roster"]["items"]
        except (PermissionError, IqError, IqTimeout):
            user_roster = None

        self.__filling = True
        try:
            async for contact in self.fill():
                if user_roster is None:
                    # Roster unavailable: keep consuming fill(), push nothing.
                    continue
                item = contact.get_roster_item()
                old = user_roster.get(contact.jid.bare)
                if old is not None and all(
                    old[k] == item[contact.jid.bare].get(k)
                    for k in ("subscription", "groups", "name")
                ):
                    self.log.debug("No need to update roster")
                    continue
                self.log.debug("Updating roster")
                try:
                    await self.session.xmpp["xep_0356"].set_roster(
                        self.session.user_jid.bare,
                        item,
                    )
                except (PermissionError, IqError, IqTimeout) as e:
                    warnings.warn(f"Could not add to roster: {e}")
                else:
                    contact.added_to_roster = True
                    contact.send_last_presence(force=True)
            orm.commit()
        finally:
            # Always clear the flag, even if fill() or a roster update raised.
            self.__filling = False

    async def fill(self) -> AsyncIterator[LegacyContact]:
        """
        Populate slidge's "virtual roster".

        This should yield contacts that are meant to be added to the user's
        roster, typically by using ``await self.by_legacy_id(contact_id)``.
        Setting the contact nicknames, avatar, etc. should be in
        :meth:`LegacyContact.update_info()`

        It's not mandatory to override this method, but it is the recommended way
        to populate "friends" of the user. Calling
        ``await (await self.by_legacy_id(contact_id)).add_to_roster()``
        accomplishes the same thing, but doing it in here allows to batch
        DB queries and is better performance-wise.

        """
        # The unreachable ``yield`` makes this an (empty) async generator.
        return
        yield
# Module-level logger named after this module; the roster class above logs
# through its own per-user ``self.log`` instead.
log = logging.getLogger(__name__)