Coverage for slidge / group / bookmarks.py: 89%

109 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 05:07 +0000

1import logging 

2from collections.abc import Iterator 

3from typing import TYPE_CHECKING, Generic, Literal, overload 

4 

5from slixmpp import JID 

6from slixmpp.exceptions import XMPPError 

7 

8from ..db.models import Room 

9from ..util import SubclassableOnce 

10from ..util.jid_escaping import ESCAPE_TABLE, unescape_node 

11from ..util.lock import NamedLockMixin 

12from ..util.types import AnyMUC, AnySession, LegacyGroupIdType, LegacyMUCType 

13 

14if TYPE_CHECKING: 

15 pass 

16 

17 

class LegacyBookmarks(
    Generic[LegacyGroupIdType, LegacyMUCType],
    NamedLockMixin,
    SubclassableOnce,
):
    """
    This is instantiated once per :class:`~slidge.BaseSession`

    It converts between legacy group IDs and MUC JIDs, looks up (or creates)
    the matching :class:`Room` rows in the store, and wraps them in MUC
    instances.
    """

    # Concrete MUC class instantiated by from_store(); it is only declared
    # here, not assigned.
    _muc_cls: type[LegacyMUCType]

    def __init__(self, session: AnySession) -> None:
        """
        :param session: The session these bookmarks are attached to.
        """
        self.session = session
        self.xmpp = session.xmpp
        self.user_jid = session.user_jid

        # The user's nickname starts out as the local part of their JID.
        self._user_nick: str = self.session.user_jid.node

        super().__init__()
        self.log = logging.getLogger(f"{self.user_jid.bare}:bookmarks")
        self.ready = self.session.xmpp.loop.create_future()
        if not self.xmpp.GROUPS:
            # No group support advertised: mark the bookmarks as ready
            # immediately.
            self.ready.set_result(True)

    @property
    def user_nick(self) -> str:
        """The nickname currently recorded for the user."""
        return self._user_nick

    @user_nick.setter
    def user_nick(self, nick: str) -> None:
        self._user_nick = nick

    def from_store(self, stored: Room) -> LegacyMUCType:
        """
        Build a MUC instance from a stored room.

        :param stored: The room row to wrap.
        :return: An instance of ``_muc_cls`` bound to this session.
        """
        return self._muc_cls(self.session, stored)

    def __iter__(self) -> Iterator[LegacyMUCType]:
        """
        Iterate over the user's stored rooms whose ``updated`` flag is set.
        """
        with self.xmpp.store.session() as orm:
            for stored in orm.query(Room).filter_by(user=self.session.user).all():
                if stored.updated:
                    yield self.from_store(stored)

    def __repr__(self) -> str:
        return f"<Bookmarks of {self.user_jid}>"

    async def legacy_id_to_jid_local_part(self, legacy_id: LegacyGroupIdType) -> str:
        """
        Alias of :meth:`legacy_id_to_jid_username`.

        :param legacy_id: The legacy group ID to convert.
        :return: The local part of the JID for this group.
        """
        return await self.legacy_id_to_jid_username(legacy_id)

    async def jid_local_part_to_legacy_id(self, local_part: str) -> LegacyGroupIdType:
        """
        Alias of :meth:`jid_username_to_legacy_id`.

        :param local_part: The local part of a group JID.
        :return: The corresponding legacy group ID.
        """
        return await self.jid_username_to_legacy_id(local_part)

    async def legacy_id_to_jid_username(self, legacy_id: LegacyGroupIdType) -> str:
        """
        The default implementation calls ``str()`` on the legacy_id and
        escapes characters according to :xep:`0106`.

        You can override this class and implement a more subtle logic to raise
        an :class:`~slixmpp.exceptions.XMPPError` early

        :param legacy_id: The legacy group ID to convert.
        :return: The escaped local part of the JID.
        """
        return str(legacy_id).translate(ESCAPE_TABLE)

    async def jid_username_to_legacy_id(self, username: str) -> LegacyGroupIdType:
        """
        The default implementation unescapes the local part of the JID.

        :param username: The local part of a group JID.
        :return: The legacy group ID.
        """
        return unescape_node(username)  # type:ignore[no-any-return]

    async def by_jid(self, jid: JID) -> LegacyMUCType:
        """
        Get the MUC for a JID, creating and updating it if necessary.

        :param jid: JID of the group; any resource part is ignored.
        :return: The MUC, with its info updated if it was not already.
        :raises XMPPError: If updating the MUC info fails.
        """
        if jid.resource:
            jid = JID(jid.bare)
        async with self.lock(("bare", jid.bare)):
            legacy_id = await self.jid_local_part_to_legacy_id(jid.node)
            if self.get_lock(("legacy_id", legacy_id)):
                # The same group is already being handled through its legacy
                # ID: hand over to that code path.
                self.session.log.debug("Already updating %s via by_legacy_id()", jid)
                return await self.by_legacy_id(legacy_id)

            with self.session.xmpp.store.session() as orm:
                stored = (
                    orm.query(Room)
                    .filter_by(user_account_id=self.session.user_pk, jid=jid)
                    .one_or_none()
                )
                if stored is None:
                    stored = Room(
                        user_account_id=self.session.user_pk,
                        jid=jid,
                        # Stored as a string, like by_legacy_id() does, so
                        # that its string-based lookup finds this row.
                        legacy_id=str(legacy_id),
                    )
            return await self.__update_if_needed(stored)

    def by_jid_only_if_exists(self, jid: JID) -> LegacyMUCType | None:
        """
        Get the MUC for a JID without creating or updating anything.

        :param jid: JID of the group.
        :return: The MUC if a room with this JID is stored and its
            ``updated`` flag is set, ``None`` otherwise.
        """
        with self.xmpp.store.session() as orm:
            stored = (
                orm.query(Room).filter_by(user=self.session.user, jid=jid).one_or_none()
            )
            if stored is not None and stored.updated:
                return self.from_store(stored)
        return None

    @overload
    async def by_legacy_id(self, legacy_id: LegacyGroupIdType) -> "LegacyMUCType": ...

    @overload
    async def by_legacy_id(
        self, legacy_id: LegacyGroupIdType, create: Literal[False]
    ) -> "LegacyMUCType | None": ...

    @overload
    async def by_legacy_id(
        self, legacy_id: LegacyGroupIdType, create: Literal[True]
    ) -> "LegacyMUCType": ...

    async def by_legacy_id(
        self, legacy_id: LegacyGroupIdType, create: bool = False
    ) -> LegacyMUCType | None:
        """
        Get the MUC for a legacy group ID, creating and updating it if
        necessary.

        :param legacy_id: The legacy group ID.
        :param create: Only consulted when the same group is already being
            handled by :meth:`by_jid`. If True, delegate to :meth:`by_jid`;
            otherwise return what :meth:`by_jid_only_if_exists` returns,
            which may be ``None``. In all other cases, a missing room is
            always created.
        :return: The MUC, or possibly ``None`` in the situation described
            above.
        :raises XMPPError: If updating the MUC info fails.
        """
        async with self.lock(("legacy_id", legacy_id)):
            local = await self.legacy_id_to_jid_local_part(legacy_id)
            jid = JID(f"{local}@{self.xmpp.boundjid}")
            if self.get_lock(("bare", jid.bare)):
                self.session.log.debug("Already updating %s via by_jid()", jid)
                if create:
                    return await self.by_jid(jid)
                else:
                    return self.by_jid_only_if_exists(jid)

            with self.xmpp.store.session() as orm:
                stored = (
                    orm.query(Room)
                    .filter_by(
                        user_account_id=self.session.user_pk,
                        legacy_id=str(legacy_id),
                    )
                    .one_or_none()
                )
                if stored is None:
                    stored = Room(
                        user_account_id=self.session.user_pk,
                        jid=jid,
                        legacy_id=str(legacy_id),
                    )
            return await self.__update_if_needed(stored)

    async def __update_if_needed(self, stored: Room) -> LegacyMUCType:
        """
        Wrap a stored room in a MUC and fetch its info unless already done.

        :param stored: The room row, either loaded from the store or newly
            built.
        :return: The MUC instance.
        :raises XMPPError: Re-raised as is if ``update_info()`` raises one;
            any other exception except ``NotImplementedError`` is converted
            to an ``internal-server-error``.
        """
        muc = self.from_store(stored)
        if muc.stored.updated:
            return muc

        with muc.updating_info(merge=muc._ALL_INFO_FILLED_ON_STARTUP):
            try:
                await muc.update_info()
            except NotImplementedError:
                # update_info() is not implemented for this MUC class: keep
                # the MUC as it is.
                pass
            except XMPPError:
                raise
            except Exception as e:
                # Chain explicitly so the original traceback is kept as the
                # cause of the XMPP error.
                raise XMPPError("internal-server-error", str(e)) from e

        return muc

    async def fill(self) -> None:
        """
        Establish a user's known groups.

        This has to be overridden in plugins with group support and at the
        minimum, this should ``await self.by_legacy_id(group_id)`` for all
        the groups a user is part of.

        Slidge internals will call this on successful :meth:`BaseSession.login`

        :raises NotImplementedError: If the plugin advertises group support
            but did not override this method.
        """
        if self.xmpp.GROUPS:
            raise NotImplementedError(
                "The plugin advertised support for groups but"
                " LegacyBookmarks.fill() was not overridden."
            )

    async def remove(
        self,
        muc: AnyMUC,
        reason: str = "You left this group from the official client.",
        kick: bool = True,
    ) -> None:
        """
        Delete everything about a specific group.

        This should be called when the user leaves the group from the official
        app.

        :param muc: The MUC to remove.
        :param reason: Optionally, a reason why this group was removed.
        :param kick: Whether the user should be kicked from this group. Set this
            to False in case you do this somewhere else in your code, eg, on
            receiving the confirmation that the group was deleted.
        """
        if kick:
            user_participant = await muc.get_user_participant()
            user_participant.kick(reason)
        with self.xmpp.store.session() as orm:
            # Attach the row to this ORM session and reload it before
            # deleting it.
            orm.add(muc.stored)
            orm.refresh(muc.stored)
            orm.delete(muc.stored)
            orm.commit()