Coverage for slidge / group / bookmarks.py: 89%

112 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-02-15 09:02 +0000

1import abc 

2import logging 

3from collections.abc import Iterator 

4from typing import TYPE_CHECKING, Generic, Literal, overload 

5 

6from slixmpp import JID 

7from slixmpp.exceptions import XMPPError 

8 

9from ..db.models import Room 

10from ..util import SubclassableOnce 

11from ..util.jid_escaping import ESCAPE_TABLE, unescape_node 

12from ..util.lock import NamedLockMixin 

13from ..util.types import LegacyGroupIdType, LegacyMUCType 

14from .room import LegacyMUC 

15 

16if TYPE_CHECKING: 

17 from slidge.core.session import BaseSession 

18 

19 

class LegacyBookmarks(
    Generic[LegacyGroupIdType, LegacyMUCType],
    NamedLockMixin,
    metaclass=SubclassableOnce,
):
    """
    This is instantiated once per :class:`~slidge.BaseSession`

    It maps JIDs and legacy group IDs to MUC instances, backed by the
    ``Room`` rows of the store that belong to the session's user.
    """

    # Concrete MUC class used by from_store() to wrap a stored Room.
    _muc_cls: type[LegacyMUCType]

    def __init__(self, session: "BaseSession") -> None:
        """
        :param session: The session these bookmarks belong to.
        """
        self.session = session
        self.xmpp = session.xmpp
        self.user_jid = session.user_jid

        # The nickname defaults to the local part of the user's JID.
        self._user_nick: str = self.session.user_jid.node

        super().__init__()
        self.log = logging.getLogger(f"{self.user_jid.bare}:bookmarks")
        self.ready = self.session.xmpp.loop.create_future()
        if not self.xmpp.GROUPS:
            # No group support advertised: resolve the future immediately.
            self.ready.set_result(True)

    @property
    def user_nick(self) -> str:
        """
        The nickname of the user, initialised to the local part of their JID.
        """
        return self._user_nick

    @user_nick.setter
    def user_nick(self, nick: str) -> None:
        self._user_nick = nick

    def from_store(self, stored: Room) -> LegacyMUCType:
        """
        Wrap a stored room in an instance of the MUC class of this plugin.

        :param stored: The room row to wrap.
        :return: A ``_muc_cls`` instance built from the session and the row.
        """
        return self._muc_cls(self.session, stored)

    def __iter__(self) -> Iterator[LegacyMUC]:
        """
        Iterate over the stored rooms of the user flagged as ``updated``.
        """
        with self.xmpp.store.session() as orm:
            for stored in orm.query(Room).filter_by(user=self.session.user).all():
                if stored.updated:
                    yield self.from_store(stored)

    def __repr__(self) -> str:
        return f"<Bookmarks of {self.user_jid}>"

    async def legacy_id_to_jid_local_part(self, legacy_id: LegacyGroupIdType) -> str:
        """
        Alias of :meth:`legacy_id_to_jid_username`.

        :param legacy_id: The legacy ID of a group.
        :return: The local part of the JID of this group.
        """
        return await self.legacy_id_to_jid_username(legacy_id)

    async def jid_local_part_to_legacy_id(self, local_part: str) -> LegacyGroupIdType:
        """
        Alias of :meth:`jid_username_to_legacy_id`.

        :param local_part: The local part of the JID of a group.
        :return: The legacy ID of this group.
        """
        return await self.jid_username_to_legacy_id(local_part)

    async def legacy_id_to_jid_username(self, legacy_id: LegacyGroupIdType) -> str:
        """
        The default implementation calls ``str()`` on the legacy_id and
        escape characters according to :xep:`0106`.

        You can override this class and implement a more subtle logic to raise
        an :class:`~slixmpp.exceptions.XMPPError` early

        :param legacy_id: The legacy ID of a group.
        :return: The local part to use in the JID of this group.
        """
        return str(legacy_id).translate(ESCAPE_TABLE)

    async def jid_username_to_legacy_id(self, username: str):
        """
        The default implementation returns the result of ``unescape_node()``
        applied to the username.

        :param username: The local part of the JID of a group.
        :return: The legacy ID of this group.
        """
        return unescape_node(username)

    async def by_jid(self, jid: JID) -> LegacyMUCType:
        """
        Get the MUC matching this JID, updating its info if needed.

        The resource part of the JID, if any, is ignored. If no room is
        stored for this JID, a new ``Room`` is built (it is not added to the
        ORM session here) before the update is attempted.

        :param jid: The JID of the group.
        :return: The corresponding MUC.
        """
        if jid.resource:
            jid = JID(jid.bare)
        async with self.lock(("bare", jid.bare)):
            legacy_id = await self.jid_local_part_to_legacy_id(jid.node)
            if self.get_lock(("legacy_id", legacy_id)):
                self.session.log.debug("Already updating %s via by_legacy_id()", jid)
                return await self.by_legacy_id(legacy_id)

            with self.session.xmpp.store.session() as orm:
                stored = (
                    orm.query(Room)
                    .filter_by(user_account_id=self.session.user_pk, jid=jid)
                    .one_or_none()
                )
                if stored is None:
                    stored = Room(
                        user_account_id=self.session.user_pk,
                        jid=jid,
                        # Stringified like in by_legacy_id(), so that both
                        # entry points store the same value for a group.
                        legacy_id=str(legacy_id),
                    )
            return await self.__update_if_needed(stored)

    def by_jid_only_if_exists(self, jid: JID) -> LegacyMUCType | None:
        """
        Get the MUC matching this JID without creating or updating anything.

        :param jid: The JID of the group.
        :return: The corresponding MUC if a room is stored for this JID and
            flagged as ``updated``, None otherwise.
        """
        with self.xmpp.store.session() as orm:
            stored = (
                orm.query(Room).filter_by(user=self.session.user, jid=jid).one_or_none()
            )
            if stored is not None and stored.updated:
                return self.from_store(stored)
        return None

    @overload
    async def by_legacy_id(self, legacy_id: LegacyGroupIdType) -> "LegacyMUCType": ...

    @overload
    async def by_legacy_id(
        self, legacy_id: LegacyGroupIdType, create: Literal[False]
    ) -> "LegacyMUCType | None": ...

    @overload
    async def by_legacy_id(
        self, legacy_id: LegacyGroupIdType, create: Literal[True]
    ) -> "LegacyMUCType": ...

    async def by_legacy_id(
        self, legacy_id: LegacyGroupIdType, create: bool = False
    ) -> LegacyMUCType | None:
        """
        Get the MUC matching this legacy ID, updating its info if needed.

        If no room is stored for this legacy ID, a new ``Room`` is built (it
        is not added to the ORM session here) before the update is attempted.

        :param legacy_id: The legacy ID of the group.
        :param create: Only used when a lock for the JID of this group is
            registered (presumably because :meth:`by_jid` is running for
            it): if True, defer to :meth:`by_jid`, else to
            :meth:`by_jid_only_if_exists`.
        :return: The corresponding MUC, or None if the lookup was deferred to
            :meth:`by_jid_only_if_exists` and it did not find anything.
        """
        # NOTE(review): the overload without ``create`` advertises a non-None
        # return, but with the default ``create=False`` the branch below may
        # return None -- confirm which one is intended.
        async with self.lock(("legacy_id", legacy_id)):
            local = await self.legacy_id_to_jid_local_part(legacy_id)
            jid = JID(f"{local}@{self.xmpp.boundjid}")
            if self.get_lock(("bare", jid.bare)):
                self.session.log.debug("Already updating %s via by_jid()", jid)
                if create:
                    return await self.by_jid(jid)
                else:
                    return self.by_jid_only_if_exists(jid)

            with self.xmpp.store.session() as orm:
                stored = (
                    orm.query(Room)
                    .filter_by(
                        user_account_id=self.session.user_pk,
                        legacy_id=str(legacy_id),
                    )
                    .one_or_none()
                )
                if stored is None:
                    stored = Room(
                        user_account_id=self.session.user_pk,
                        jid=jid,
                        legacy_id=str(legacy_id),
                    )
            return await self.__update_if_needed(stored)

    async def __update_if_needed(self, stored: Room) -> LegacyMUCType:
        """
        Wrap a stored room in a MUC and update its info unless already done.

        :param stored: The room row to wrap.
        :return: The MUC, after ``update_info()`` was awaited if the room was
            not flagged as ``updated``.
        :raises XMPPError: If ``update_info()`` raises one, or with the
            "internal-server-error" condition if it raises anything else than
            :class:`NotImplementedError`.
        """
        muc = self.from_store(stored)
        if muc.stored.updated:
            return muc

        with muc.updating_info(merge=muc._ALL_INFO_FILLED_ON_STARTUP):
            try:
                await muc.update_info()
            except NotImplementedError:
                # Deliberately ignored: presumably update_info() is optional
                # for plugins.
                pass
            except XMPPError:
                raise
            except Exception as e:
                # Chain explicitly so that the original traceback is kept as
                # the cause of the XMPP error.
                raise XMPPError("internal-server-error", str(e)) from e

        return muc

    @abc.abstractmethod
    async def fill(self):
        """
        Establish a user's known groups.

        This has to be overridden in plugins with group support and at the
        minimum, this should ``await self.by_legacy_id(group_id)`` for all
        the groups a user is part of.

        Slidge internals will call this on successful :meth:`BaseSession.login`

        :raises NotImplementedError: If the plugin advertises group support
            but this default implementation is called.
        """
        if self.xmpp.GROUPS:
            raise NotImplementedError(
                "The plugin advertised support for groups but"
                " LegacyBookmarks.fill() was not overridden."
            )

    async def remove(
        self,
        muc: LegacyMUC,
        reason: str = "You left this group from the official client.",
        kick: bool = True,
    ) -> None:
        """
        Delete everything about a specific group.

        This should be called when the user leaves the group from the official
        app.

        :param muc: The MUC to remove.
        :param reason: Optionally, a reason why this group was removed.
        :param kick: Whether the user should be kicked from this group. Set this
            to False in case you do this somewhere else in your code, eg, on
            receiving the confirmation that the group was deleted.
        """
        if kick:
            user_participant = await muc.get_user_participant()
            user_participant.kick(reason)
        with self.xmpp.store.session() as orm:
            # Attach the row to this ORM session and reload it before
            # deleting it.
            orm.add(muc.stored)
            orm.refresh(muc.stored)
            orm.delete(muc.stored)
            orm.commit()