Coverage for slidge / group / bookmarks.py: 89%

110 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-03-13 22:59 +0000

1import logging 

2from collections.abc import Iterator 

3from typing import TYPE_CHECKING, Generic, Literal, overload 

4 

5from slixmpp import JID 

6from slixmpp.exceptions import XMPPError 

7 

8from ..db.models import Room 

9from ..util import SubclassableOnce 

10from ..util.jid_escaping import ESCAPE_TABLE, unescape_node 

11from ..util.lock import NamedLockMixin 

12from ..util.types import LegacyGroupIdType, LegacyMUCType 

13from .room import LegacyMUC 

14 

15if TYPE_CHECKING: 

16 from slidge.core.session import BaseSession 

17 

18 

19class LegacyBookmarks( 

20 Generic[LegacyGroupIdType, LegacyMUCType], 

21 NamedLockMixin, 

22 SubclassableOnce, 

23): 

24 """ 

25 This is instantiated once per :class:`~slidge.BaseSession` 

26 """ 

27 

28 _muc_cls: type[LegacyMUCType] 

29 

30 def __init__(self, session: "BaseSession") -> None: 

31 self.session = session 

32 self.xmpp = session.xmpp 

33 self.user_jid = session.user_jid 

34 

35 self._user_nick: str = self.session.user_jid.node 

36 

37 super().__init__() 

38 self.log = logging.getLogger(f"{self.user_jid.bare}:bookmarks") 

39 self.ready = self.session.xmpp.loop.create_future() 

40 if not self.xmpp.GROUPS: 

41 self.ready.set_result(True) 

42 

43 @property 

44 def user_nick(self) -> str: 

45 return self._user_nick 

46 

47 @user_nick.setter 

48 def user_nick(self, nick: str) -> None: 

49 self._user_nick = nick 

50 

51 def from_store(self, stored: Room) -> LegacyMUCType: 

52 return self._muc_cls(self.session, stored) 

53 

54 def __iter__(self) -> Iterator[LegacyMUC]: 

55 with self.xmpp.store.session() as orm: 

56 for stored in orm.query(Room).filter_by(user=self.session.user).all(): 

57 if stored.updated: 

58 yield self.from_store(stored) 

59 

60 def __repr__(self) -> str: 

61 return f"<Bookmarks of {self.user_jid}>" 

62 

63 async def legacy_id_to_jid_local_part(self, legacy_id: LegacyGroupIdType) -> str: 

64 return await self.legacy_id_to_jid_username(legacy_id) 

65 

66 async def jid_local_part_to_legacy_id(self, local_part: str) -> LegacyGroupIdType: 

67 return await self.jid_username_to_legacy_id(local_part) 

68 

69 async def legacy_id_to_jid_username(self, legacy_id: LegacyGroupIdType) -> str: 

70 """ 

71 The default implementation calls ``str()`` on the legacy_id and 

72 escape characters according to :xep:`0106`. 

73 

74 You can override this class and implement a more subtle logic to raise 

75 an :class:`~slixmpp.exceptions.XMPPError` early 

76 

77 :param legacy_id: 

78 :return: 

79 """ 

80 return str(legacy_id).translate(ESCAPE_TABLE) 

81 

82 async def jid_username_to_legacy_id(self, username: str): 

83 """ 

84 

85 :param username: 

86 :return: 

87 """ 

88 return unescape_node(username) 

89 

90 async def by_jid(self, jid: JID) -> LegacyMUCType: 

91 if jid.resource: 

92 jid = JID(jid.bare) 

93 async with self.lock(("bare", jid.bare)): 

94 legacy_id = await self.jid_local_part_to_legacy_id(jid.node) 

95 if self.get_lock(("legacy_id", legacy_id)): 

96 self.session.log.debug("Already updating %s via by_legacy_id()", jid) 

97 return await self.by_legacy_id(legacy_id) 

98 

99 with self.session.xmpp.store.session() as orm: 

100 stored = ( 

101 orm.query(Room) 

102 .filter_by(user_account_id=self.session.user_pk, jid=jid) 

103 .one_or_none() 

104 ) 

105 if stored is None: 

106 stored = Room( 

107 user_account_id=self.session.user_pk, 

108 jid=jid, 

109 legacy_id=legacy_id, 

110 ) 

