Coverage for slidge/group/bookmarks.py: 89%

101 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-07 05:11 +0000

1import abc 

2import logging 

3from typing import TYPE_CHECKING, Generic, Iterator, Optional, Type 

4 

5from slixmpp import JID 

6from slixmpp.exceptions import XMPPError 

7from slixmpp.jid import _unescape_node 

8 

9from ..contact.roster import ESCAPE_TABLE 

10from ..core.mixins.lock import NamedLockMixin 

11from ..db.models import Room 

12from ..util import SubclassableOnce 

13from ..util.types import LegacyGroupIdType, LegacyMUCType 

14from .archive import MessageArchive 

15from .room import LegacyMUC 

16 

17if TYPE_CHECKING: 

18 from slidge.core.session import BaseSession 

19 

20 

class LegacyBookmarks(
    Generic[LegacyGroupIdType, LegacyMUCType],
    NamedLockMixin,
    metaclass=SubclassableOnce,
):
    """
    Per-session registry of the legacy groups (MUCs) of a user.

    This is instantiated once per :class:`~slidge.BaseSession`. MUCs are
    looked up either by JID (:meth:`by_jid`) or by legacy ID
    (:meth:`by_legacy_id`); both paths go through the room store and fetch
    the group info on first access.
    """

    def __init__(self, session: "BaseSession"):
        """
        :param session: The session these bookmarks belong to.
        """
        self.session = session
        self.xmpp = session.xmpp
        self.user_jid = session.user_jid
        self.__store = self.xmpp.store.rooms

        self._muc_class: Type[LegacyMUCType] = LegacyMUC.get_self_or_unique_subclass()

        # Default nickname of the user in groups: the local part of their JID.
        self._user_nick: str = self.session.user_jid.node

        super().__init__()
        self.log = logging.getLogger(f"{self.user_jid.bare}:bookmarks")
        self.ready = self.session.xmpp.loop.create_future()
        if not self.xmpp.GROUPS:
            # Nothing to fill when the gateway has no group support, so the
            # bookmarks are marked ready right away.
            self.ready.set_result(True)

    @property
    def user_nick(self) -> str:
        """
        Nickname given to the user in MUCs that do not define one themselves.
        """
        return self._user_nick

    @user_nick.setter
    def user_nick(self, nick: str) -> None:
        self._user_nick = nick

    def __iter__(self) -> Iterator[LegacyMUCType]:
        """
        Iterate over the MUCs stored for this session's user.
        """
        for stored in self.__store.get_all(user_pk=self.session.user_pk):
            yield self._muc_class.from_store(self.session, stored)

    def __repr__(self) -> str:
        return f"<Bookmarks of {self.user_jid}>"

    async def legacy_id_to_jid_local_part(self, legacy_id: LegacyGroupIdType):
        """
        Alias of :meth:`legacy_id_to_jid_username`.

        :param legacy_id: The legacy ID of the group.
        :return: Whatever :meth:`legacy_id_to_jid_username` returns.
        """
        return await self.legacy_id_to_jid_username(legacy_id)

    async def jid_local_part_to_legacy_id(self, local_part: str):
        """
        Alias of :meth:`jid_username_to_legacy_id`.

        :param local_part: The local part of the group's JID.
        :return: Whatever :meth:`jid_username_to_legacy_id` returns.
        """
        return await self.jid_username_to_legacy_id(local_part)

    async def legacy_id_to_jid_username(self, legacy_id: LegacyGroupIdType):
        """
        The default implementation calls ``str()`` on the legacy_id and
        escapes characters according to :xep:`0106`.

        You can override this class and implement a more subtle logic to raise
        an :class:`~slixmpp.exceptions.XMPPError` early

        :param legacy_id: The legacy ID of the group.
        :return: The local part to use in the group's JID.
        """
        return str(legacy_id).translate(ESCAPE_TABLE)

    async def jid_username_to_legacy_id(self, username: str):
        """
        Counterpart of :meth:`legacy_id_to_jid_username`.

        The default implementation passes ``username`` through slixmpp's
        ``_unescape_node`` and returns the result.

        :param username: The local part of the group's JID.
        :return: The legacy ID of the group.
        """
        return _unescape_node(username)

    async def by_jid(self, jid: JID) -> LegacyMUCType:
        """
        Get the MUC matching a JID, fetching its info if needed.

        Any resource part of ``jid`` is ignored. The lookup runs under the
        named lock ``("bare", <bare JID>)``.

        :param jid: JID of the group.
        :return: The matching MUC.
        """
        if jid.resource:
            jid = JID(jid.bare)
        async with self.lock(("bare", jid.bare)):
            assert isinstance(jid.local, str)
            legacy_id = await self.jid_local_part_to_legacy_id(jid.local)
            # A lock on the legacy ID means by_legacy_id() is already working
            # on this group: delegate to it instead of building a second MUC.
            # NOTE(review): by_jid() and by_legacy_id() call each other while
            # still holding their own lock; confirm against NamedLockMixin
            # that a task can never end up waiting on a lock it already holds.
            if self.get_lock(("legacy_id", legacy_id)):
                self.log.debug("Not instantiating %s after all", jid)
                return await self.by_legacy_id(legacy_id)

            with self.__store.session():
                stored = self.__store.get_by_jid(self.session.user_pk, jid)
                return await self.__update_muc(stored, legacy_id, jid)

    def by_jid_only_if_exists(self, jid: JID) -> Optional[LegacyMUCType]:
        """
        Get the MUC matching a JID without ever creating or updating it.

        :param jid: JID of the group.
        :return: The MUC if it is in the store and flagged as updated,
            ``None`` otherwise.
        """
        with self.__store.session():
            stored = self.__store.get_by_jid(self.session.user_pk, jid)
            if stored is not None and stored.updated:
                return self._muc_class.from_store(self.session, stored)
        return None

    async def by_legacy_id(self, legacy_id: LegacyGroupIdType) -> LegacyMUCType:
        """
        Get the MUC matching a legacy ID, fetching its info if needed.

        The lookup runs under the named lock ``("legacy_id", legacy_id)``.

        :param legacy_id: The legacy ID of the group.
        :return: The matching MUC.
        """
        async with self.lock(("legacy_id", legacy_id)):
            local = await self.legacy_id_to_jid_local_part(legacy_id)
            jid = JID(f"{local}@{self.xmpp.boundjid}")
            # Mirror of the check in by_jid(): if that JID is being handled
            # there, delegate instead of building a second MUC.
            if self.get_lock(("bare", jid.bare)):
                self.log.debug("Not instantiating %s after all", legacy_id)
                return await self.by_jid(jid)

            with self.__store.session():
                stored = self.__store.get_by_legacy_id(
                    self.session.user_pk, str(legacy_id)
                )
                return await self.__update_muc(stored, legacy_id, jid)

    async def __update_muc(
        self, stored: Room | None, legacy_id: LegacyGroupIdType, jid: JID
    ):
        """
        Build the MUC for a store lookup result and make sure it is up to date.

        :param stored: The stored room, or ``None`` if the store has none.
        :param legacy_id: The legacy ID of the group.
        :param jid: The JID of the group.
        :return: The MUC, persisted in the store.
        :raises XMPPError: If fetching the group info fails. Exceptions that
            are not already an ``XMPPError`` are re-raised as
            ``internal-server-error`` with the original exception as cause.
        """
        if stored is None:
            muc = self._muc_class(self.session, legacy_id=legacy_id, jid=jid)
        else:
            muc = self._muc_class.from_store(self.session, stored)
            if stored.updated:
                # Already up to date: nothing to fetch nor to write back.
                return muc

        try:
            with muc.updating_info():
                await muc.avatar_wrap_update_info()
        except XMPPError:
            raise
        except Exception as e:
            # Chain explicitly so the original traceback is kept as the cause.
            raise XMPPError("internal-server-error", str(e)) from e
        if not muc.user_nick:
            muc.user_nick = self._user_nick
        self.log.debug("MUC created: %r", muc)
        muc.pk = self.__store.update(muc)
        muc.archive = MessageArchive(muc.pk, self.xmpp.store.mam)
        return muc

    # NOTE(review): abc.abstractmethod is only enforced at instantiation when
    # the metaclass derives from ABCMeta; confirm that SubclassableOnce does.
    @abc.abstractmethod
    async def fill(self):
        """
        Establish a user's known groups.

        This has to be overridden in plugins with group support and at the
        minimum, this should ``await self.by_legacy_id(group_id)`` for all
        the groups a user is part of.

        Slidge internals will call this on successful :meth:`BaseSession.login`

        :raises NotImplementedError: If called on a gateway that advertises
            group support.
        """
        if self.xmpp.GROUPS:
            raise NotImplementedError(
                "The plugin advertised support for groups but"
                " LegacyBookmarks.fill() was not overridden."
            )

    async def remove(
        self,
        muc: LegacyMUC,
        reason="You left this group from the official client.",
        kick=True,
    ) -> None:
        """
        Delete everything about a specific group.

        This should be called when the user leaves the group from the official
        app.

        :param muc: The MUC to remove.
        :param reason: Optionally, a reason why this group was removed.
        :param kick: Whether the user should be kicked from this group. Set this
            to False in case you do this somewhere else in your code, e.g., on
            receiving the confirmation that the group was deleted.
        """
        # NOTE(review): this check is stripped when Python runs with -O.
        assert muc.pk is not None
        if kick:
            user_participant = await muc.get_user_participant()
            user_participant.kick(reason)
        self.__store.delete(muc.pk)