Coverage for slidge / contact / roster.py: 76%

130 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-06 15:18 +0000

1import asyncio 

2import logging 

3import warnings 

4from typing import TYPE_CHECKING, AsyncIterator, Generic, Iterator, Optional, Type 

5 

6from slixmpp import JID 

7from slixmpp.exceptions import IqError, IqTimeout, XMPPError 

8from sqlalchemy.orm import Session 

9from sqlalchemy.orm import Session as OrmSession 

10 

11from ..db.models import Contact, GatewayUser 

12from ..util import SubclassableOnce 

13from ..util.jid_escaping import ESCAPE_TABLE, unescape_node 

14from ..util.lock import NamedLockMixin 

15from ..util.types import LegacyContactType, LegacyUserIdType 

16from ..util.util import timeit 

17from .contact import LegacyContact 

18 

19if TYPE_CHECKING: 

20 from ..core.session import BaseSession 

21 

22 

23class ContactIsUser(Exception): 

24 pass 

25 

26 

27class LegacyRoster( 

28 Generic[LegacyUserIdType, LegacyContactType], 

29 NamedLockMixin, 

30 metaclass=SubclassableOnce, 

31): 

32 """ 

33 Virtual roster of a gateway user that allows to represent all 

34 of their contacts as singleton instances (if used properly and not too bugged). 

35 

36 Every :class:`.BaseSession` instance will have its own :class:`.LegacyRoster` instance 

37 accessible via the :attr:`.BaseSession.contacts` attribute. 

38 

39 Typically, you will mostly use the :meth:`.LegacyRoster.by_legacy_id` function to 

40 retrieve a contact instance. 

41 

42 You might need to override :meth:`.LegacyRoster.legacy_id_to_jid_username` and/or 

43 :meth:`.LegacyRoster.jid_username_to_legacy_id` to incorporate some custom logic 

44 if you need some characters when translation JID user parts and legacy IDs. 

45 """ 

46 

47 _contact_cls: Type[LegacyContactType] 

48 

49 def __init__(self, session: "BaseSession") -> None: 

50 super().__init__() 

51 

52 self.log = logging.getLogger(f"{session.user_jid.bare}:roster") 

53 self.user_legacy_id: Optional[LegacyUserIdType] = None 

54 self.ready: asyncio.Future[bool] = session.xmpp.loop.create_future() 

55 

56 self.session = session 

57 self.__filling = False 

58 

59 @property 

60 def user(self) -> GatewayUser: 

61 return self.session.user 

62 

63 def orm(self) -> Session: 

64 return self.session.xmpp.store.session() 

65 

66 def from_store(self, stored: Contact) -> LegacyContactType: 

67 return self._contact_cls(self.session, stored=stored) 

68 

69 def __repr__(self) -> str: 

70 return f"<Roster of {self.session.user_jid}>" 

71 

72 def __iter__(self) -> Iterator[LegacyContactType]: 

73 with self.orm() as orm: 

74 for stored in orm.query(Contact).filter_by(user=self.user).all(): 

75 if stored.updated: 

76 yield self.from_store(stored) 

77 

78 def known_contacts(self, only_friends: bool = True) -> dict[str, LegacyContactType]: 

79 if only_friends: 

80 return {c.jid.bare: c for c in self if c.is_friend} 

81 return {c.jid.bare: c for c in self} 

82 

83 async def by_jid(self, contact_jid: JID) -> LegacyContactType: 

84 # """ 

85 # Retrieve a contact by their JID 

86 # 

87 # If the contact was not instantiated before, it will be created 

88 # using :meth:`slidge.LegacyRoster.jid_username_to_legacy_id` to infer their 

89 # legacy user ID. 

90 # 

91 # :param contact_jid: 

92 # :return: 

93 # """ 

94 username = contact_jid.node 

95 if not username: 

96 raise XMPPError( 

97 "bad-request", "Contacts must have a local part in their JID" 

98 ) 

99 contact_jid = JID(contact_jid.bare) 

100 async with self.lock(("username", username)): 

101 legacy_id = await self.jid_username_to_legacy_id(username) 

102 if legacy_id == self.user_legacy_id: 

103 raise ContactIsUser 

104 if self.get_lock(("legacy_id", legacy_id)): 

105 self.log.debug("Already updating %s via by_legacy_id()", contact_jid) 

106 return await self.by_legacy_id(legacy_id) 

107 

108 with self.orm() as orm: 

109 stored = ( 

110 orm.query(Contact) 

111 .filter_by(user=self.user, jid=contact_jid) 

112 .one_or_none() 

113 ) 

114 if stored is None: 

115 stored = Contact( 

116 user_account_id=self.session.user_pk, 

117 legacy_id=legacy_id, 

118 jid=contact_jid, 

119 ) 

120 return await self.__update_if_needed(stored) 

121 

122 async def __update_if_needed(self, stored: Contact) -> LegacyContactType: 

123 contact = self.from_store(stored) 

124 if contact.stored.updated: 

125 return contact 

126 

127 with contact.updating_info(merge=False): 

128 await contact.update_info() 

129 if contact.is_friend and not self.__filling: 

130 await contact.add_to_roster() 

131 

132 if contact.cached_presence is not None: 

133 contact._store_last_presence(contact.cached_presence) 

134 return contact 

135 