111 return await self.__update_if_needed(stored) 

112 

113 def by_jid_only_if_exists(self, jid: JID) -> LegacyMUCType | None: 

114 with self.xmpp.store.session() as orm: 

115 stored = ( 

116 orm.query(Room).filter_by(user=self.session.user, jid=jid).one_or_none() 

117 ) 

118 if stored is not None and stored.updated: 

119 return self.from_store(stored) 

120 return None 

121 

122 @overload 

123 async def by_legacy_id(self, legacy_id: LegacyGroupIdType) -> "LegacyMUCType": ... 

124 

125 @overload 

126 async def by_legacy_id( 

127 self, legacy_id: LegacyGroupIdType, create: Literal[False] 

128 ) -> "LegacyMUCType | None": ... 

129 

130 @overload 

131 async def by_legacy_id( 

132 self, legacy_id: LegacyGroupIdType, create: Literal[True] 

133 ) -> "LegacyMUCType": ... 

134 

135 async def by_legacy_id( 

136 self, legacy_id: LegacyGroupIdType, create: bool = False 

137 ) -> LegacyMUCType | None: 

138 async with self.lock(("legacy_id", legacy_id)): 

139 local = await self.legacy_id_to_jid_local_part(legacy_id) 

140 jid = JID(f"{local}@{self.xmpp.boundjid}") 

141 if self.get_lock(("bare", jid.bare)): 

142 self.session.log.debug("Already updating %s via by_jid()", jid) 

143 if create: 

144 return await self.by_jid(jid) 

145 else: 

146 return self.by_jid_only_if_exists(jid) 

147 

148 with self.xmpp.store.session() as orm: 

149 stored = ( 

150 orm.query(Room) 

151 .filter_by( 

152 user_account_id=self.session.user_pk, 

153 legacy_id=str(legacy_id), 

154 ) 

155 .one_or_none() 

156 ) 

157 if stored is None: 

158 stored = Room( 

159 user_account_id=self.session.user_pk, 

160 jid=jid, 

161 legacy_id=str(legacy_id), 

162 ) 

163 return await self.__update_if_needed(stored) 

164 

165 async def __update_if_needed(self, stored: Room) -> LegacyMUCType: 

166 muc = self.from_store(stored) 

167 if muc.stored.updated: 

168 return muc 

169 

170 with muc.updating_info(merge=False or muc._ALL_INFO_FILLED_ON_STARTUP): 

171 try: 

172 await muc.update_info() 

173 except NotImplementedError: 

174 pass 

175 except XMPPError: 

176 raise 

177 except Exception as e: 

178 raise XMPPError("internal-server-error", str(e)) 

179 

180 return muc 

181 

182 async def fill(self) -> None: 

183 """ 

184 Establish a user's known groups. 

185 

186 This has to be overridden in plugins with group support and at the 

187 minimum, this should ``await self.by_legacy_id(group_id)`` for all 

188 the groups a user is part of. 

189 

190 Slidge internals will call this on successful :meth:`BaseSession.login` 

191 

192 """ 

193 if self.xmpp.GROUPS: 

194 raise NotImplementedError( 

195 "The plugin advertised support for groups but" 

196 " LegacyBookmarks.fill() was not overridden." 

197 ) 

198 

199 async def remove( 

200 self, 

201 muc: LegacyMUC, 

202 reason: str = "You left this group from the official client.", 

203 kick: bool = True, 

204 ) -> None: 

205 """ 

206 Delete everything about a specific group. 

207 

208 This should be called when the user leaves the group from the official 

209 app. 

210 

211 :param muc: The MUC to remove. 

212 :param reason: Optionally, a reason why this group was removed. 

213 :param kick: Whether the user should be kicked from this group. Set this 

214 to False in case you do this somewhere else in your code, eg, on 

215 receiving the confirmation that the group was deleted. 

216 """ 

217 if kick: 

218 user_participant = await muc.get_user_participant() 

219 user_participant.kick(reason) 

220 with self.xmpp.store.session() as orm: 

221 orm.add(muc.stored) 

222 orm.refresh(muc.stored) 

223 orm.delete(muc.stored) 

224 orm.commit()