Coverage for slidge / group / bookmarks.py: 91%

171 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-13 04:38 +0000

1import abc 

2import logging 

3from collections.abc import Iterator 

4from typing import Any, Generic, Literal, overload 

5 

6from slixmpp import JID 

7from slixmpp.exceptions import XMPPError 

8from sqlalchemy.orm import Session as OrmSession 

9 

10from slidge.contact import LegacyContact 

11 

12from ..db.models import Contact, Room, Space 

13from ..util import SubclassableOnce 

14from ..util.jid_escaping import EscapeMixin 

15from ..util.lock import NamedLockMixin 

16from ..util.types import ( 

17 AnyMUC, 

18 AnySession, 

19 LegacyMUCType, 

20 SpaceMetadata, 

21) 

22 

23 

class LegacyBookmarks(
    Generic[LegacyMUCType],
    EscapeMixin,
    NamedLockMixin,
    SubclassableOnce,
):
    """
    This is instantiated once per :class:`~slidge.BaseSession`
    """

    _muc_cls: type[LegacyMUCType]

    def __init__(self, session: AnySession) -> None:
        """
        Bind the bookmarks to a session.

        :param session: The session these bookmarks belong to. Its ``xmpp``
            and ``user_jid`` attributes are copied onto this instance.
        """
        self.session = session
        self.xmpp = session.xmpp
        self.user_jid = session.user_jid

        # Default nickname is the local part of the user's JID; it can be
        # changed later through the ``user_nick`` property.
        self._user_nick: str = self.session.user_jid.node

        super().__init__()
        self.log = logging.getLogger(f"{self.user_jid.bare}:bookmarks")
        # Resolved right away when ``xmpp.GROUPS`` is falsy. Nothing else in
        # this class resolves it -- presumably done elsewhere once groups are
        # loaded (verify against callers).
        self.ready = self.session.xmpp.loop.create_future()
        if not self.xmpp.GROUPS:
            self.ready.set_result(True)

    @property
    def user_nick(self) -> str:
        """
        The nickname stored for the user on this bookmarks instance.
        """
        return self._user_nick

    @user_nick.setter
    def user_nick(self, nick: str) -> None:
        self._user_nick = nick

    def orm(
        self,
        **kwargs: Any,  # noqa:ANN401
    ) -> OrmSession:
        """
        Open an ORM session on the gateway store.

        :param kwargs: Forwarded unchanged to ``xmpp.store.session()``.

        :return: The object returned by ``xmpp.store.session()``, used as a
            context manager throughout this class.
        """
        return self.session.xmpp.store.session(**kwargs)

    def from_store(self, stored: Room) -> LegacyMUCType:
        """
        Build a MUC instance from a stored room.

        :param stored: The database row describing the room.

        :return: ``self._muc_cls(self.session, stored)``.
        """
        return self._muc_cls(self.session, stored)

    def __iter__(self) -> Iterator[LegacyMUCType]:
        """
        Iterate over the user's stored rooms whose ``updated`` flag is truthy.

        The ORM session stays open for as long as the generator is being
        consumed, since the ``yield`` happens inside the ``with`` block.
        """
        with self.xmpp.store.session() as orm:
            for stored in orm.query(Room).filter_by(user=self.session.user).all():
                if stored.updated:
                    yield self.from_store(stored)

    def __repr__(self) -> str:
        return f"<Bookmarks of {self.user_jid}>"

    async def legacy_id_to_jid_local_part(self, legacy_id: str) -> str:
        """
        Convert a legacy group ID to the local part of a JID.

        Delegates to ``self.legacy_id_to_jid_username()``, which is not
        defined in this class (presumably provided by ``EscapeMixin``).

        :param legacy_id: The legacy group identifier.

        :return: The JID local part for this group.
        """
        return await self.legacy_id_to_jid_username(legacy_id)

    async def jid_local_part_to_legacy_id(self, local_part: str) -> str:
        """
        Convert the local part of a JID to a legacy group ID.

        Delegates to ``self.jid_username_to_legacy_id()``, which is not
        defined in this class (presumably provided by ``EscapeMixin``).

        :param local_part: The local part of the group JID.

        :return: The legacy group identifier.
        """
        return await self.jid_username_to_legacy_id(local_part)

    async def by_jid(self, jid: JID) -> LegacyMUCType:
        """
        Get the MUC for a JID, creating and updating it if needed.

        :param jid: JID of the group. Any resource part is dropped.

        :return: The MUC, after its info has been updated if it was not
            already marked as updated.
        """
        if jid.resource:
            jid = JID(jid.bare)
        async with self.lock(("bare", jid.bare)):
            legacy_id = await self.jid_local_part_to_legacy_id(jid.node)
            # If the same group is already being handled through its legacy
            # ID, go through by_legacy_id() instead of querying the store
            # a second time here.
            if self.get_lock(("legacy_id", legacy_id)):
                self.session.log.debug("Already updating %s via by_legacy_id()", jid)
                return await self.by_legacy_id(legacy_id)

            with self.session.xmpp.store.session() as orm:
                stored = (
                    orm.query(Room)
                    .filter_by(user_account_id=self.session.user_pk, jid=jid)
                    .one_or_none()
                )
                if stored is None:
                    # Not known yet: build a fresh, unsaved row.
                    stored = Room(
                        user_account_id=self.session.user_pk,
                        jid=jid,
                        legacy_id=legacy_id,
                    )
            return await self.__update_if_needed(stored)

    def by_jid_only_if_exists(self, jid: JID) -> LegacyMUCType | None:
        """
        Get the MUC for a JID without ever creating or updating it.

        :param jid: JID of the group.

        :return: The MUC if a stored room with this JID exists for the user
            and its ``updated`` flag is truthy, ``None`` otherwise.
        """
        with self.xmpp.store.session(expire_on_commit=False) as orm:
            stored = (
                orm.query(Room).filter_by(user=self.session.user, jid=jid).one_or_none()
            )
            if stored is not None and stored.updated:
                return self.from_store(stored)
        return None

    @overload
    async def by_legacy_id(self, legacy_id: str) -> "LegacyMUCType": ...

    @overload
    async def by_legacy_id(
        self, legacy_id: str, create: Literal[False]
    ) -> "LegacyMUCType | None": ...

    @overload
    async def by_legacy_id(
        self, legacy_id: str, create: Literal[True]
    ) -> "LegacyMUCType": ...

    async def by_legacy_id(
        self, legacy_id: str, create: bool = False
    ) -> LegacyMUCType | None:
        """
        Get the MUC for a legacy group ID, creating and updating it if needed.

        :param legacy_id: The legacy group identifier.
        :param create: Only consulted when the same group is already being
            handled via :meth:`by_jid`: if true, wait for :meth:`by_jid`;
            otherwise return whatever :meth:`by_jid_only_if_exists` finds,
            which may be ``None``.

        :return: The MUC, or ``None`` in the case described for ``create``.
        """
        async with self.lock(("legacy_id", legacy_id)):
            local = await self.legacy_id_to_jid_local_part(legacy_id)
            jid = JID(f"{local}@{self.xmpp.boundjid}")
            if self.get_lock(("bare", jid.bare)):
                self.session.log.debug("Already updating %s via by_jid()", jid)
                if create:
                    return await self.by_jid(jid)
                else:
                    return self.by_jid_only_if_exists(jid)

            with self.xmpp.store.session() as orm:
                stored = (
                    orm.query(Room)
                    .filter_by(
                        user_account_id=self.session.user_pk,
                        legacy_id=str(legacy_id),
                    )
                    .one_or_none()
                )
                if stored is None:
                    # Not known yet: build a fresh, unsaved row.
                    stored = Room(
                        user_account_id=self.session.user_pk,
                        jid=jid,
                        legacy_id=str(legacy_id),
                    )
            return await self.__update_if_needed(stored)

    async def __update_if_needed(self, stored: Room) -> LegacyMUCType:
        """
        Wrap a stored room in a MUC and update its info unless already done.

        :param stored: The room row, possibly not saved yet.

        :return: The MUC built from ``stored``.

        :raises XMPPError: Re-raised as-is when ``update_info()`` raises one;
            any other exception except ``NotImplementedError`` is wrapped in
            an ``internal-server-error`` with the original as its cause.
        """
        muc = self.from_store(stored)
        if muc.stored.updated:
            return muc

        with muc.updating_info():
            try:
                await muc.update_info()
            except NotImplementedError:
                # A MUC class without update_info() is tolerated.
                pass
            except XMPPError:
                raise
            except Exception as e:
                # Chain explicitly so the original traceback is kept as the
                # cause of the stanza error.
                raise XMPPError("internal-server-error", str(e)) from e
        muc.archive.room = muc.stored
        if self.ready.done() and muc.stored.space_id:
            with self.orm() as orm:
                # Attach the row to a session before reading ``space``.
                orm.add(muc.stored)
                self.xmpp.pubsub.broadcast_space(self.session, muc.stored.space)
        return muc

    async def fill(self) -> None:
        """
        Establish a user's known groups.

        This has to be overridden in plugins with group support and at the
        minimum, this should ``await self.by_legacy_id(group_id)`` for all
        the groups a user is part of.

        Slidge internals will call this on successful :meth:`BaseSession.login`

        """
        if self.xmpp.GROUPS:
            raise NotImplementedError(
                "The plugin advertised support for groups but"
                " LegacyBookmarks.fill() was not overridden."
            )

    async def remove(
        self,
        muc: AnyMUC,
        reason: str = "You left this group from the official client.",
        kick: bool = True,
    ) -> None:
        """
        Delete everything about a specific group.

        This should be called when the user leaves the group from the official
        app.

        :param muc: The MUC to remove.
        :param reason: Optionally, a reason why this group was removed.
        :param kick: Whether the user should be kicked from this group. Set this
            to False in case you do this somewhere else in your code, eg, on
            receiving the confirmation that the group was deleted.
        """
        if kick:
            user_participant = await muc.get_user_participant()
            user_participant.kick(reason)
        with self.xmpp.store.session() as orm:
            # Attach and refresh the row before deleting it.
            orm.add(muc.stored)
            orm.refresh(muc.stored)
            orm.delete(muc.stored)
            orm.commit()

    async def update_space_if_needed(self, space: Space) -> Space:
        """
        Fetch and store a space's metadata unless it is already up to date.

        :param space: The stored space to check.

        :return: ``space`` itself if its ``updated`` flag is truthy after a
            refresh, otherwise the space returned after storing the metadata
            obtained from :meth:`fetch_space_metadata`.
        """
        async with self.lock(("space", space.legacy_id)):
            with self.orm() as orm:
                orm.add(space)
                orm.refresh(space)
                if space.updated:
                    return space
            meta: SpaceMetadata = await self.fetch_space_metadata(space.legacy_id)
            return await self.__update_space_metadata(space, meta)

    async def __update_space_metadata(self, space: Space, meta: SpaceMetadata) -> Space:
        """
        Write metadata onto a space, mark it as updated and commit.

        :param space: The space to update.
        :param meta: The metadata to store.

        :return: The merged space instance that was committed.
        """
        creator = (
            await self.__get_stored_contact(meta.creator_legacy_id)
            if meta.creator_legacy_id
            else None
        )
        owners: list[Contact] = []
        for legacy_id in set(meta.owner_legacy_ids):
            if legacy_id == meta.creator_legacy_id:
                # We don't want to fetch any contact twice here to avoid:
                # Can't attach instance <Contact at xxx>; another instance [...] is already present in this session.
                continue
            owner = await self.__get_stored_contact(legacy_id)
            if owner is not None:
                owners.append(owner)
        if (
            creator is not None
            and meta.creator_legacy_id
            and meta.creator_legacy_id in meta.owner_legacy_ids
        ):
            # The creator was skipped in the loop above; reuse the instance
            # that was already fetched.
            owners.append(creator)
        with self.orm(expire_on_commit=False) as orm:
            if creator is not None:
                creator = orm.merge(creator)
            owners = [orm.merge(owner) for owner in owners]
            space = orm.merge(space)
            space.name = meta.name or space.legacy_id
            space.creator = creator
            space.owners = owners
            space.description = meta.description
            space.member_count = meta.member_count
            space.updated = True
            orm.commit()
        return space

    async def __get_stored_contact(self, legacy_id_str: str) -> Contact | None:
        """
        Get the stored row of a contact, tolerating lookup failures.

        :param legacy_id_str: Legacy identifier of the contact.

        :return: ``contact.stored``, or ``None`` if fetching the contact
            raised any exception (a warning is logged in that case).
        """
        try:
            contact: LegacyContact = await self.session.contacts.by_legacy_id(
                legacy_id_str
            )
        except Exception as e:
            # Deliberately best-effort: a missing creator or owner must not
            # prevent the rest of the metadata from being stored.
            self.log.warning("Could not get contact: %r", e)
            return None
        return contact.stored

    @abc.abstractmethod
    async def fetch_space_metadata(self, legacy_id: str) -> SpaceMetadata:
        """
        Fetch metadata associated to a space.

        This is called once per slidge runtime. It should return metadata
        associated to the space identified by its ``legacy_id``.
        If there are updates to this metadata, they should be communicated to
        slidge by calling :func:`LegacyBookmarks.update_space_metadata`.

        :param legacy_id: Identifier of the space.

        :return: Metadata associated to the space.
        """
        raise NotImplementedError

    async def update_spaces_info(self) -> None:
        """
        Update every space that the store reports as not updated yet.

        Calls :meth:`update_space_if_needed` on each space returned by
        ``store.spaces.get_unupdated()`` for this user.
        """
        with self.orm() as orm:
            spaces = self.session.xmpp.store.spaces.get_unupdated(
                orm, self.session.user_pk
            )
        for space in spaces:
            await self.update_space_if_needed(space)

    async def space_legacy_id_to_node(self, legacy_id: str) -> str:
        """
        Convert a space legacy ID to a node name.

        :param legacy_id: Identifier of the space.

        :return: ``str(legacy_id)``.
        """
        return str(legacy_id)

    async def space_node_to_legacy_id(self, node: str) -> str:
        """
        Convert a node name to a space legacy ID.

        :param node: The node name.

        :return: ``node``, unchanged.
        """
        return node

    async def update_space_metadata(
        self,
        legacy_id: str,
        metadata: SpaceMetadata,
    ) -> None:
        """
        Updates metadata associated to a space.

        :param legacy_id: Identifier of the space.
        :param metadata: Metadata associated to this space.
        """
        with self.orm(expire_on_commit=False) as orm:
            space = self.session.xmpp.store.spaces.add_or_get(
                orm,
                self.session.user_pk,
                str(legacy_id),
            )
        space = await self.__update_space_metadata(space, metadata)
        self.xmpp.pubsub.broadcast_space_metadata(
            self.session, space, await self.space_legacy_id_to_node(legacy_id)
        )

325 self.session, space, await self.space_legacy_id_to_node(legacy_id) 

326 )