Coverage for slidge / group / bookmarks.py: 91%

170 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-20 19:56 +0000

1import abc 

2import logging 

3from collections.abc import Iterator 

4from typing import Any, Generic, Literal, overload 

5 

6from slixmpp import JID 

7from slixmpp.exceptions import XMPPError 

8from sqlalchemy.orm import Session as OrmSession 

9 

10from ..db.models import Contact, Room, Space 

11from ..util import SubclassableOnce 

12from ..util.jid_escaping import ESCAPE_TABLE, unescape_node 

13from ..util.lock import NamedLockMixin 

14from ..util.types import ( 

15 AnyContact, 

16 AnyMUC, 

17 AnySession, 

18 AnySpaceMetadata, 

19 LegacyGroupIdType, 

20 LegacyMUCType, 

21 LegacyUserIdType, 

22 SpaceMetadata, 

23) 

24 

25 

class LegacyBookmarks(
    Generic[LegacyGroupIdType, LegacyMUCType, LegacyUserIdType],
    NamedLockMixin,
    SubclassableOnce,
):
    """
    Gives access to the legacy groups (MUCs) of a single user, by JID or by
    legacy ID.

    This is instantiated once per :class:`~slidge.BaseSession`
    """

    # Class used by from_store() to wrap a stored room into a MUC object.
    # It is only declared here, not assigned; presumably it is set for the
    # plugin's subclass elsewhere -- TODO confirm.
    _muc_cls: type[LegacyMUCType]

36 

def __init__(self, session: AnySession) -> None:
    """
    Bind these bookmarks to ``session`` and prepare the logger and the
    ``ready`` future.

    :param session: The session these bookmarks belong to. Its ``xmpp`` and
        ``user_jid`` attributes are also stored on the instance.
    """
    xmpp = session.xmpp
    user_jid = session.user_jid

    self.session = session
    self.xmpp = xmpp
    self.user_jid = user_jid

    # The nickname defaults to the local part of the user's JID.
    self._user_nick: str = user_jid.node

    super().__init__()
    self.log = logging.getLogger(f"{user_jid.bare}:bookmarks")
    self.ready = xmpp.loop.create_future()
    # Without group support there is nothing to wait for: resolve the
    # future right away.
    if not xmpp.GROUPS:
        self.ready.set_result(True)

49 

@property
def user_nick(self) -> str:
    """
    Nickname stored for the user.

    Reads and writes go through the private ``_user_nick`` attribute.
    """
    return self._user_nick

@user_nick.setter
def user_nick(self, nick: str) -> None:
    # Plain assignment, no validation of the new nickname.
    self._user_nick = nick

57 

def orm(
    self,
    **kwargs: Any,  # noqa:ANN401
) -> OrmSession:
    """
    Open an ORM session on the store of the gateway component.

    :param kwargs: Passed unchanged to ``store.session()``.
    :return: Whatever ``store.session()`` returns.
    """
    store = self.session.xmpp.store
    return store.session(**kwargs)

63 

def from_store(self, stored: Room) -> LegacyMUCType:
    """
    Wrap a stored room into a MUC instance.

    :param stored: The stored room.
    :return: ``self._muc_cls`` called with this session and ``stored``.
    """
    muc_cls = self._muc_cls
    return muc_cls(self.session, stored)

66 

def __iter__(self) -> Iterator[LegacyMUCType]:
    """
    Iterate over the MUCs of the user's stored rooms.

    Only rooms whose ``updated`` attribute is truthy are yielded. The store
    session stays open while the generator is being consumed.
    """
    with self.xmpp.store.session() as orm:
        rooms = orm.query(Room).filter_by(user=self.session.user).all()
        for room in rooms:
            if not room.updated:
                continue
            yield self.from_store(room)

72 

def __repr__(self) -> str:
    """Return a short representation that names the user's JID."""
    owner = self.user_jid
    return f"<Bookmarks of {owner}>"

75 

async def legacy_id_to_jid_local_part(self, legacy_id: LegacyGroupIdType) -> str:
    """
    Convert a legacy group ID to the local part of a JID.

    This only delegates to :meth:`legacy_id_to_jid_username`.

    :param legacy_id: Legacy identifier of the group.
    :return: The result of :meth:`legacy_id_to_jid_username`.
    """
    local_part = await self.legacy_id_to_jid_username(legacy_id)
    return local_part

78 

async def jid_local_part_to_legacy_id(self, local_part: str) -> LegacyGroupIdType:
    """
    Convert the local part of a JID to a legacy group ID.

    This only delegates to :meth:`jid_username_to_legacy_id`.

    :param local_part: Local part of the JID.
    :return: The result of :meth:`jid_username_to_legacy_id`.
    """
    legacy_id = await self.jid_username_to_legacy_id(local_part)
    return legacy_id

