Coverage for slidge/core/mixins/avatar.py: 81%

134 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-07 05:11 +0000

1from asyncio import Task, create_task 

2from hashlib import sha1 

3from pathlib import Path 

4from typing import TYPE_CHECKING, Optional 

5 

6from slixmpp import JID 

7 

8from ...db.avatar import CachedAvatar, avatar_cache 

9from ...util.types import ( 

10 URL, 

11 AnyBaseSession, 

12 AvatarIdType, 

13 AvatarType, 

14 LegacyFileIdType, 

15) 

16 

17if TYPE_CHECKING: 

18 from ..pubsub import PepAvatar 

19 

20 

class AvatarMixin:
    """
    Mixin for XMPP entities that have avatars that represent them.

    Both :py:class:`slidge.LegacyContact` and :py:class:`slidge.LegacyMUC` use
    :py:class:`.AvatarMixin`.
    """

    jid: JID = NotImplemented
    session: AnyBaseSession = NotImplemented
    _avatar_bare_jid: bool = NotImplemented

    def __init__(self) -> None:
        super().__init__()
        # Task in charge of the avatar update currently in flight, if any.
        self._set_avatar_task: Optional[Task] = None
        # Task created by avatar_wrap_update_info() to broadcast the avatar.
        self.__broadcast_task: Optional[Task] = None
        # Identifier of the current avatar; None means "no avatar".
        self.__avatar_unique_id: Optional[AvatarIdType] = None
        # Primary key of the current avatar in the avatar cache.
        self._avatar_pk: Optional[int] = None

    @property
    def __avatar_jid(self):
        # The JID avatars are published from: bare or full, depending on
        # the ``_avatar_bare_jid`` flag of the concrete class.
        return JID(self.jid.bare) if self._avatar_bare_jid else self.jid

    @property
    def avatar_id(self) -> Optional[AvatarIdType]:
        """
        The unique ID of this entity's avatar.
        """
        return self.__avatar_unique_id

    @property
    def avatar(self) -> Optional[AvatarIdType]:
        """
        This property can be used to set the avatar, but
        :py:meth:`~.AvatarMixin.set_avatar()` should be preferred because you can
        provide a unique ID for the avatar for efficient caching.
        Setting this is OKish in case the avatar type is a URL or a local path
        that can act as a legacy ID.

        Python's ``property`` is abused here to maintain backwards
        compatibility, but when getting it you actually get the avatar legacy
        ID.
        """
        return self.__avatar_unique_id

    @avatar.setter
    def avatar(self, a: Optional[AvatarType]):
        # A property setter cannot be awaited, so the update is scheduled as
        # a task on the XMPP loop; any previously scheduled update is
        # cancelled first.
        if self._set_avatar_task:
            self._set_avatar_task.cancel()
        self.session.log.debug("Setting avatar with property")
        self._set_avatar_task = self.session.xmpp.loop.create_task(
            self.set_avatar(a, None, blocking=True, cancel=False),
            name=f"Set avatar of {self} from property",
        )

    @property
    def avatar_pk(self) -> int | None:
        """
        The primary key of this entity's avatar in the cache, if any.
        """
        return self._avatar_pk

    @staticmethod
    def __get_uid(a: Optional[AvatarType]) -> Optional[AvatarIdType]:
        """
        Derive an avatar ID from the avatar itself.

        :param a: The avatar: a ``str`` (wrapped in ``URL``), a ``Path``
            (converted to ``str``), ``bytes`` (SHA-1 hex digest) or ``None``.
        :return: The derived ID, or ``None`` if ``a`` is ``None``.
        :raises TypeError: If ``a`` is of any other type.
        """
        if isinstance(a, str):
            return URL(a)
        elif isinstance(a, Path):
            return str(a)
        elif isinstance(a, bytes):
            return sha1(a).hexdigest()
        elif a is None:
            return None
        raise TypeError("Bad avatar", a)

    async def __set_avatar(
        self, a: Optional[AvatarType], uid: Optional[AvatarIdType], delete: bool
    ):
        """
        Store the avatar, broadcast it if appropriate and run the post-update
        hook.

        :param a: The new avatar, or ``None`` to remove it.
        :param uid: The ID to record for this avatar.
        :param delete: Whether to unlink ``a`` afterwards if it is a ``Path``.
        """
        self.__avatar_unique_id = uid

        if a is None:
            cached_avatar = None
            self._avatar_pk = None
        else:
            try:
                cached_avatar = await avatar_cache.convert_or_get(a)
            except Exception as e:
                # Best effort: log the failure and give up on this update
                # without broadcasting or calling the post-update hook.
                self.session.log.error("Failed to set avatar %s", a, exc_info=e)
                self._avatar_pk = None
                self.__avatar_unique_id = uid
                return
            self._avatar_pk = cached_avatar.pk

        if self.__should_pubsub_broadcast():
            await self.session.xmpp.pubsub.broadcast_avatar(
                self.__avatar_jid, self.session.user_jid, cached_avatar
            )

        if delete and isinstance(a, Path):
            a.unlink()

        self._post_avatar_update()

    def __should_pubsub_broadcast(self):
        # Only entities exposing truthy ``is_friend`` and ``added_to_roster``
        # attributes get their avatar broadcast; entities lacking either
        # attribute never do.
        return getattr(self, "is_friend", False) and getattr(
            self, "added_to_roster", False
        )

    async def _no_change(self, a: Optional[AvatarType], uid: Optional[AvatarIdType]):
        """
        Tell whether setting avatar ``a`` with ID ``uid`` would be a no-op.

        :param a: The candidate avatar, or ``None`` for "no avatar".
        :param uid: The candidate avatar ID.
        :return: ``True`` if the entity already has this avatar.
        """
        if a is None:
            return self.__avatar_unique_id is None
        # Compare with None explicitly: a falsy ID such as 0 or "" is still
        # an ID and must not be mistaken for "no avatar".
        if self.__avatar_unique_id is None:
            return False
        if isinstance(uid, URL):
            if self.__avatar_unique_id != uid:
                return False
            # Same URL: ask the cache whether the remote content changed.
            return not await avatar_cache.url_modified(uid)
        return self.__avatar_unique_id == uid

    async def set_avatar(
        self,
        a: Optional[AvatarType],
        avatar_unique_id: Optional[LegacyFileIdType] = None,
        delete: bool = False,
        blocking=False,
        cancel=True,
    ) -> None:
        """
        Set an avatar for this entity

        :param a: The avatar, in one of the types slidge supports
        :param avatar_unique_id: A globally unique ID for the avatar on the
            legacy network
        :param delete: If the avatar is provided as a Path, whether to delete
            it once used or not.
        :param blocking: Internal use by slidge for tests, do not use!
        :param cancel: Internal use by slidge, do not use!
        """
        if avatar_unique_id is None and a is not None:
            avatar_unique_id = self.__get_uid(a)
        if await self._no_change(a, avatar_unique_id):
            return

        previous = self._set_avatar_task
        superseded = bool(cancel and previous)
        if superseded:
            previous.cancel()

        awaitable = create_task(
            self.__set_avatar(a, avatar_unique_id, delete),
            name=f"Set pubsub avatar of {self}",
        )
        # Task.cancel() only takes effect on a later loop iteration, so a task
        # we have just cancelled is not done() yet. It must still be replaced,
        # otherwise the new task would be untracked: impossible to cancel
        # later and without any strong reference kept to it.
        # With cancel=False, a still-running task is kept as the tracked one.
        if superseded or previous is None or previous.done():
            self._set_avatar_task = awaitable
        if blocking:
            await awaitable

    def get_cached_avatar(self) -> Optional["CachedAvatar"]:
        """
        Fetch this entity's avatar from the cache.

        :return: The cached avatar, or ``None`` if no avatar is set.
        """
        if self._avatar_pk is None:
            return None
        return avatar_cache.get_by_pk(self._avatar_pk)

    def get_avatar(self) -> Optional["PepAvatar"]:
        """
        Build a ``PepAvatar`` from this entity's cached avatar.

        :return: The avatar item, or ``None`` if no avatar is set.
        """
        cached_avatar = self.get_cached_avatar()
        if cached_avatar is None:
            return None
        # Imported here rather than at module level, where it is only
        # imported under TYPE_CHECKING.
        from ..pubsub import PepAvatar

        item = PepAvatar()
        item.set_avatar_from_cache(cached_avatar)
        return item

    def _post_avatar_update(self) -> None:
        # Hook called at the end of a successful avatar update; a no-op here.
        return

    def __get_cached_avatar_id(self):
        # Convert the stored ID with the gateway's ``AVATAR_ID_TYPE`` callable.
        i = self._get_cached_avatar_id()
        if i is None:
            return None
        return self.session.xmpp.AVATAR_ID_TYPE(i)

    def _get_cached_avatar_id(self) -> Optional[str]:
        # Must be provided by the concrete class.
        raise NotImplementedError

    async def avatar_wrap_update_info(self):
        """
        Call ``update_info()`` and broadcast the cached avatar if
        ``update_info()`` left it untouched.
        """
        cached_id = self.__get_cached_avatar_id()
        self.__avatar_unique_id = cached_id
        try:
            await self.update_info()  # type:ignore
        except NotImplementedError:
            return
        new_id = self.avatar
        if isinstance(new_id, URL) and not await avatar_cache.url_modified(new_id):
            return
        elif new_id != cached_id:
            # at this point it means that update_info set the avatar, and we don't
            # need to do anything else
            return

        if self.__should_pubsub_broadcast():
            if new_id is None and cached_id is None:
                return
            if self._avatar_pk is not None:
                cached_avatar = avatar_cache.get_by_pk(self._avatar_pk)
            else:
                cached_avatar = None
            # Keep a reference to the task on the instance while it runs.
            self.__broadcast_task = self.session.xmpp.loop.create_task(
                self.session.xmpp.pubsub.broadcast_avatar(
                    self.__avatar_jid, self.session.user_jid, cached_avatar
                )
            )

    def _set_avatar_from_store(self, stored):
        """
        Restore the avatar ID from a stored record.

        :param stored: A record exposing ``avatar_id`` and ``avatar``
            attributes.
        """
        if stored.avatar_id is None:
            return
        if stored.avatar is None:
            # seems to happen after avatar cleanup for some reason?
            self.__avatar_unique_id = None
            return
        # Prefer the legacy ID; fall back to the avatar URL.
        self.__avatar_unique_id = (
            stored.avatar.legacy_id
            if stored.avatar.legacy_id is not None
            else URL(stored.avatar.url)
        )