Coverage for slidge / contact / roster.py: 76%
130 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-06 15:18 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-06 15:18 +0000
1import asyncio
2import logging
3import warnings
4from typing import TYPE_CHECKING, AsyncIterator, Generic, Iterator, Optional, Type
6from slixmpp import JID
7from slixmpp.exceptions import IqError, IqTimeout, XMPPError
8from sqlalchemy.orm import Session
9from sqlalchemy.orm import Session as OrmSession
11from ..db.models import Contact, GatewayUser
12from ..util import SubclassableOnce
13from ..util.jid_escaping import ESCAPE_TABLE, unescape_node
14from ..util.lock import NamedLockMixin
15from ..util.types import LegacyContactType, LegacyUserIdType
16from ..util.util import timeit
17from .contact import LegacyContact
19if TYPE_CHECKING:
20 from ..core.session import BaseSession
23class ContactIsUser(Exception):
24 pass
27class LegacyRoster(
28 Generic[LegacyUserIdType, LegacyContactType],
29 NamedLockMixin,
30 metaclass=SubclassableOnce,
31):
32 """
33 Virtual roster of a gateway user that allows to represent all
34 of their contacts as singleton instances (if used properly and not too bugged).
36 Every :class:`.BaseSession` instance will have its own :class:`.LegacyRoster` instance
37 accessible via the :attr:`.BaseSession.contacts` attribute.
39 Typically, you will mostly use the :meth:`.LegacyRoster.by_legacy_id` function to
40 retrieve a contact instance.
42 You might need to override :meth:`.LegacyRoster.legacy_id_to_jid_username` and/or
43 :meth:`.LegacyRoster.jid_username_to_legacy_id` to incorporate some custom logic
44 if you need some characters when translation JID user parts and legacy IDs.
45 """
47 _contact_cls: Type[LegacyContactType]
49 def __init__(self, session: "BaseSession") -> None:
50 super().__init__()
52 self.log = logging.getLogger(f"{session.user_jid.bare}:roster")
53 self.user_legacy_id: Optional[LegacyUserIdType] = None
54 self.ready: asyncio.Future[bool] = session.xmpp.loop.create_future()
56 self.session = session
57 self.__filling = False
59 @property
60 def user(self) -> GatewayUser:
61 return self.session.user
63 def orm(self) -> Session:
64 return self.session.xmpp.store.session()
66 def from_store(self, stored: Contact) -> LegacyContactType:
67 return self._contact_cls(self.session, stored=stored)
69 def __repr__(self) -> str:
70 return f"<Roster of {self.session.user_jid}>"
72 def __iter__(self) -> Iterator[LegacyContactType]:
73 with self.orm() as orm:
74 for stored in orm.query(Contact).filter_by(user=self.user).all():
75 if stored.updated:
76 yield self.from_store(stored)
78 def known_contacts(self, only_friends: bool = True) -> dict[str, LegacyContactType]:
79 if only_friends:
80 return {c.jid.bare: c for c in self if c.is_friend}
81 return {c.jid.bare: c for c in self}
83 async def by_jid(self, contact_jid: JID) -> LegacyContactType:
84 # """
85 # Retrieve a contact by their JID
86 #
87 # If the contact was not instantiated before, it will be created
88 # using :meth:`slidge.LegacyRoster.jid_username_to_legacy_id` to infer their
89 # legacy user ID.
90 #
91 # :param contact_jid:
92 # :return:
93 # """
94 username = contact_jid.node
95 if not username:
96 raise XMPPError(
97 "bad-request", "Contacts must have a local part in their JID"
98 )
99 contact_jid = JID(contact_jid.bare)
100 async with self.lock(("username", username)):
101 legacy_id = await self.jid_username_to_legacy_id(username)
102 if legacy_id == self.user_legacy_id:
103 raise ContactIsUser
104 if self.get_lock(("legacy_id", legacy_id)):
105 self.log.debug("Already updating %s via by_legacy_id()", contact_jid)
106 return await self.by_legacy_id(legacy_id)
108 with self.orm() as orm:
109 stored = (
110 orm.query(Contact)
111 .filter_by(user=self.user, jid=contact_jid)
112 .one_or_none()
113 )
114 if stored is None:
115 stored = Contact(
116 user_account_id=self.session.user_pk,
117 legacy_id=legacy_id,
118 jid=contact_jid,
119 )
120 return await self.__update_if_needed(stored)
122 async def __update_if_needed(self, stored: Contact) -> LegacyContactType:
123 contact = self.from_store(stored)
124 if contact.stored.updated:
125 return contact
127 with contact.updating_info(merge=False):
128 await contact.update_info()
129 if contact.is_friend and not self.__filling:
130 await contact.add_to_roster()
132 if contact.cached_presence is not None:
133 contact._store_last_presence(contact.cached_presence)
134 return contact
136 def by_jid_only_if_exists(self, contact_jid: JID) -> LegacyContactType | None:
137 with self.orm() as orm:
138 stored = (
139 orm.query(Contact)
140 .filter_by(user=self.user, jid=contact_jid)
141 .one_or_none()
142 )
143 if stored is not None and stored.updated:
144 return self.from_store(stored)
145 return None
147 @timeit
148 async def by_legacy_id(self, legacy_id: LegacyUserIdType) -> LegacyContactType:
149 """
150 Retrieve a contact by their legacy_id
152 If the contact was not instantiated before, it will be created
153 using :meth:`slidge.LegacyRoster.legacy_id_to_jid_username` to infer their
154 legacy user ID.
156 :param legacy_id:
157 :return:
158 """
159 if legacy_id == self.user_legacy_id:
160 raise ContactIsUser
161 async with self.lock(("legacy_id", legacy_id)):
162 username = await self.legacy_id_to_jid_username(legacy_id)
163 if self.get_lock(("username", username)):
164 self.log.debug("Already updating %s via by_jid()", username)
166 return await self.by_jid(
167 JID(username + "@" + self.session.xmpp.boundjid.bare)
168 )
170 with self.orm() as orm:
171 stored = (
172 orm.query(Contact)
173 .filter_by(user=self.user, legacy_id=str(legacy_id))
174 .one_or_none()
175 )
176 if stored is None:
177 stored = Contact(
178 user_account_id=self.session.user_pk,
179 legacy_id=str(legacy_id),
180 jid=JID(f"{username}@{self.session.xmpp.boundjid.bare}"),
181 )
182 return await self.__update_if_needed(stored)
184 async def legacy_id_to_jid_username(self, legacy_id: LegacyUserIdType) -> str:
185 """
186 Convert a legacy ID to a valid 'user' part of a JID
188 Should be overridden for cases where the str conversion of
189 the legacy_id is not enough, e.g., if it is case-sensitive or contains
190 forbidden characters not covered by :xep:`0106`.
192 :param legacy_id:
193 """
194 return str(legacy_id).translate(ESCAPE_TABLE)
196 async def jid_username_to_legacy_id(self, jid_username: str) -> LegacyUserIdType:
197 """
198 Convert a JID user part to a legacy ID.
200 Should be overridden in case legacy IDs are not strings, or more generally
201 for any case where the username part of a JID (unescaped with to the mapping
202 defined by :xep:`0106`) is not enough to identify a contact on the legacy network.
204 Default implementation is an identity operation
206 :param jid_username: User part of a JID, ie "user" in "user@example.com"
207 :return: An identifier for the user on the legacy network.
208 """
209 return unescape_node(jid_username) # type:ignore
211 @timeit
212 async def _fill(self, orm: OrmSession):
213 try:
214 if hasattr(self.session.xmpp, "TEST_MODE"):
215 # dirty hack to avoid mocking xmpp server replies to this
216 # during tests
217 raise PermissionError
218 iq = await self.session.xmpp["xep_0356"].get_roster(
219 self.session.user_jid.bare
220 )
221 user_roster = iq["roster"]["items"]
222 except (PermissionError, IqError, IqTimeout):
223 user_roster = None
225 self.__filling = True
226 async for contact in self.fill():
227 if user_roster is None:
228 continue
229 item = contact.get_roster_item()
230 old = user_roster.get(contact.jid.bare)
231 if old is not None and all(
232 old[k] == item[contact.jid.bare].get(k)
233 for k in ("subscription", "groups", "name")
234 ):
235 self.log.debug("No need to update roster")
236 continue
237 self.log.debug("Updating roster")
238 if not contact.is_friend:
239 continue
240 if not self.session.user.preferences.get("roster_push", True):
241 continue
242 try:
243 await self.session.xmpp["xep_0356"].set_roster(
244 self.session.user_jid.bare,
245 item,
246 )
247 except (PermissionError, IqError, IqTimeout) as e:
248 warnings.warn(f"Could not add to roster: {e}")
249 else:
250 contact.added_to_roster = True
251 contact.send_last_presence(force=True)
252 orm.commit()
253 self.__filling = False
255 async def fill(self) -> AsyncIterator[LegacyContact]:
256 """
257 Populate slidge's "virtual roster".
259 This should yield contacts that are meant to be added to the user's
260 roster, typically by using ``await self.by_legacy_id(contact_id)``.
261 Setting the contact nicknames, avatar, etc. should be in
262 :meth:`LegacyContact.update_info()`
264 It's not mandatory to override this method, but it is recommended way
265 to populate "friends" of the user. Calling
266 ``await (await self.by_legacy_id(contact_id)).add_to_roster()``
267 accomplishes the same thing, but doing it in here allows to batch
268 DB queries and is better performance-wise.
270 """
271 return
272 yield
275log = logging.getLogger(__name__)