Coverage for slidge/contact/roster.py: 77%

122 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-04 08:17 +0000

1import asyncio 

2import logging 

3import warnings 

4from typing import TYPE_CHECKING, AsyncIterator, Generic, Iterator, Optional, Type 

5 

6from slixmpp import JID 

7from slixmpp.exceptions import IqError, IqTimeout 

8from sqlalchemy.orm import Session 

9from sqlalchemy.orm import Session as OrmSession 

10 

11from ..db.models import Contact, GatewayUser 

12from ..util import SubclassableOnce 

13from ..util.jid_escaping import ESCAPE_TABLE, unescape_node 

14from ..util.lock import NamedLockMixin 

15from ..util.types import LegacyContactType, LegacyUserIdType 

16from ..util.util import timeit 

17from .contact import LegacyContact 

18 

19if TYPE_CHECKING: 

20 from ..core.session import BaseSession 

21 

22 

class ContactIsUser(Exception):
    """Signal that a contact lookup targeted the gateway user themselves.

    Raised by ``LegacyRoster.by_legacy_id`` when the requested legacy ID is
    equal to the roster's ``user_legacy_id``.
    """

25 

26 

class LegacyRoster(
    Generic[LegacyUserIdType, LegacyContactType],
    NamedLockMixin,
    metaclass=SubclassableOnce,
):
    """
    Virtual roster of a gateway user that allows to represent all
    of their contacts as singleton instances (if used properly and not too bugged).

    Every :class:`.BaseSession` instance will have its own :class:`.LegacyRoster` instance
    accessible via the :attr:`.BaseSession.contacts` attribute.

    Typically, you will mostly use the :meth:`.LegacyRoster.by_legacy_id` function to
    retrieve a contact instance.

    You might need to override :meth:`.LegacyRoster.legacy_id_to_jid_username` and/or
    :meth:`.LegacyRoster.jid_username_to_legacy_id` to incorporate some custom logic
    if some characters need special handling when translating between JID user
    parts and legacy IDs.
    """

    # Concrete contact class instantiated by from_store().
    _contact_cls: Type[LegacyContactType]

    def __init__(self, session: "BaseSession") -> None:
        """
        Create the roster attached to ``session``.

        :param session: The gateway session this roster belongs to.
        """
        super().__init__()

        self.log = logging.getLogger(f"{session.user_jid.bare}:roster")
        # Legacy ID of the gateway user; by_legacy_id() refuses to build a
        # contact for it. Stays None until something else assigns it.
        self.user_legacy_id: Optional[LegacyUserIdType] = None
        self.ready: asyncio.Future[bool] = session.xmpp.loop.create_future()

        self.session = session
        # True only while _fill() is iterating over fill().
        self.__filling = False

    @property
    def user(self) -> GatewayUser:
        """The ``user`` attribute of the session this roster belongs to."""
        return self.session.user

    def orm(self) -> Session:
        """
        Return an ORM session obtained from ``self.session.xmpp.store``.

        Callers in this class use the result as a context manager.
        """
        return self.session.xmpp.store.session()

    def from_store(self, stored: Contact) -> LegacyContactType:
        """
        Build a contact instance of this roster's contact class from a DB row.

        :param stored: The stored contact to wrap.
        :return: A ``_contact_cls`` instance bound to this roster's session.
        """
        return self._contact_cls(self.session, stored=stored)

    def __repr__(self) -> str:
        return f"<Roster of {self.session.user_jid}>"

    def __iter__(self) -> Iterator[LegacyContactType]:
        """
        Iterate over this user's stored contacts that are flagged ``updated``.

        Stored contacts whose ``updated`` flag is falsy are skipped.
        """
        with self.orm() as orm:
            for stored in orm.query(Contact).filter_by(user=self.user).all():
                if stored.updated:
                    yield self.from_store(stored)

    def known_contacts(self, only_friends: bool = True) -> dict[str, LegacyContactType]:
        """
        Map bare JIDs to the contacts yielded when iterating over this roster.

        :param only_friends: If true, keep only contacts whose ``is_friend``
            attribute is truthy.
        :return: A dict keyed by the bare JID (as returned by ``jid.bare``).
        """
        return {
            contact.jid.bare: contact
            for contact in self
            if not only_friends or contact.is_friend
        }

    async def by_jid(self, contact_jid: JID) -> LegacyContactType:
        # Retrieve a contact by their JID.
        #
        # If no contact is stored for this JID yet, a new one is created, using
        # :meth:`slidge.LegacyRoster.jid_username_to_legacy_id` to infer their
        # legacy user ID.
        #
        # (Deliberately kept as a comment, as in the original, so that it does
        # not become the method's ``__doc__``.)
        #
        # :param contact_jid: JID of the contact; only its bare part is used.
        # :return: The contact, after ``update_info()`` has run if its stored
        #     ``updated`` flag was falsy.
        username = contact_jid.node
        contact_jid = JID(contact_jid.bare)
        async with self.lock(("username", username)):
            legacy_id = await self.jid_username_to_legacy_id(username)
            if self.get_lock(("legacy_id", legacy_id)):
                # A lock already exists for this legacy ID: let
                # by_legacy_id() return the contact instead of racing it.
                self.log.debug("Already updating %s via by_legacy_id()", contact_jid)
                return await self.by_legacy_id(legacy_id)

            with self.orm() as orm:
                stored = (
                    orm.query(Contact)
                    .filter_by(user=self.user, jid=contact_jid)
                    .one_or_none()
                )
                if stored is None:
                    stored = Contact(
                        user_account_id=self.session.user_pk,
                        legacy_id=legacy_id,
                        jid=contact_jid,
                    )
            # NOTE(review): nesting reconstructed from a listing that lost its
            # indentation; confirm this call belongs after the ORM block (but
            # still under the lock) rather than inside it.
            return await self.__update_if_needed(stored)

    async def __update_if_needed(self, stored: Contact) -> LegacyContactType:
        """
        Wrap ``stored`` in a contact and refresh it unless it is already updated.

        :param stored: Stored (or freshly built) contact row.
        :return: The contact instance built from ``stored``.
        """
        contact = self.from_store(stored)
        if contact.stored.updated:
            return contact

        with contact.updating_info():
            await contact.update_info()

        # Store the presence cached on the contact, if there is one.
        if contact.cached_presence is not None:
            contact._store_last_presence(contact.cached_presence)
        return contact

    def by_jid_only_if_exists(self, contact_jid: JID) -> LegacyContactType | None:
        """
        Return the contact with this JID only if it is stored and updated.

        Unlike :meth:`.by_jid`, this never creates a contact nor calls
        ``update_info()``.

        NOTE(review): unlike ``by_jid()``, ``contact_jid`` is not reduced to its
        bare form before the lookup -- confirm callers always pass a bare JID.

        :param contact_jid: JID to look up.
        :return: The contact, or ``None`` if no stored contact matches or the
            match is not flagged ``updated``.
        """
        with self.orm() as orm:
            stored = (
                orm.query(Contact)
                .filter_by(user=self.user, jid=contact_jid)
                .one_or_none()
            )
            if stored is not None and stored.updated:
                return self.from_store(stored)
        return None

    @timeit
    async def by_legacy_id(self, legacy_id: LegacyUserIdType) -> LegacyContactType:
        """
        Retrieve a contact by their legacy_id

        If no contact is stored for this legacy ID yet, a new one is created,
        using :meth:`slidge.LegacyRoster.legacy_id_to_jid_username` to infer the
        user part of their JID.

        :param legacy_id: Identifier of the contact on the legacy network.
        :return: The contact, after ``update_info()`` has run if its stored
            ``updated`` flag was falsy.
        :raises ContactIsUser: If ``legacy_id`` equals ``self.user_legacy_id``.
        """
        if legacy_id == self.user_legacy_id:
            raise ContactIsUser
        async with self.lock(("legacy_id", legacy_id)):
            username = await self.legacy_id_to_jid_username(legacy_id)
            if self.get_lock(("username", username)):
                # A lock already exists for this username: let by_jid()
                # return the contact instead of racing it.
                self.log.debug("Already updating %s via by_jid()", username)

                jid = JID()
                jid.node = username
                jid.domain = self.session.xmpp.boundjid.bare
                return await self.by_jid(jid)

            with self.orm() as orm:
                stored = (
                    orm.query(Contact)
                    .filter_by(user=self.user, legacy_id=str(legacy_id))
                    .one_or_none()
                )
                if stored is None:
                    stored = Contact(
                        user_account_id=self.session.user_pk,
                        legacy_id=str(legacy_id),
                        jid=JID(f"{username}@{self.session.xmpp.boundjid.bare}"),
                    )
            # NOTE(review): nesting reconstructed from a listing that lost its
            # indentation; confirm this call belongs after the ORM block (but
            # still under the lock) rather than inside it.
            return await self.__update_if_needed(stored)

    async def legacy_id_to_jid_username(self, legacy_id: LegacyUserIdType) -> str:
        """
        Convert a legacy ID to a valid 'user' part of a JID

        Should be overridden for cases where the str conversion of
        the legacy_id is not enough, e.g., if it is case-sensitive or contains
        forbidden characters not covered by :xep:`0106`.

        The default implementation converts the ID with ``str()`` and applies
        ``ESCAPE_TABLE`` to the result.

        :param legacy_id: Identifier of the contact on the legacy network.
        :return: The user part to use in the contact's JID.
        """
        return str(legacy_id).translate(ESCAPE_TABLE)

    async def jid_username_to_legacy_id(self, jid_username: str) -> LegacyUserIdType:
        """
        Convert a JID user part to a legacy ID.

        Should be overridden in case legacy IDs are not strings, or more generally
        for any case where the username part of a JID (unescaped according to the
        mapping defined by :xep:`0106`) is not enough to identify a contact on the
        legacy network.

        The default implementation only unescapes the JID user part and returns
        the resulting string.

        :param jid_username: User part of a JID, ie "user" in "user@example.com"
        :return: An identifier for the user on the legacy network.
        """
        return unescape_node(jid_username)  # type:ignore

    @timeit
    async def _fill(self, orm: OrmSession) -> None:
        """
        Iterate over :meth:`.fill` and push the yielded contacts to the user's
        XMPP roster through the ``xep_0356`` plugin.

        If the user's current roster cannot be fetched, :meth:`.fill` is still
        exhausted but no roster item is pushed.

        :param orm: ORM session that is committed once all contacts have been
            processed.
        """
        try:
            if hasattr(self.session.xmpp, "TEST_MODE"):
                # dirty hack to avoid mocking xmpp server replies to this
                # during tests
                raise PermissionError
            iq = await self.session.xmpp["xep_0356"].get_roster(
                self.session.user_jid.bare
            )
            user_roster = iq["roster"]["items"]
        except (PermissionError, IqError, IqTimeout):
            user_roster = None

        self.__filling = True
        try:
            async for contact in self.fill():
                if user_roster is None:
                    continue
                item = contact.get_roster_item()
                old = user_roster.get(contact.jid.bare)
                if old is not None and all(
                    old[k] == item[contact.jid.bare].get(k)
                    for k in ("subscription", "groups", "name")
                ):
                    self.log.debug("No need to update roster")
                    continue
                self.log.debug("Updating roster")
                try:
                    await self.session.xmpp["xep_0356"].set_roster(
                        self.session.user_jid.bare,
                        item,
                    )
                except (PermissionError, IqError, IqTimeout) as e:
                    warnings.warn(f"Could not add to roster: {e}")
                else:
                    contact.added_to_roster = True
            orm.commit()
        finally:
            # Always clear the flag, even if fill() or the commit raised, so
            # that a failed fill does not leave the roster marked as "filling".
            self.__filling = False

    async def fill(self) -> AsyncIterator[LegacyContact]:
        """
        Populate slidge's "virtual roster".

        This should yield contacts that are meant to be added to the user's
        roster, typically by using ``await self.by_legacy_id(contact_id)``.
        Setting the contact nicknames, avatar, etc. should be in
        :meth:`LegacyContact.update_info()`

        It's not mandatory to override this method, but it is the recommended
        way to populate "friends" of the user. Calling
        ``await (await self.by_legacy_id(contact_id)).add_to_roster()``
        accomplishes the same thing, but doing it in here allows to batch
        DB queries and is better performance-wise.

        The default implementation yields nothing.
        """
        return
        # Unreachable on purpose: the mere presence of ``yield`` makes this
        # function an async generator, so ``async for`` works on the default.
        yield

261 

262 

# Module-level logger named after this module. LegacyRoster instances log
# through their own per-user ``self.log`` instead.
log = logging.getLogger(__name__)