81 

async def legacy_id_to_jid_username(self, legacy_id: LegacyGroupIdType) -> str:
    """
    Convert a legacy group ID to the username (local part) of a JID.

    The default implementation calls ``str()`` on the legacy_id and
    escapes characters according to :xep:`0106`.

    You can override this method to implement a more subtle logic, for
    instance to raise an :class:`~slixmpp.exceptions.XMPPError` early.

    :param legacy_id: Legacy identifier of the group.
    :return: The escaped string form of ``legacy_id``.
    """
    as_text = str(legacy_id)
    return as_text.translate(ESCAPE_TABLE)

94 

async def jid_username_to_legacy_id(self, username: str) -> LegacyGroupIdType:
    """
    Convert the username (local part) of a JID to a legacy group ID.

    The default implementation returns ``unescape_node(username)``
    unchanged.

    :param username: Local part of the JID.
    :return: The value returned by ``unescape_node()``.
    """
    return unescape_node(username)  # type:ignore[no-any-return]

102 

async def by_jid(self, jid: JID) -> LegacyMUCType:
    """
    Get the MUC matching a JID, updating its info if it has not been
    updated yet.

    Any resource part of ``jid`` is dropped first. The lookup runs under
    the ``("bare", <bare JID>)`` named lock. If ``get_lock()`` reports a
    lock keyed on the matching legacy ID, the call is handed over to
    :meth:`by_legacy_id`. Otherwise the room is read from the store, or a
    new ``Room`` object is built when none is stored, and it then goes
    through the update step.

    :param jid: JID of the room, with or without a resource.
    :return: The MUC.
    """
    if jid.resource:
        jid = JID(jid.bare)
    async with self.lock(("bare", jid.bare)):
        legacy_id = await self.jid_local_part_to_legacy_id(jid.node)
        if self.get_lock(("legacy_id", legacy_id)):
            self.session.log.debug("Already updating %s via by_legacy_id()", jid)
            return await self.by_legacy_id(legacy_id)

        with self.session.xmpp.store.session() as orm:
            stored = (
                orm.query(Room)
                .filter_by(user_account_id=self.session.user_pk, jid=jid)
                .one_or_none()
            )
            if stored is None:
                stored = Room(
                    user_account_id=self.session.user_pk,
                    jid=jid,
                    # by_legacy_id() stores and queries the string form of
                    # the legacy ID; store the same form here so both
                    # entry points agree on the value.
                    legacy_id=str(legacy_id),
                )
        return await self.__update_if_needed(stored)

125 

def by_jid_only_if_exists(self, jid: JID) -> LegacyMUCType | None:
    """
    Get the MUC matching a JID without creating or updating anything.

    :param jid: JID of the room.
    :return: The MUC if a room with this JID is stored for the user and
        its ``updated`` attribute is truthy, ``None`` otherwise.
    """
    with self.xmpp.store.session(expire_on_commit=False) as orm:
        query = orm.query(Room).filter_by(user=self.session.user, jid=jid)
        room = query.one_or_none()
        if room is None or not room.updated:
            return None
        return self.from_store(room)

134 

@overload
async def by_legacy_id(self, legacy_id: LegacyGroupIdType) -> "LegacyMUCType": ...

@overload
async def by_legacy_id(
    self, legacy_id: LegacyGroupIdType, create: Literal[False]
) -> "LegacyMUCType | None": ...

@overload
async def by_legacy_id(
    self, legacy_id: LegacyGroupIdType, create: Literal[True]
) -> "LegacyMUCType": ...

async def by_legacy_id(
    self, legacy_id: LegacyGroupIdType, create: bool = False
) -> LegacyMUCType | None:
    """
    Get the MUC matching a legacy group ID.

    The lookup runs under the ``("legacy_id", legacy_id)`` named lock.
    ``create`` is only consulted when ``get_lock()`` reports a lock for
    the room's bare JID: the call is then handed over to :meth:`by_jid`
    if ``create`` is true, or to :meth:`by_jid_only_if_exists` (which may
    return ``None``) otherwise. In all other cases the room is looked up
    by legacy ID in the store, a new ``Room`` object is built when none is
    stored, and it then goes through the update step.

    :param legacy_id: Legacy identifier of the group.
    :param create: See above.
    :return: The MUC, or ``None`` as described above.
    """
    async with self.lock(("legacy_id", legacy_id)):
        local_part = await self.legacy_id_to_jid_local_part(legacy_id)
        jid = JID(f"{local_part}@{self.xmpp.boundjid}")
        if self.get_lock(("bare", jid.bare)):
            self.session.log.debug("Already updating %s via by_jid()", jid)
            if not create:
                return self.by_jid_only_if_exists(jid)
            return await self.by_jid(jid)

        with self.xmpp.store.session() as orm:
            query = orm.query(Room).filter_by(
                user_account_id=self.session.user_pk,
                legacy_id=str(legacy_id),
            )
            stored = query.one_or_none()
            if stored is None:
                stored = Room(
                    user_account_id=self.session.user_pk,
                    jid=jid,
                    legacy_id=str(legacy_id),
                )
        return await self.__update_if_needed(stored)

