Coverage for slidge/contact/roster.py: 76%

127 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-26 19:34 +0000

1import asyncio 

2import logging 

3import warnings 

4from typing import TYPE_CHECKING, AsyncIterator, Generic, Iterator, Optional, Type 

5 

6from slixmpp import JID 

7from slixmpp.exceptions import IqError, IqTimeout, XMPPError 

8from sqlalchemy.orm import Session 

9from sqlalchemy.orm import Session as OrmSession 

10 

11from ..db.models import Contact, GatewayUser 

12from ..util import SubclassableOnce 

13from ..util.jid_escaping import ESCAPE_TABLE, unescape_node 

14from ..util.lock import NamedLockMixin 

15from ..util.types import LegacyContactType, LegacyUserIdType 

16from ..util.util import timeit 

17from .contact import LegacyContact 

18 

19if TYPE_CHECKING: 

20 from ..core.session import BaseSession 

21 

22 

class ContactIsUser(Exception):
    """Signal that a requested contact is actually the gateway user."""

25 

26 

class LegacyRoster(
    Generic[LegacyUserIdType, LegacyContactType],
    NamedLockMixin,
    metaclass=SubclassableOnce,
):
    """
    Virtual roster of a gateway user that allows to represent all
    of their contacts as singleton instances (if used properly and not too bugged).

    Every :class:`.BaseSession` instance will have its own :class:`.LegacyRoster` instance
    accessible via the :attr:`.BaseSession.contacts` attribute.

    Typically, you will mostly use the :meth:`.LegacyRoster.by_legacy_id` function to
    retrieve a contact instance.

    You might need to override :meth:`.LegacyRoster.legacy_id_to_jid_username` and/or
    :meth:`.LegacyRoster.jid_username_to_legacy_id` to incorporate some custom logic
    if some characters need special handling when translating between JID user
    parts and legacy IDs.
    """

    # Concrete contact class instantiated by from_store().
    _contact_cls: Type[LegacyContactType]

    def __init__(self, session: "BaseSession") -> None:
        """
        Create the roster attached to ``session``.

        :param session: The session of the gateway user owning this roster.
        """
        super().__init__()

        # Logger named after the bare JID of the user, suffixed with ":roster".
        self.log = logging.getLogger(f"{session.user_jid.bare}:roster")
        # Compared against in by_jid()/by_legacy_id() to detect "contact is
        # the user"; not assigned anywhere else in this class.
        self.user_legacy_id: Optional[LegacyUserIdType] = None
        # Future created on the XMPP event loop; it is not resolved by this class.
        self.ready: asyncio.Future[bool] = session.xmpp.loop.create_future()

        self.session = session
        # True only while _fill() is iterating over fill().
        self.__filling = False

    @property
    def user(self) -> GatewayUser:
        """The gateway user of the attached session."""
        return self.session.user

    def orm(self) -> Session:
        """Return a new ORM session obtained from the gateway's store."""
        return self.session.xmpp.store.session()

    def from_store(self, stored: Contact) -> LegacyContactType:
        """
        Wrap a stored contact row into a contact instance.

        :param stored: The database model of the contact.
        :return: An instance of the roster's contact class.
        """
        return self._contact_cls(self.session, stored=stored)

    def __repr__(self) -> str:
        return f"<Roster of {self.session.user_jid}>"

    def __iter__(self) -> Iterator[LegacyContactType]:
        """
        Iterate over the stored contacts of the user.

        Only contacts whose stored ``updated`` flag is truthy are yielded.
        """
        with self.orm() as orm:
            for stored in orm.query(Contact).filter_by(user=self.user).all():
                if stored.updated:
                    yield self.from_store(stored)

    def known_contacts(self, only_friends: bool = True) -> dict[str, LegacyContactType]:
        """
        Map bare JIDs to the contacts yielded when iterating over this roster.

        :param only_friends: If true, keep only contacts whose ``is_friend``
            attribute is truthy.
        :return: A dict of bare JID (as a string) to contact instance.
        """
        if only_friends:
            return {c.jid.bare: c for c in self if c.is_friend}
        return {c.jid.bare: c for c in self}

    async def by_jid(self, contact_jid: JID) -> LegacyContactType:
        # NOTE: kept as comments rather than a docstring, as in the original.
        #
        # Retrieve a contact by their JID
        #
        # If the contact was not instantiated before, it will be created
        # using :meth:`slidge.LegacyRoster.jid_username_to_legacy_id` to infer their
        # legacy user ID.
        #
        # :param contact_jid:
        # :return:
        #
        # Raises XMPPError("bad-request") if the JID has no local part, and
        # ContactIsUser if the inferred legacy ID is the user's own legacy ID.
        username = contact_jid.node
        if not username:
            raise XMPPError(
                "bad-request", "Contacts must have a local part in their JID"
            )
        # Drop any resource part: lookups and storage use the bare JID.
        contact_jid = JID(contact_jid.bare)
        async with self.lock(("username", username)):
            legacy_id = await self.jid_username_to_legacy_id(username)
            if legacy_id == self.user_legacy_id:
                raise ContactIsUser
            if self.get_lock(("legacy_id", legacy_id)):
                # by_legacy_id() is already working on this contact:
                # delegate to it instead of duplicating the lookup.
                self.log.debug("Already updating %s via by_legacy_id()", contact_jid)
                return await self.by_legacy_id(legacy_id)

            with self.orm() as orm:
                stored = (
                    orm.query(Contact)
                    .filter_by(user=self.user, jid=contact_jid)
                    .one_or_none()
                )
                if stored is None:
                    stored = Contact(
                        user_account_id=self.session.user_pk,
                        # Stored as a string, consistently with by_legacy_id().
                        legacy_id=str(legacy_id),
                        jid=contact_jid,
                    )
            return await self.__update_if_needed(stored)

    async def __update_if_needed(self, stored: Contact) -> LegacyContactType:
        """
        Build a contact from ``stored`` and fetch its info if not done yet.

        :param stored: The database model of the contact.
        :return: The contact; ``update_info()`` has been awaited on it unless
            its stored ``updated`` flag was already truthy.
        """
        contact = self.from_store(stored)
        if contact.stored.updated:
            return contact

        with contact.updating_info():
            await contact.update_info()

        if contact.cached_presence is not None:
            contact._store_last_presence(contact.cached_presence)
        return contact

    def by_jid_only_if_exists(self, contact_jid: JID) -> LegacyContactType | None:
        """
        Retrieve a contact by JID without creating or updating anything.

        :param contact_jid: The JID to look up, used as-is in the query.
        :return: The contact if a stored row exists for this JID and its
            ``updated`` flag is truthy, ``None`` otherwise.
        """
        with self.orm() as orm:
            stored = (
                orm.query(Contact)
                .filter_by(user=self.user, jid=contact_jid)
                .one_or_none()
            )
            if stored is not None and stored.updated:
                return self.from_store(stored)
        return None

    @timeit
    async def by_legacy_id(self, legacy_id: LegacyUserIdType) -> LegacyContactType:
        """
        Retrieve a contact by their legacy_id

        If the contact was not instantiated before, it will be created
        using :meth:`slidge.LegacyRoster.legacy_id_to_jid_username` to infer
        the user part of their JID.

        :param legacy_id:
        :return:
        :raises ContactIsUser: if ``legacy_id`` is the user's own legacy ID.
        """
        if legacy_id == self.user_legacy_id:
            raise ContactIsUser
        async with self.lock(("legacy_id", legacy_id)):
            username = await self.legacy_id_to_jid_username(legacy_id)
            if self.get_lock(("username", username)):
                # by_jid() is already working on this contact: delegate to it.
                self.log.debug("Already updating %s via by_jid()", username)

                jid = JID()
                jid.node = username
                jid.domain = self.session.xmpp.boundjid.bare
                return await self.by_jid(jid)

            with self.orm() as orm:
                stored = (
                    orm.query(Contact)
                    .filter_by(user=self.user, legacy_id=str(legacy_id))
                    .one_or_none()
                )
                if stored is None:
                    stored = Contact(
                        user_account_id=self.session.user_pk,
                        legacy_id=str(legacy_id),
                        jid=JID(f"{username}@{self.session.xmpp.boundjid.bare}"),
                    )
            return await self.__update_if_needed(stored)

    async def legacy_id_to_jid_username(self, legacy_id: LegacyUserIdType) -> str:
        """
        Convert a legacy ID to a valid 'user' part of a JID

        Should be overridden for cases where the str conversion of
        the legacy_id is not enough, e.g., if it is case-sensitive or contains
        forbidden characters not covered by :xep:`0106`.

        :param legacy_id:
        """
        return str(legacy_id).translate(ESCAPE_TABLE)

    async def jid_username_to_legacy_id(self, jid_username: str) -> LegacyUserIdType:
        """
        Convert a JID user part to a legacy ID.

        Should be overridden in case legacy IDs are not strings, or more generally
        for any case where the username part of a JID (unescaped according to the
        mapping defined by :xep:`0106`) is not enough to identify a contact on the
        legacy network.

        The default implementation returns the username passed through
        ``unescape_node()``, as a string.

        :param jid_username: User part of a JID, ie "user" in "user@example.com"
        :return: An identifier for the user on the legacy network.
        """
        return unescape_node(jid_username)  # type:ignore

    @timeit
    async def _fill(self, orm: OrmSession) -> None:
        """
        Run :meth:`fill` and push the yielded contacts to the user's roster.

        The user's current roster is fetched through the ``xep_0356`` plugin.
        If that fails, :meth:`fill` is still consumed but no roster item is
        pushed. Otherwise, each contact whose subscription, groups or name
        differs from the fetched roster is pushed with ``set_roster``.

        :param orm: ORM session, committed once all contacts were processed.
        """
        try:
            if hasattr(self.session.xmpp, "TEST_MODE"):
                # dirty hack to avoid mocking xmpp server replies to this
                # during tests
                raise PermissionError
            iq = await self.session.xmpp["xep_0356"].get_roster(
                self.session.user_jid.bare
            )
            user_roster = iq["roster"]["items"]
        except (PermissionError, IqError, IqTimeout):
            user_roster = None

        self.__filling = True
        try:
            async for contact in self.fill():
                if user_roster is None:
                    continue
                item = contact.get_roster_item()
                old = user_roster.get(contact.jid.bare)
                if old is not None and all(
                    old[k] == item[contact.jid.bare].get(k)
                    for k in ("subscription", "groups", "name")
                ):
                    self.log.debug("No need to update roster")
                    continue
                self.log.debug("Updating roster")
                try:
                    await self.session.xmpp["xep_0356"].set_roster(
                        self.session.user_jid.bare,
                        item,
                    )
                except (PermissionError, IqError, IqTimeout) as e:
                    warnings.warn(f"Could not add to roster: {e}")
                else:
                    contact.added_to_roster = True
                    contact.send_last_presence(force=True)
            orm.commit()
        finally:
            # Always clear the flag, even if fill() or the commit raised,
            # so it cannot stay stuck at True.
            self.__filling = False

    async def fill(self) -> AsyncIterator[LegacyContact]:
        """
        Populate slidge's "virtual roster".

        This should yield contacts that are meant to be added to the user's
        roster, typically by using ``await self.by_legacy_id(contact_id)``.
        Setting the contact nicknames, avatar, etc. should be in
        :meth:`LegacyContact.update_info()`

        It's not mandatory to override this method, but it is the recommended
        way to populate "friends" of the user. Calling
        ``await (await self.by_legacy_id(contact_id)).add_to_roster()``
        accomplishes the same thing, but doing it in here allows to batch
        DB queries and is better performance-wise.

        """
        # The unreachable ``yield`` makes this an (empty) async generator.
        return
        yield

268 

269 

# Module-level logger, named after this module.
log: logging.Logger = logging.getLogger(__name__)