Coverage for slidge/contact/roster.py: 75%

119 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-07 05:11 +0000

1import asyncio 

2import logging 

3import warnings 

4from typing import TYPE_CHECKING, AsyncIterator, Generic, Iterator, Optional, Type 

5 

6from slixmpp import JID 

7from slixmpp.exceptions import IqError, IqTimeout, XMPPError 

8from slixmpp.jid import JID_UNESCAPE_TRANSFORMATIONS, _unescape_node 

9 

10from ..core.mixins.lock import NamedLockMixin 

11from ..db.models import Contact 

12from ..db.store import ContactStore 

13from ..util import SubclassableOnce 

14from ..util.types import LegacyContactType, LegacyUserIdType 

15from .contact import LegacyContact 

16 

17if TYPE_CHECKING: 

18 from ..core.session import BaseSession 

19 

20 

class ContactIsUser(Exception):
    """
    Raised when a contact lookup targets the gateway user themselves.

    :meth:`LegacyRoster.by_legacy_id` raises it when the requested legacy ID
    equals the roster's ``user_legacy_id``.
    """

23 

24 

class LegacyRoster(
    Generic[LegacyUserIdType, LegacyContactType],
    NamedLockMixin,
    metaclass=SubclassableOnce,
):
    """
    Virtual roster of a gateway user, that allows to represent all
    of their contacts as singleton instances (if used properly and not too bugged).

    Every :class:`.BaseSession` instance will have its own :class:`.LegacyRoster` instance
    accessible via the :attr:`.BaseSession.contacts` attribute.

    Typically, you will mostly use the :meth:`.LegacyRoster.by_legacy_id` function to
    retrieve a contact instance.

    You might need to override :meth:`.LegacyRoster.legacy_id_to_jid_username` and/or
    :meth:`.LegacyRoster.jid_username_to_legacy_id` to incorporate some custom logic
    if some characters need special handling when translating between JID user
    parts and legacy IDs.
    """

    def __init__(self, session: "BaseSession"):
        """
        Bind the roster to a session.

        :param session: The session whose contacts this roster represents.
        """
        self._contact_cls: Type[LegacyContactType] = (
            LegacyContact.get_self_or_unique_subclass()
        )
        # NB: this is set on the contact *class*, not on an instance.
        self._contact_cls.xmpp = session.xmpp
        self.__store: ContactStore = session.xmpp.store.contacts

        self.session = session
        self.log = logging.getLogger(f"{self.session.user_jid.bare}:roster")
        self.user_legacy_id: Optional[LegacyUserIdType] = None
        self.ready: asyncio.Future[bool] = self.session.xmpp.loop.create_future()
        # True only while _fill() is running; __update_contact() then skips
        # its per-contact commit because _fill() commits once at the end.
        self.__filling = False
        super().__init__()

    def __repr__(self):
        return f"<Roster of {self.session.user_jid}>"

    def __iter__(self) -> Iterator[LegacyContactType]:
        """
        Iterate over all contacts stored for this session's user.

        Contact instances are rebuilt from their stored rows on each iteration.
        """
        with self.__store.session():
            for stored in self.__store.get_all(user_pk=self.session.user_pk):
                yield self._contact_cls.from_store(self.session, stored)

    def known_contacts(self, only_friends: bool = True) -> dict[str, LegacyContactType]:
        """
        Return the stored contacts, keyed by their bare JID.

        :param only_friends: If true (the default), only include contacts
            whose ``is_friend`` attribute is truthy.
        :return: A mapping of bare JID (as a string) to contact instance.
        """
        if only_friends:
            return {c.jid.bare: c for c in self if c.is_friend}
        return {c.jid.bare: c for c in self}

    async def by_jid(self, contact_jid: JID) -> LegacyContactType:
        # """
        # Retrieve a contact by their JID
        #
        # If the contact was not instantiated before, it will be created
        # using :meth:`slidge.LegacyRoster.jid_username_to_legacy_id` to infer their
        # legacy user ID.
        #
        # :param contact_jid:
        # :return:
        # """
        username = contact_jid.node
        async with self.lock(("username", username)):
            legacy_id = await self.jid_username_to_legacy_id(username)
            # Presumably truthy when a by_legacy_id() call for the same
            # contact is in progress (semantics live in NamedLockMixin);
            # delegate to it instead of updating the contact twice.
            if self.get_lock(("legacy_id", legacy_id)):
                log.debug("Already updating %s", contact_jid)
                return await self.by_legacy_id(legacy_id)

            with self.__store.session():
                stored = self.__store.get_by_jid(self.session.user_pk, contact_jid)
                if stored is None:
                    # Only report a miss once the store has actually been queried.
                    log.debug("Contact %s not found", contact_jid)
                return await self.__update_contact(stored, legacy_id, username)

    def by_jid_only_if_exists(self, contact_jid: JID) -> LegacyContactType | None:
        """
        Return the contact for this JID only if it is already stored and updated.

        Unlike :meth:`by_jid`, this never creates a contact, and the only
        lookup it performs is against the store.

        :param contact_jid: JID of the contact to look up.
        :return: The contact, or ``None`` if there is no stored row for this
            JID or the stored row is not flagged as ``updated``.
        """
        with self.__store.session():
            stored = self.__store.get_by_jid(self.session.user_pk, contact_jid)
            if stored is not None and stored.updated:
                return self._contact_cls.from_store(self.session, stored)
        return None

    async def by_legacy_id(
        self, legacy_id: LegacyUserIdType, *args, **kwargs
    ) -> LegacyContactType:
        """
        Retrieve a contact by their legacy_id

        If the contact was not instantiated before, it will be created
        using :meth:`slidge.LegacyRoster.legacy_id_to_jid_username` to infer
        the user part of their JID.

        :param legacy_id:
        :param args: arbitrary additional positional arguments passed to the contact constructor.
            Requires subclassing LegacyContact.__init__ to accept those.
            This is useful for networks where you fetch the contact list and information
            about these contacts in a single request
        :param kwargs: arbitrary keyword arguments passed to the contact constructor
        :raises ContactIsUser: if ``legacy_id`` equals the roster's ``user_legacy_id``
        :return: The contact instance for this legacy ID.
        """
        if legacy_id == self.user_legacy_id:
            raise ContactIsUser
        async with self.lock(("legacy_id", legacy_id)):
            username = await self.legacy_id_to_jid_username(legacy_id)
            # Mirror image of the check in by_jid(): if a by_jid() call for
            # this contact appears to be in progress, defer to it.
            if self.get_lock(("username", username)):
                log.debug("Already updating %s", username)
                jid = JID()
                jid.node = username
                jid.domain = self.session.xmpp.boundjid.bare
                # NOTE: by_jid() takes no extra arguments, so *args/**kwargs
                # are not forwarded on this path.
                return await self.by_jid(jid)

            with self.__store.session():
                stored = self.__store.get_by_legacy_id(
                    self.session.user_pk, str(legacy_id)
                )
                return await self.__update_contact(
                    stored, legacy_id, username, *args, **kwargs
                )

    async def __update_contact(
        self,
        stored: Contact | None,
        legacy_id: LegacyUserIdType,
        username: str,
        *a,
        **kw,
    ) -> LegacyContactType:
        """
        Build a contact instance and make sure its info has been fetched.

        :param stored: The stored row for this contact, or ``None`` if there is none.
        :param legacy_id: Legacy ID, used to construct the contact when
            ``stored`` is ``None``.
        :param username: JID user part, used to construct the contact when
            ``stored`` is ``None``.
        :param a: Extra positional arguments for the contact constructor.
        :param kw: Extra keyword arguments for the contact constructor.
        :raises XMPPError: if updating the contact info fails. Non-XMPP
            exceptions are converted to ``internal-server-error``.
        :return: The contact instance, written to the store if it was updated.
        """
        if stored is None:
            contact = self._contact_cls(self.session, legacy_id, username, *a, **kw)
        else:
            contact = self._contact_cls.from_store(self.session, stored, *a, **kw)
            if stored.updated:
                # Info already fetched: no update and no store write needed.
                return contact

        try:
            with contact.updating_info():
                await contact.avatar_wrap_update_info()
        except XMPPError:
            raise
        except Exception as e:
            # Chain explicitly so the original traceback is kept as the cause.
            raise XMPPError("internal-server-error", str(e)) from e
        contact._caps_ver = await contact.get_caps_ver(contact.jid)
        # While _fill() is running, skip the per-contact commit; _fill()
        # commits once after all contacts have been processed.
        contact.contact_pk = self.__store.update(contact, commit=not self.__filling)
        return contact

    async def by_stanza(self, s) -> LegacyContactType:
        # """
        # Retrieve a contact by the destination of a stanza
        #
        # See :meth:`slidge.Roster.by_legacy_id` for more info.
        #
        # :param s:
        # :return:
        # """
        return await self.by_jid(s.get_to())

    async def legacy_id_to_jid_username(self, legacy_id: LegacyUserIdType) -> str:
        """
        Convert a legacy ID to a valid 'user' part of a JID

        Should be overridden for cases where the str conversion of
        the legacy_id is not enough, e.g., if it is case-sensitive or contains
        forbidden characters not covered by :xep:`0106`.

        :param legacy_id:
        :return: ``str(legacy_id)`` translated through ``ESCAPE_TABLE``.
        """
        return str(legacy_id).translate(ESCAPE_TABLE)

    async def jid_username_to_legacy_id(self, jid_username: str) -> LegacyUserIdType:
        """
        Convert a JID user part to a legacy ID.

        Should be overridden in case legacy IDs are not strings, or more generally
        for any case where the username part of a JID (unescaped according to the
        mapping defined by :xep:`0106`) is not enough to identify a contact on the
        legacy network.

        The default implementation only unescapes the JID user part.

        :param jid_username: User part of a JID, ie "user" in "user@example.com"
        :return: An identifier for the user on the legacy network.
        """
        return _unescape_node(jid_username)

    async def _fill(self):
        """
        Run :meth:`fill` and sync the yielded contacts to the user's XMPP roster.

        The user's current roster is requested through the ``xep_0356``
        plugin. If that fails, :meth:`fill` is still consumed but nothing is
        pushed to the roster. Otherwise, each yielded contact whose roster
        item differs from the existing entry is pushed with ``set_roster``.
        A single commit is issued once all contacts have been processed.
        """
        try:
            if hasattr(self.session.xmpp, "TEST_MODE"):
                # dirty hack to avoid mocking xmpp server replies to this
                # during tests
                raise PermissionError
            iq = await self.session.xmpp["xep_0356"].get_roster(
                self.session.user_jid.bare
            )
            user_roster = iq["roster"]["items"]
        except (PermissionError, IqError, IqTimeout):
            user_roster = None

        self.__filling = True
        try:
            with self.__store.session() as orm:
                async for contact in self.fill():
                    if user_roster is None:
                        # Roster unavailable: keep consuming fill(), but do
                        # not attempt any roster push.
                        continue
                    item = contact.get_roster_item()
                    old = user_roster.get(contact.jid.bare)
                    if old is not None and all(
                        old[k] == item[contact.jid.bare].get(k)
                        for k in ("subscription", "groups", "name")
                    ):
                        self.log.debug("No need to update roster")
                        continue
                    self.log.debug("Updating roster")
                    try:
                        await self.session.xmpp["xep_0356"].set_roster(
                            self.session.user_jid.bare,
                            item,
                        )
                    except (PermissionError, IqError, IqTimeout) as e:
                        warnings.warn(f"Could not add to roster: {e}")
                    else:
                        contact._added_to_roster = True
                orm.commit()
        finally:
            # Always clear the flag, even if fill() raised; otherwise every
            # later __update_contact() call would keep skipping its commit.
            self.__filling = False

    async def fill(self) -> AsyncIterator[LegacyContact]:
        """
        Populate slidge's "virtual roster".

        This should yield contacts that are meant to be added to the user's
        roster, typically by using ``await self.by_legacy_id(contact_id)``.
        Setting the contact nicknames, avatar, etc. should be in
        :meth:`LegacyContact.update_info()`

        It's not mandatory to override this method, but it is the recommended
        way to populate "friends" of the user. Calling
        ``await (await self.by_legacy_id(contact_id)).add_to_roster()``
        accomplishes the same thing, but doing it in here allows to batch
        DB queries and is better performance-wise.

        """
        # The unreachable ``yield`` makes this an (empty) async generator.
        return
        yield

260 

261 

# Inverse of JID_UNESCAPE_TRANSFORMATIONS: a str.translate() table mapping each
# unescaped character back to its escaped form.
ESCAPE_TABLE = str.maketrans(
    {plain: escaped for escaped, plain in JID_UNESCAPE_TRANSFORMATIONS.items()}
)

# Module-level logger, used where the per-roster ``self.log`` is not.
log = logging.getLogger(__name__)