177 

async def __update_if_needed(self, stored: Room) -> LegacyMUCType:
    """
    Wrap a stored room into a MUC and update its info if it has not been
    updated yet.

    :param stored: The stored (or freshly built) room.
    :return: The MUC. It is returned as is when its stored room is already
        marked as updated.
    :raises XMPPError: Re-raised unchanged when ``update_info()`` raises
        one. Any other exception except ``NotImplementedError`` is
        converted to an ``internal-server-error``, with the original
        exception attached as its cause.
    """
    # NOTE(review): the source this was read from had lost its indentation;
    # the nesting below the ``with`` blocks is reconstructed -- confirm.
    muc = self.from_store(stored)
    if muc.stored.updated:
        return muc

    with muc.updating_info():
        try:
            await muc.update_info()
        except NotImplementedError:
            # update_info() reported it is not implemented: carry on with
            # what is stored.
            pass
        except XMPPError:
            raise
        except Exception as e:
            # Chain explicitly so the original exception is kept as the
            # cause of the XMPP error.
            raise XMPPError("internal-server-error", str(e)) from e
    muc.archive.room = muc.stored
    if self.ready.done() and muc.stored.space_id:
        with self.orm() as orm:
            orm.add(muc.stored)
            self.xmpp.pubsub.broadcast_space(self.session, muc.stored.space)
    return muc

198 

async def fill(self) -> None:
    """
    Establish a user's known groups.

    Plugins with group support have to override this. At the minimum, the
    override should ``await self.by_legacy_id(group_id)`` for every group
    the user is part of.

    Slidge internals will call this on successful :meth:`BaseSession.login`

    :raises NotImplementedError: If the gateway advertises group support
        and this default implementation is reached.
    """
    if not self.xmpp.GROUPS:
        # No group support: nothing to fill.
        return
    raise NotImplementedError(
        "The plugin advertised support for groups but"
        " LegacyBookmarks.fill() was not overridden."
    )

215 

async def remove(
    self,
    muc: AnyMUC,
    reason: str = "You left this group from the official client.",
    kick: bool = True,
) -> None:
    """
    Delete everything about a specific group.

    Call this when the user leaves the group from the official app.

    :param muc: The MUC to remove.
    :param reason: Optionally, a reason why this group was removed.
    :param kick: Whether the user should be kicked from this group. Set this
        to False in case you do this somewhere else in your code, eg, on
        receiving the confirmation that the group was deleted.
    """
    if kick:
        participant = await muc.get_user_participant()
        participant.kick(reason)

    room = muc.stored
    with self.xmpp.store.session() as orm:
        # Attach and refresh the stored room before deleting it.
        orm.add(room)
        orm.refresh(room)
        orm.delete(room)
        orm.commit()

242 

async def update_space_if_needed(self, space: Space) -> Space:
    """
    Fetch and store the metadata of a space unless it is already marked
    as updated.

    :param space: The stored space.
    :return: ``space`` itself when its ``updated`` attribute is truthy,
        otherwise the space returned by the metadata update.
    """
    if not space.updated:
        space_id = self.xmpp.LEGACY_ROOM_ID_TYPE(space.legacy_id)
        meta: AnySpaceMetadata = await self.fetch_space_metadata(space_id)
        space = await self.__update_space_metadata(space, meta)
    return space

250 

