Coverage for slidge/group/bookmarks.py: 89%

109 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-26 19:34 +0000

1import abc 

2import logging 

3from typing import TYPE_CHECKING, Generic, Iterator, Literal, Optional, Type, overload 

4 

5from slixmpp import JID 

6from slixmpp.exceptions import XMPPError 

7 

8from ..db.models import Room 

9from ..util import SubclassableOnce 

10from ..util.jid_escaping import ESCAPE_TABLE, unescape_node 

11from ..util.lock import NamedLockMixin 

12from ..util.types import LegacyGroupIdType, LegacyMUCType 

13from .room import LegacyMUC 

14 

15if TYPE_CHECKING: 

16 from slidge.core.session import BaseSession 

17 

18 

class LegacyBookmarks(
    Generic[LegacyGroupIdType, LegacyMUCType],
    NamedLockMixin,
    metaclass=SubclassableOnce,
):
    """
    This is instantiated once per :class:`~slidge.BaseSession`

    It gives access to the groups ("rooms") of a single user, backed by the
    :class:`Room` rows of the store, either by iterating over it or by looking
    a group up by JID or by legacy ID.
    """

    # Class used by from_store() to wrap a stored Room.
    _muc_cls: Type[LegacyMUCType]

    def __init__(self, session: "BaseSession") -> None:
        """
        :param session: The session these bookmarks belong to.
        """
        self.session = session
        self.xmpp = session.xmpp
        self.user_jid = session.user_jid

        # Default nickname of the user: the local part of their JID.
        self._user_nick: str = self.session.user_jid.node

        super().__init__()
        self.log = logging.getLogger(f"{self.user_jid.bare}:bookmarks")
        self.ready = self.session.xmpp.loop.create_future()
        if not self.xmpp.GROUPS:
            # Without group support there is nothing to wait for.
            self.ready.set_result(True)

    @property
    def user_nick(self) -> str:
        """
        The nickname of the user, initially the local part of their JID.
        """
        return self._user_nick

    @user_nick.setter
    def user_nick(self, nick: str) -> None:
        self._user_nick = nick

    def from_store(self, stored: Room) -> LegacyMUCType:
        """
        Wrap a stored room in this bookmarks' MUC class.

        :param stored: The room row to wrap.
        :return: A ``_muc_cls`` instance built from this session and ``stored``.
        """
        return self._muc_cls(self.session, stored)

    def __iter__(self) -> Iterator[LegacyMUC]:
        """
        Iterate over the stored rooms of this user whose ``updated`` flag
        is set.
        """
        with self.xmpp.store.session() as orm:
            for stored in orm.query(Room).filter_by(user=self.session.user).all():
                if stored.updated:
                    yield self.from_store(stored)

    def __repr__(self) -> str:
        return f"<Bookmarks of {self.user_jid}>"

    async def legacy_id_to_jid_local_part(self, legacy_id: LegacyGroupIdType) -> str:
        """
        Alias of :meth:`legacy_id_to_jid_username`.

        :param legacy_id: The legacy group ID to convert.
        :return: The JID local part for this group.
        """
        return await self.legacy_id_to_jid_username(legacy_id)

    async def jid_local_part_to_legacy_id(self, local_part: str) -> LegacyGroupIdType:
        """
        Alias of :meth:`jid_username_to_legacy_id`.

        :param local_part: The JID local part to convert.
        :return: The legacy group ID for this local part.
        """
        return await self.jid_username_to_legacy_id(local_part)

    async def legacy_id_to_jid_username(self, legacy_id: LegacyGroupIdType) -> str:
        """
        The default implementation calls ``str()`` on the legacy_id and
        escapes characters according to :xep:`0106`.

        You can override this method and implement a more subtle logic to raise
        an :class:`~slixmpp.exceptions.XMPPError` early

        :param legacy_id: The legacy group ID to convert.
        :return: The escaped JID local part.
        """
        return str(legacy_id).translate(ESCAPE_TABLE)

    async def jid_username_to_legacy_id(self, username: str):
        """
        The default implementation returns the result of ``unescape_node()``
        on the username.

        :param username: The JID local part to convert.
        :return: The legacy group ID.
        """
        return unescape_node(username)

    async def by_jid(self, jid: JID) -> LegacyMUCType:
        """
        Get the group corresponding to a JID, creating it if it is not
        stored yet.

        :param jid: The JID of the group; a resource part, if any, is ignored.
        :return: The group, after its info has been updated if needed.
        :raises XMPPError: If updating the group info fails.
        """
        if jid.resource:
            jid = JID(jid.bare)
        async with self.lock(("bare", jid.bare)):
            legacy_id = await self.jid_local_part_to_legacy_id(jid.node)
            if self.get_lock(("legacy_id", legacy_id)):
                # by_legacy_id() appears to be working on the same group
                # already: defer to it instead of building a second instance.
                self.session.log.debug("Already updating %s via by_legacy_id()", jid)
                return await self.by_legacy_id(legacy_id)

            with self.session.xmpp.store.session() as orm:
                stored = (
                    orm.query(Room)
                    .filter_by(user_account_id=self.session.user_pk, jid=jid)
                    .one_or_none()
                )
                if stored is None:
                    stored = Room(
                        user_account_id=self.session.user_pk,
                        jid=jid,
                        # Stored as a string, like by_legacy_id() does, so
                        # that its ``legacy_id=str(...)`` lookup finds rooms
                        # created here too.
                        legacy_id=str(legacy_id),
                    )
            return await self.__update_if_needed(stored)

    def by_jid_only_if_exists(self, jid: JID) -> Optional[LegacyMUCType]:
        """
        Get the group corresponding to a JID, without ever creating it.

        :param jid: The JID of the group.
        :return: The group if it is stored and its ``updated`` flag is set,
            ``None`` otherwise.
        """
        with self.xmpp.store.session() as orm:
            stored = (
                orm.query(Room).filter_by(user=self.session.user, jid=jid).one_or_none()
            )
            if stored is not None and stored.updated:
                return self.from_store(stored)
        return None

    @overload
    async def by_legacy_id(self, legacy_id: LegacyGroupIdType) -> "LegacyMUCType": ...

    @overload
    async def by_legacy_id(
        self, legacy_id: LegacyGroupIdType, create: Literal[False]
    ) -> "LegacyMUCType | None": ...

    @overload
    async def by_legacy_id(
        self, legacy_id: LegacyGroupIdType, create: Literal[True]
    ) -> "LegacyMUCType": ...

    async def by_legacy_id(
        self, legacy_id: LegacyGroupIdType, create: bool = False
    ) -> LegacyMUCType | None:
        """
        Get the group corresponding to a legacy ID.

        :param legacy_id: The legacy ID of the group.
        :param create: Only read when a ``by_jid()`` call seems to be in
            progress for the same group: if true, wait for it via ``by_jid()``,
            else return what :meth:`by_jid_only_if_exists` finds, possibly
            ``None``. In all other cases a missing room is instantiated.
        :return: The group, after its info has been updated if needed, or
            ``None`` in the case described above.
        :raises XMPPError: If updating the group info fails.
        """
        async with self.lock(("legacy_id", legacy_id)):
            local = await self.legacy_id_to_jid_local_part(legacy_id)
            jid = JID(f"{local}@{self.xmpp.boundjid}")
            if self.get_lock(("bare", jid.bare)):
                self.session.log.debug("Already updating %s via by_jid()", jid)
                if create:
                    return await self.by_jid(jid)
                else:
                    return self.by_jid_only_if_exists(jid)

            with self.xmpp.store.session() as orm:
                stored = (
                    orm.query(Room)
                    .filter_by(
                        user_account_id=self.session.user_pk,
                        legacy_id=str(legacy_id),
                    )
                    .one_or_none()
                )
                if stored is None:
                    stored = Room(
                        user_account_id=self.session.user_pk,
                        jid=jid,
                        legacy_id=str(legacy_id),
                    )
            return await self.__update_if_needed(stored)

    async def __update_if_needed(self, stored: Room) -> LegacyMUCType:
        """
        Wrap a stored room and call its ``update_info()`` unless the room is
        already marked as updated.

        :param stored: The room row to wrap.
        :return: The wrapped group.
        :raises XMPPError: Re-raised as is if ``update_info()`` raises one;
            any other exception is converted to an ``internal-server-error``
            one. ``NotImplementedError`` is ignored.
        """
        muc = self.from_store(stored)
        if muc.stored.updated:
            return muc

        with muc.updating_info():
            try:
                await muc.update_info()
            except NotImplementedError:
                # update_info() is optional.
                pass
            except XMPPError:
                raise
            except Exception as e:
                # Chain explicitly so the original traceback is reported as
                # the direct cause of the XMPP error.
                raise XMPPError("internal-server-error", str(e)) from e

        return muc

    @abc.abstractmethod
    async def fill(self) -> None:
        """
        Establish a user's known groups.

        This has to be overridden in plugins with group support and at the
        minimum, this should ``await self.by_legacy_id(group_id)`` for all
        the groups a user is part of.

        Slidge internals will call this on successful :meth:`BaseSession.login`

        :raises NotImplementedError: If the plugin advertises group support
            but did not override this method.
        """
        if self.xmpp.GROUPS:
            raise NotImplementedError(
                "The plugin advertised support for groups but"
                " LegacyBookmarks.fill() was not overridden."
            )

    async def remove(
        self,
        muc: LegacyMUC,
        reason: str = "You left this group from the official client.",
        kick: bool = True,
    ) -> None:
        """
        Delete everything about a specific group.

        This should be called when the user leaves the group from the official
        app.

        :param muc: The MUC to remove.
        :param reason: Optionally, a reason why this group was removed.
        :param kick: Whether the user should be kicked from this group. Set this
            to False in case you do this somewhere else in your code, eg, on
            receiving the confirmation that the group was deleted.
        """
        if kick:
            user_participant = await muc.get_user_participant()
            user_participant.kick(reason)
        with self.xmpp.store.session() as orm:
            orm.delete(muc.stored)
            orm.commit()