Coverage for slidge/contact/roster.py: 77%
122 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
1import asyncio
2import logging
3import warnings
4from typing import TYPE_CHECKING, AsyncIterator, Generic, Iterator, Optional, Type
6from slixmpp import JID
7from slixmpp.exceptions import IqError, IqTimeout
8from sqlalchemy.orm import Session
9from sqlalchemy.orm import Session as OrmSession
11from ..db.models import Contact, GatewayUser
12from ..util import SubclassableOnce
13from ..util.jid_escaping import ESCAPE_TABLE, unescape_node
14from ..util.lock import NamedLockMixin
15from ..util.types import LegacyContactType, LegacyUserIdType
16from ..util.util import timeit
17from .contact import LegacyContact
19if TYPE_CHECKING:
20 from ..core.session import BaseSession
class ContactIsUser(Exception):
    """Raised when a contact lookup targets the gateway user's own legacy ID."""
class LegacyRoster(
    Generic[LegacyUserIdType, LegacyContactType],
    NamedLockMixin,
    metaclass=SubclassableOnce,
):
    """
    Virtual roster of a gateway user that allows to represent all
    of their contacts as singleton instances (if used properly and not too bugged).

    Every :class:`.BaseSession` instance will have its own :class:`.LegacyRoster` instance
    accessible via the :attr:`.BaseSession.contacts` attribute.

    Typically, you will mostly use the :meth:`.LegacyRoster.by_legacy_id` function to
    retrieve a contact instance.

    You might need to override :meth:`.LegacyRoster.legacy_id_to_jid_username` and/or
    :meth:`.LegacyRoster.jid_username_to_legacy_id` to incorporate some custom logic
    if some characters need special handling when translating between JID user
    parts and legacy IDs.
    """

    # Concrete contact class instantiated by from_store().
    _contact_cls: Type[LegacyContactType]

    def __init__(self, session: "BaseSession") -> None:
        """
        :param session: The session this roster belongs to; it provides the
            user, the XMPP component and the DB store used by every method.
        """
        super().__init__()

        self.log = logging.getLogger(f"{session.user_jid.bare}:roster")
        # Legacy ID of the gateway user; by_legacy_id() refuses to return a
        # contact for it.
        self.user_legacy_id: Optional[LegacyUserIdType] = None
        # Created here; nothing in this class resolves it.
        self.ready: asyncio.Future[bool] = session.xmpp.loop.create_future()

        self.session = session
        # True only while _fill() is running.
        self.__filling = False

    @property
    def user(self) -> GatewayUser:
        """The gateway user of the session this roster is attached to."""
        return self.session.user

    def orm(self) -> Session:
        """Return an ORM session obtained from ``session.xmpp.store.session()``."""
        return self.session.xmpp.store.session()

    def from_store(self, stored: Contact) -> LegacyContactType:
        """
        Build a contact instance from its DB row.

        :param stored: The DB model of the contact.
        :return: An instance of ``_contact_cls`` wrapping ``stored``.
        """
        return self._contact_cls(self.session, stored=stored)

    def __repr__(self) -> str:
        return f"<Roster of {self.session.user_jid}>"

    def __iter__(self) -> Iterator[LegacyContactType]:
        """Iterate over the stored contacts of this user flagged as ``updated``."""
        with self.orm() as orm:
            for stored in orm.query(Contact).filter_by(user=self.user).all():
                if stored.updated:
                    yield self.from_store(stored)

    def known_contacts(self, only_friends: bool = True) -> dict[str, LegacyContactType]:
        """
        Map bare JIDs to the contacts yielded when iterating over this roster.

        :param only_friends: If true (the default), only keep contacts whose
            ``is_friend`` attribute is truthy.
        :return: A dict keyed by ``contact.jid.bare``.
        """
        if only_friends:
            return {c.jid.bare: c for c in self if c.is_friend}
        return {c.jid.bare: c for c in self}

    async def by_jid(self, contact_jid: JID) -> LegacyContactType:
        # """
        # Retrieve a contact by their JID
        #
        # If the contact was not instantiated before, it will be created
        # using :meth:`slidge.LegacyRoster.jid_username_to_legacy_id` to infer their
        # legacy user ID.
        #
        # :param contact_jid:
        # :return:
        # """
        username = contact_jid.node
        # Drop any resource part: the lookup below uses the bare JID.
        contact_jid = JID(contact_jid.bare)
        async with self.lock(("username", username)):
            legacy_id = await self.jid_username_to_legacy_id(username)
            # NOTE(review): get_lock() presumably returns the lock currently
            # registered under that key, if any -- confirm in NamedLockMixin.
            if self.get_lock(("legacy_id", legacy_id)):
                self.log.debug("Already updating %s via by_legacy_id()", contact_jid)
                return await self.by_legacy_id(legacy_id)

            with self.orm() as orm:
                stored = (
                    orm.query(Contact)
                    .filter_by(user=self.user, jid=contact_jid)
                    .one_or_none()
                )
                if stored is None:
                    # Unknown contact: build a new row object for it.
                    stored = Contact(
                        user_account_id=self.session.user_pk,
                        legacy_id=legacy_id,
                        jid=contact_jid,
                    )
            return await self.__update_if_needed(stored)

    async def __update_if_needed(self, stored: Contact) -> LegacyContactType:
        """
        Wrap ``stored`` in a contact instance, calling ``update_info()`` on it
        unless its row is already flagged as ``updated``.

        :param stored: The DB model of the contact.
        :return: The contact instance.
        """
        contact = self.from_store(stored)
        if contact.stored.updated:
            return contact

        with contact.updating_info():
            await contact.update_info()

        if contact.cached_presence is not None:
            contact._store_last_presence(contact.cached_presence)
        return contact

    def by_jid_only_if_exists(self, contact_jid: JID) -> LegacyContactType | None:
        """
        Retrieve a contact by their JID without ever creating or updating it.

        :param contact_jid: The JID to look up.
        :return: The contact if a row flagged as ``updated`` exists for this
            user and JID, ``None`` otherwise.
        """
        with self.orm() as orm:
            stored = (
                orm.query(Contact)
                .filter_by(user=self.user, jid=contact_jid)
                .one_or_none()
            )
            if stored is not None and stored.updated:
                return self.from_store(stored)
        return None

    @timeit
    async def by_legacy_id(self, legacy_id: LegacyUserIdType) -> LegacyContactType:
        """
        Retrieve a contact by their legacy_id

        If the contact was not instantiated before, it will be created
        using :meth:`slidge.LegacyRoster.legacy_id_to_jid_username` to infer their
        JID username.

        :param legacy_id:
        :return:
        :raises ContactIsUser: if ``legacy_id`` equals ``self.user_legacy_id``.
        """
        if legacy_id == self.user_legacy_id:
            raise ContactIsUser
        async with self.lock(("legacy_id", legacy_id)):
            username = await self.legacy_id_to_jid_username(legacy_id)
            # NOTE(review): get_lock() presumably returns the lock currently
            # registered under that key, if any -- confirm in NamedLockMixin.
            if self.get_lock(("username", username)):
                self.log.debug("Already updating %s via by_jid()", username)

                jid = JID()
                jid.node = username
                jid.domain = self.session.xmpp.boundjid.bare
                return await self.by_jid(jid)

            with self.orm() as orm:
                stored = (
                    orm.query(Contact)
                    .filter_by(user=self.user, legacy_id=str(legacy_id))
                    .one_or_none()
                )
                if stored is None:
                    # Unknown contact: build a new row object for it.
                    stored = Contact(
                        user_account_id=self.session.user_pk,
                        legacy_id=str(legacy_id),
                        jid=JID(f"{username}@{self.session.xmpp.boundjid.bare}"),
                    )
            return await self.__update_if_needed(stored)

    async def legacy_id_to_jid_username(self, legacy_id: LegacyUserIdType) -> str:
        """
        Convert a legacy ID to a valid 'user' part of a JID

        Should be overridden for cases where the str conversion of
        the legacy_id is not enough, e.g., if it is case-sensitive or contains
        forbidden characters not covered by :xep:`0106`.

        :param legacy_id:
        """
        return str(legacy_id).translate(ESCAPE_TABLE)

    async def jid_username_to_legacy_id(self, jid_username: str) -> LegacyUserIdType:
        """
        Convert a JID user part to a legacy ID.

        Should be overridden in case legacy IDs are not strings, or more generally
        for any case where the username part of a JID (unescaped according to the
        mapping defined by :xep:`0106`) is not enough to identify a contact on the
        legacy network.

        Default implementation only unescapes the JID user part.

        :param jid_username: User part of a JID, ie "user" in "user@example.com"
        :return: An identifier for the user on the legacy network.
        """
        return unescape_node(jid_username)  # type:ignore

    @timeit
    async def _fill(self, orm: OrmSession):
        """
        Consume :meth:`fill` and, when the user's XMPP roster could be
        fetched through ``xep_0356``, push each yielded contact to it.

        :param orm: ORM session that is committed once all contacts have been
            processed.
        """
        try:
            if hasattr(self.session.xmpp, "TEST_MODE"):
                # dirty hack to avoid mocking xmpp server replies to this
                # during tests
                raise PermissionError
            iq = await self.session.xmpp["xep_0356"].get_roster(
                self.session.user_jid.bare
            )
            user_roster = iq["roster"]["items"]
        except (PermissionError, IqError, IqTimeout):
            # Without the user's roster, fill() is still consumed below but
            # no roster push is attempted.
            user_roster = None

        self.__filling = True
        try:
            async for contact in self.fill():
                if user_roster is None:
                    continue
                item = contact.get_roster_item()
                old = user_roster.get(contact.jid.bare)
                # Skip the push when the existing roster entry already matches.
                if old is not None and all(
                    old[k] == item[contact.jid.bare].get(k)
                    for k in ("subscription", "groups", "name")
                ):
                    self.log.debug("No need to update roster")
                    continue
                self.log.debug("Updating roster")
                try:
                    await self.session.xmpp["xep_0356"].set_roster(
                        self.session.user_jid.bare,
                        item,
                    )
                except (PermissionError, IqError, IqTimeout) as e:
                    warnings.warn(f"Could not add to roster: {e}")
                else:
                    contact.added_to_roster = True
            orm.commit()
        finally:
            # Always clear the flag, even if fill() or the commit raised.
            self.__filling = False

    async def fill(self) -> AsyncIterator[LegacyContact]:
        """
        Populate slidge's "virtual roster".

        This should yield contacts that are meant to be added to the user's
        roster, typically by using ``await self.by_legacy_id(contact_id)``.
        Setting the contact nicknames, avatar, etc. should be in
        :meth:`LegacyContact.update_info()`

        It's not mandatory to override this method, but it is recommended way
        to populate "friends" of the user. Calling
        ``await (await self.by_legacy_id(contact_id)).add_to_roster()``
        accomplishes the same thing, but doing it in here allows to batch
        DB queries and is better performance-wise.

        """
        return
        # Unreachable on purpose: the bare ``yield`` makes this an async
        # generator, so the default implementation yields nothing.
        yield
# Module-level logger, named after this module.
log: logging.Logger = logging.getLogger(__name__)