async def __update_space_metadata(
    self, space: Space, meta: SpaceMetadata[LegacyGroupIdType]
) -> Space:
    """
    Copy ``meta`` onto a stored space, mark it as updated and commit.

    The creator and owners are resolved to stored contacts first; contacts
    that cannot be resolved are left out.

    :param space: The stored space to update.
    :param meta: Name, description, member count, creator and owners to
        store. When ``meta.name`` is falsy, the legacy ID of the space is
        used as its name.
    :return: The space as merged into the session used for the commit.
    """
    # NOTE(review): fetch_space_metadata() is annotated as returning
    # SpaceMetadata[LegacyUserIdType] while ``meta`` is annotated with
    # LegacyGroupIdType here -- confirm which type parameter is intended.
    creator_id = meta.creator_legacy_id
    creator: Contact | None = None
    if creator_id:
        creator = await self.__get_stored_contact(str(creator_id))

    owners: list[Contact] = []
    for owner_id in set(meta.owner_legacy_ids):
        # The creator is appended separately below. We don't want to fetch
        # any contact twice here to avoid:
        # Can't attach instance <Contact at xxx>; another instance [...] is already present in this session.
        if owner_id != creator_id:
            owner = await self.__get_stored_contact(str(owner_id))
            if owner is not None:
                owners.append(owner)
    if creator is not None and creator_id and creator_id in meta.owner_legacy_ids:
        owners.append(creator)

    with self.orm(expire_on_commit=False) as orm:
        merged_creator = None if creator is None else orm.merge(creator)
        merged_owners = [orm.merge(owner) for owner in owners]
        space = orm.merge(space)
        space.name = meta.name or space.legacy_id
        space.creator = merged_creator
        space.owners = merged_owners
        space.description = meta.description
        space.member_count = meta.member_count
        space.updated = True
        orm.commit()
    return space

287 

async def __get_stored_contact(self, legacy_id_str: str) -> Contact | None:
    """
    Resolve a legacy contact ID, given as a string, to its stored contact.

    This is best effort: any exception raised while converting the ID or
    fetching the contact is logged as a warning and ``None`` is returned.

    :param legacy_id_str: String form of the legacy contact ID.
    :return: The ``stored`` attribute of the contact, or ``None``.
    """
    try:
        contact_id = self.xmpp.LEGACY_CONTACT_ID_TYPE(legacy_id_str)
        contact: AnyContact = await self.session.contacts.by_legacy_id(contact_id)
    except Exception as e:
        self.log.warning("Could not get contact: %r", e)
        return None
    else:
        return contact.stored

297 

@abc.abstractmethod
async def fetch_space_metadata(
    self, legacy_id: LegacyGroupIdType
) -> SpaceMetadata[LegacyUserIdType]:
    """
    Fetch metadata associated to a space.

    Slidge calls this from :meth:`update_space_if_needed` for spaces that
    are not marked as updated yet. It should return the metadata of the
    space identified by ``legacy_id``.
    If this metadata changes later on, the changes should be communicated
    to slidge by calling :meth:`LegacyBookmarks.update_space_metadata`.

    :param legacy_id: Identifier of the space.

    :return: Metadata associated to the space.
    """
    raise NotImplementedError

315 

async def update_spaces_info(self) -> None:
    """
    Update every space that ``store.spaces.get_unupdated()`` returns for
    this user.
    """
    # NOTE(review): the source this was read from had lost its indentation,
    # so whether the loop ran inside the ``with`` block is unknown. The
    # spaces are collected while the session is open so the loop works
    # either way -- confirm.
    with self.orm() as orm:
        pending = list(
            self.session.xmpp.store.spaces.get_unupdated(orm, self.session.user_pk)
        )
    for space in pending:
        await self.update_space_if_needed(space)

323 

async def space_legacy_id_to_node(self, legacy_id: LegacyGroupIdType) -> str:
    """
    Convert the legacy ID of a space to a node name.

    :param legacy_id: Identifier of the space.
    :return: ``str(legacy_id)``.
    """
    node = str(legacy_id)
    return node

326 

async def space_node_to_legacy_id(self, node: str) -> LegacyGroupIdType:
    """
    Convert a node name to the legacy ID of a space.

    :param node: The node name.
    :return: ``node`` converted with the gateway's ``LEGACY_ROOM_ID_TYPE``.
    """
    to_legacy_id = self.xmpp.LEGACY_ROOM_ID_TYPE
    return to_legacy_id(node)  # type:ignore[no-any-return]

329 

async def update_space_metadata(
    self,
    legacy_id: LegacyGroupIdType,
    metadata: SpaceMetadata[LegacyGroupIdType],
) -> None:
    """
    Update the metadata associated to a space and broadcast it.

    The space is fetched from the store with ``add_or_get()``, updated
    with ``metadata``, then broadcast through ``pubsub`` on the node given
    by :meth:`space_legacy_id_to_node`.

    :param legacy_id: Identifier of the space.
    :param metadata: Metadata associated to this space.
    """
    # NOTE(review): the source this was read from had lost its indentation;
    # the statements after add_or_get() are assumed to run once the session
    # is closed -- confirm.
    spaces_store = self.session.xmpp.store.spaces
    with self.orm(expire_on_commit=False) as orm:
        stored_space = spaces_store.add_or_get(
            orm,
            self.session.user_pk,
            str(legacy_id),
        )
    updated_space = await self.__update_space_metadata(stored_space, metadata)
    node = await self.space_legacy_id_to_node(legacy_id)
    self.xmpp.pubsub.broadcast_space_metadata(self.session, updated_space, node)