Coverage for slidge / contact / roster.py: 76%
131 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-02-15 09:02 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-02-15 09:02 +0000
1import asyncio
2import logging
3import warnings
4from collections.abc import AsyncIterator, Iterator
5from typing import TYPE_CHECKING, Generic
7from slixmpp import JID
8from slixmpp.exceptions import IqError, IqTimeout, XMPPError
9from sqlalchemy.orm import Session
10from sqlalchemy.orm import Session as OrmSession
12from ..db.models import Contact, GatewayUser
13from ..util import SubclassableOnce
14from ..util.jid_escaping import ESCAPE_TABLE, unescape_node
15from ..util.lock import NamedLockMixin
16from ..util.types import LegacyContactType, LegacyUserIdType
17from ..util.util import timeit
18from .contact import LegacyContact
20if TYPE_CHECKING:
21 from ..core.session import BaseSession
24class ContactIsUser(Exception):
25 pass
28class LegacyRoster(
29 Generic[LegacyUserIdType, LegacyContactType],
30 NamedLockMixin,
31 metaclass=SubclassableOnce,
32):
33 """
34 Virtual roster of a gateway user that allows to represent all
35 of their contacts as singleton instances (if used properly and not too bugged).
37 Every :class:`.BaseSession` instance will have its own :class:`.LegacyRoster` instance
38 accessible via the :attr:`.BaseSession.contacts` attribute.
40 Typically, you will mostly use the :meth:`.LegacyRoster.by_legacy_id` function to
41 retrieve a contact instance.
43 You might need to override :meth:`.LegacyRoster.legacy_id_to_jid_username` and/or
44 :meth:`.LegacyRoster.jid_username_to_legacy_id` to incorporate some custom logic
45 if you need some characters when translation JID user parts and legacy IDs.
46 """
48 _contact_cls: type[LegacyContactType]
50 def __init__(self, session: "BaseSession") -> None:
51 super().__init__()
53 self.log = logging.getLogger(f"{session.user_jid.bare}:roster")
54 self.user_legacy_id: LegacyUserIdType | None = None
55 self.ready: asyncio.Future[bool] = session.xmpp.loop.create_future()
57 self.session = session
58 self.__filling = False
60 @property
61 def user(self) -> GatewayUser:
62 return self.session.user
64 def orm(self) -> Session:
65 return self.session.xmpp.store.session()
67 def from_store(self, stored: Contact) -> LegacyContactType:
68 return self._contact_cls(self.session, stored=stored)
70 def __repr__(self) -> str:
71 return f"<Roster of {self.session.user_jid}>"
73 def __iter__(self) -> Iterator[LegacyContactType]:
74 with self.orm() as orm:
75 for stored in orm.query(Contact).filter_by(user=self.user).all():
76 if stored.updated:
77 yield self.from_store(stored)
79 def known_contacts(self, only_friends: bool = True) -> dict[str, LegacyContactType]:
80 if only_friends:
81 return {c.jid.bare: c for c in self if c.is_friend}
82 return {c.jid.bare: c for c in self}
84 async def by_jid(self, contact_jid: JID) -> LegacyContactType:
85 # """
86 # Retrieve a contact by their JID
87 #
88 # If the contact was not instantiated before, it will be created
89 # using :meth:`slidge.LegacyRoster.jid_username_to_legacy_id` to infer their
90 # legacy user ID.
91 #
92 # :param contact_jid:
93 # :return:
94 # """
95 username = contact_jid.node
96 if not username:
97 raise XMPPError(
98 "bad-request", "Contacts must have a local part in their JID"
99 )
100 contact_jid = JID(contact_jid.bare)
101 async with self.lock(("username", username)):
102 legacy_id = await self.jid_username_to_legacy_id(username)
103 if legacy_id == self.user_legacy_id:
104 raise ContactIsUser
105 if self.get_lock(("legacy_id", legacy_id)):
106 self.log.debug("Already updating %s via by_legacy_id()", contact_jid)
107 return await self.by_legacy_id(legacy_id)
109 with self.orm() as orm:
110 stored = (
111 orm.query(Contact)
112 .filter_by(user=self.user, jid=contact_jid)
113 .one_or_none()
114 )
115 if stored is None:
116 stored = Contact(
117 user_account_id=self.session.user_pk,
118 legacy_id=legacy_id,
119 jid=contact_jid,
120 )
121 return await self.__update_if_needed(stored)
123 async def __update_if_needed(self, stored: Contact) -> LegacyContactType:
124 contact = self.from_store(stored)
125 if contact.stored.updated:
126 return contact
128 with contact.updating_info(merge=False):
129 await contact.update_info()
130 if contact.is_friend and not self.__filling:
131 await contact.add_to_roster()
133 if contact.cached_presence is not None:
134 contact._store_last_presence(contact.cached_presence)
135 return contact
137 def by_jid_only_if_exists(self, contact_jid: JID) -> LegacyContactType | None:
138 with self.orm() as orm:
139 stored = (
140 orm.query(Contact)
141 .filter_by(user=self.user, jid=contact_jid)
142 .one_or_none()
143 )
144 if stored is not None and stored.updated:
145 return self.from_store(stored)
146 return None
148 @timeit
149 async def by_legacy_id(self, legacy_id: LegacyUserIdType) -> LegacyContactType:
150 """
151 Retrieve a contact by their legacy_id
153 If the contact was not instantiated before, it will be created
154 using :meth:`slidge.LegacyRoster.legacy_id_to_jid_username` to infer their
155 legacy user ID.
157 :param legacy_id:
158 :return:
159 """
160 if legacy_id == self.user_legacy_id:
161 raise ContactIsUser
162 async with self.lock(("legacy_id", legacy_id)):
163 username = await self.legacy_id_to_jid_username(legacy_id)
164 if self.get_lock(("username", username)):
165 self.log.debug("Already updating %s via by_jid()", username)
167 return await self.by_jid(
168 JID(username + "@" + self.session.xmpp.boundjid.bare)
169 )
171 with self.orm() as orm:
172 stored = (
173 orm.query(Contact)
174 .filter_by(user=self.user, legacy_id=str(legacy_id))
175 .one_or_none()
176 )
177 if stored is None:
178 stored = Contact(
179 user_account_id=self.session.user_pk,
180 legacy_id=str(legacy_id),
181 jid=JID(f"{username}@{self.session.xmpp.boundjid.bare}"),
182 )
183 return await self.__update_if_needed(stored)
185 async def legacy_id_to_jid_username(self, legacy_id: LegacyUserIdType) -> str:
186 """
187 Convert a legacy ID to a valid 'user' part of a JID
189 Should be overridden for cases where the str conversion of
190 the legacy_id is not enough, e.g., if it is case-sensitive or contains
191 forbidden characters not covered by :xep:`0106`.
193 :param legacy_id:
194 """
195 return str(legacy_id).translate(ESCAPE_TABLE)
197 async def jid_username_to_legacy_id(self, jid_username: str) -> LegacyUserIdType:
198 """
199 Convert a JID user part to a legacy ID.
201 Should be overridden in case legacy IDs are not strings, or more generally
202 for any case where the username part of a JID (unescaped with to the mapping
203 defined by :xep:`0106`) is not enough to identify a contact on the legacy network.
205 Default implementation is an identity operation
207 :param jid_username: User part of a JID, ie "user" in "user@example.com"
208 :return: An identifier for the user on the legacy network.
209 """
210 return unescape_node(jid_username) # type:ignore
212 @timeit
213 async def _fill(self, orm: OrmSession):
214 try:
215 if hasattr(self.session.xmpp, "TEST_MODE"):
216 # dirty hack to avoid mocking xmpp server replies to this
217 # during tests
218 raise PermissionError
219 iq = await self.session.xmpp["xep_0356"].get_roster(
220 self.session.user_jid.bare
221 )
222 user_roster = iq["roster"]["items"]
223 except (PermissionError, IqError, IqTimeout):
224 user_roster = None
226 self.__filling = True
227 async for contact in self.fill():
228 if user_roster is None:
229 continue
230 item = contact.get_roster_item()
231 old = user_roster.get(contact.jid.bare)
232 if old is not None and all(
233 old[k] == item[contact.jid.bare].get(k)
234 for k in ("subscription", "groups", "name")
235 ):
236 self.log.debug("No need to update roster")
237 continue
238 self.log.debug("Updating roster")
239 if not contact.is_friend:
240 continue
241 if not self.session.user.preferences.get("roster_push", True):
242 continue
243 try:
244 await self.session.xmpp["xep_0356"].set_roster(
245 self.session.user_jid.bare,
246 item,
247 )
248 except (PermissionError, IqError, IqTimeout) as e:
249 warnings.warn(f"Could not add to roster: {e}")
250 else:
251 contact.added_to_roster = True
252 contact.send_last_presence(force=True)
253 orm.commit()
254 self.__filling = False
256 async def fill(self) -> AsyncIterator[LegacyContact]:
257 """
258 Populate slidge's "virtual roster".
260 This should yield contacts that are meant to be added to the user's
261 roster, typically by using ``await self.by_legacy_id(contact_id)``.
262 Setting the contact nicknames, avatar, etc. should be in
263 :meth:`LegacyContact.update_info()`
265 It's not mandatory to override this method, but it is recommended way
266 to populate "friends" of the user. Calling
267 ``await (await self.by_legacy_id(contact_id)).add_to_roster()``
268 accomplishes the same thing, but doing it in here allows to batch
269 DB queries and is better performance-wise.
271 """
272 return
273 yield
276log = logging.getLogger(__name__)