136 def by_jid_only_if_exists(self, contact_jid: JID) -> LegacyContactType | None: 

137 with self.orm() as orm: 

138 stored = ( 

139 orm.query(Contact) 

140 .filter_by(user=self.user, jid=contact_jid) 

141 .one_or_none() 

142 ) 

143 if stored is not None and stored.updated: 

144 return self.from_store(stored) 

145 return None 

146 

147 @timeit 

148 async def by_legacy_id(self, legacy_id: LegacyUserIdType) -> LegacyContactType: 

149 """ 

150 Retrieve a contact by their legacy_id 

151 

152 If the contact was not instantiated before, it will be created 

153 using :meth:`slidge.LegacyRoster.legacy_id_to_jid_username` to infer their 

154 legacy user ID. 

155 

156 :param legacy_id: 

157 :return: 

158 """ 

159 if legacy_id == self.user_legacy_id: 

160 raise ContactIsUser 

161 async with self.lock(("legacy_id", legacy_id)): 

162 username = await self.legacy_id_to_jid_username(legacy_id) 

163 if self.get_lock(("username", username)): 

164 self.log.debug("Already updating %s via by_jid()", username) 

165 

166 return await self.by_jid( 

167 JID(username + "@" + self.session.xmpp.boundjid.bare) 

168 ) 

169 

170 with self.orm() as orm: 

171 stored = ( 

172 orm.query(Contact) 

173 .filter_by(user=self.user, legacy_id=str(legacy_id)) 

174 .one_or_none() 

175 ) 

176 if stored is None: 

177 stored = Contact( 

178 user_account_id=self.session.user_pk, 

179 legacy_id=str(legacy_id), 

180 jid=JID(f"{username}@{self.session.xmpp.boundjid.bare}"), 

181 ) 

182 return await self.__update_if_needed(stored) 

183 

184 async def legacy_id_to_jid_username(self, legacy_id: LegacyUserIdType) -> str: 

185 """ 

186 Convert a legacy ID to a valid 'user' part of a JID 

187 

188 Should be overridden for cases where the str conversion of 

189 the legacy_id is not enough, e.g., if it is case-sensitive or contains 

190 forbidden characters not covered by :xep:`0106`. 

191 

192 :param legacy_id: 

193 """ 

194 return str(legacy_id).translate(ESCAPE_TABLE) 

195 

196 async def jid_username_to_legacy_id(self, jid_username: str) -> LegacyUserIdType: 

197 """ 

198 Convert a JID user part to a legacy ID. 

199 

200 Should be overridden in case legacy IDs are not strings, or more generally 

201 for any case where the username part of a JID (unescaped with to the mapping 

202 defined by :xep:`0106`) is not enough to identify a contact on the legacy network. 

203 

204 Default implementation is an identity operation 

205 

206 :param jid_username: User part of a JID, ie "user" in "user@example.com" 

207 :return: An identifier for the user on the legacy network. 

208 """ 

209 return unescape_node(jid_username) # type:ignore 

210 

211 @timeit 

212 async def _fill(self, orm: OrmSession): 

213 try: 

214 if hasattr(self.session.xmpp, "TEST_MODE"): 

215 # dirty hack to avoid mocking xmpp server replies to this 

216 # during tests 

217 raise PermissionError 

218 iq = await self.session.xmpp["xep_0356"].get_roster( 

219 self.session.user_jid.bare 

220 ) 

221 user_roster = iq["roster"]["items"] 

222 except (PermissionError, IqError, IqTimeout): 

223 user_roster = None 

224 

225 self.__filling = True 

226 async for contact in self.fill(): 

227 if user_roster is None: 

228 continue 

229 item = contact.get_roster_item() 

230 old = user_roster.get(contact.jid.bare) 

231 if old is not None and all( 

232 old[k] == item[contact.jid.bare].get(k) 

233 for k in ("subscription", "groups", "name") 

234 ): 

235 self.log.debug("No need to update roster") 

236 continue 

237 self.log.debug("Updating roster") 

238 if not contact.is_friend: 

239 continue 

240 if not self.session.user.preferences.get("roster_push", True): 

241 continue 

242 try: 

243 await self.session.xmpp["xep_0356"].set_roster( 

244 self.session.user_jid.bare, 

245 item, 

246 ) 

247 except (PermissionError, IqError, IqTimeout) as e: 

248 warnings.warn(f"Could not add to roster: {e}") 

249 else: 

250 contact.added_to_roster = True 

251 contact.send_last_presence(force=True) 

252 orm.commit() 

253 self.__filling = False 

254 

255 async def fill(self) -> AsyncIterator[LegacyContact]: 

256 """ 

257 Populate slidge's "virtual roster". 

258 

259 This should yield contacts that are meant to be added to the user's 

260 roster, typically by using ``await self.by_legacy_id(contact_id)``. 

261 Setting the contact nicknames, avatar, etc. should be in 

262 :meth:`LegacyContact.update_info()` 

263 

264 It's not mandatory to override this method, but it is recommended way 

265 to populate "friends" of the user. Calling 

266 ``await (await self.by_legacy_id(contact_id)).add_to_roster()`` 

267 accomplishes the same thing, but doing it in here allows to batch 

268 DB queries and is better performance-wise. 

269 

270 """ 

271 return 

272 yield 

273 

274 

275log = logging.getLogger(__name__)