Coverage for slidge / core / mixins / avatar.py: 89%

108 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-06 15:18 +0000

1import hashlib 

2from asyncio import Task 

3from pathlib import Path 

4from typing import TYPE_CHECKING, Optional 

5 

6from PIL import UnidentifiedImageError 

7from slixmpp import JID 

8from sqlalchemy.orm.exc import DetachedInstanceError 

9 

10from ...db.avatar import CachedAvatar, avatar_cache 

11from ...db.models import Contact, Room 

12from ...util.types import AnyBaseSession, Avatar 

13from .db import UpdateInfoMixin 

14 

15if TYPE_CHECKING: 

16 from ..pubsub import PepAvatar 

17 

18 

19class AvatarMixin(UpdateInfoMixin): 

20 """ 

21 Mixin for XMPP entities that have avatars that represent them. 

22 

23 Both :py:class:`slidge.LegacyContact` and :py:class:`slidge.LegacyMUC` use 

24 :py:class:`.AvatarMixin`. 

25 """ 

26 

27 jid: JID = NotImplemented 

28 session: AnyBaseSession = NotImplemented 

29 stored: Contact | Room 

30 

31 def __init__(self) -> None: 

32 super().__init__() 

33 self._set_avatar_task: Task | None = None 

34 

35 @property 

36 def avatar(self) -> Avatar | None: 

37 """ 

38 This property can be used to set or unset the avatar. 

39 

40 Unlike the awaitable :method:`.set_avatar`, it schedules the update for 

41 later execution and is not blocking 

42 """ 

43 try: 

44 if self.stored.avatar is None: 

45 return None 

46 except DetachedInstanceError: 

47 self.merge() 

48 if self.stored.avatar is None: 

49 return None 

50 if self.stored.avatar.legacy_id is None: 

51 unique_id = None 

52 else: 

53 unique_id = self.session.xmpp.AVATAR_ID_TYPE(self.stored.avatar.legacy_id) 

54 return Avatar( 

55 unique_id=unique_id, 

56 url=self.stored.avatar.url, 

57 ) 

58 

59 @avatar.setter 

60 def avatar(self, avatar: Avatar | Path | str | None) -> None: 

61 avatar = convert_avatar(avatar) 

62 if self._set_avatar_task: 

63 self._set_avatar_task.cancel() 

64 self.session.log.debug("Setting avatar with property") 

65 self._set_avatar_task = self.session.create_task(self.set_avatar(avatar)) 

66 

67 async def __has_changed(self, avatar: Avatar | None) -> bool: 

68 if self.avatar is None: 

69 return avatar is not None 

70 if avatar is None: 

71 return self.avatar is not None 

72 

73 if self.avatar.unique_id is not None and avatar.unique_id is not None: 

74 return self.avatar.unique_id != avatar.unique_id 

75 

76 if ( 

77 self.avatar.url is not None 

78 and avatar.url is not None 

79 and self.avatar.url == avatar.url 

80 ): 

81 return await avatar_cache.url_modified(avatar.url) 

82 

83 if avatar.path is not None: 

84 cached = self.get_cached_avatar() 

85 if cached is not None: 

86 return cached.path.read_bytes() != avatar.path.read_bytes() 

87 

88 return True 

89 

90 async def set_avatar( 

91 self, avatar: Avatar | Path | str | None = None, delete: bool = False 

92 ) -> None: 

93 """ 

94 Set an avatar for this entity 

95 

96 :param avatar: The avatar. Should ideally come with a legacy network-wide unique 

97 ID 

98 :param delete: If the avatar is provided as a Path, whether to delete 

99 it once used or not. 

100 """ 

101 avatar = convert_avatar(avatar) 

102 

103 if avatar is not None and avatar.unique_id is None and avatar.data is not None: 

104 self.session.log.debug("Hashing bytes to generate a unique ID") 

105 avatar = Avatar( 

106 data=avatar.data, unique_id=hashlib.sha512(avatar.data).hexdigest() 

107 ) 

108 

109 if not await self.__has_changed(avatar): 

110 return 

111 

112 if avatar is None: 

113 cached_avatar = None 

114 else: 

115 try: 

116 cached_avatar = await avatar_cache.convert_or_get(avatar) 

117 except UnidentifiedImageError: 

118 self.session.log.warning("%s is not a valid image", avatar) 

119 cached_avatar = None 

120 except Exception as e: 

121 self.session.log.error("Failed to set avatar %s: %s", avatar, e) 

122 cached_avatar = None 

123 

124 if delete: 

125 if avatar is None or avatar.path is None: 

126 self.session.log.warning( 

127 "Requested avatar path delete, but no path provided" 

128 ) 

129 else: 

130 avatar.path.unlink() 

131 

132 stored_avatar = None if cached_avatar is None else cached_avatar.stored 

133 if not self._updating_info: 

134 with self.xmpp.store.session() as orm: 

135 with orm.no_autoflush: 

136 self.stored = orm.merge(self.stored) 

137 orm.refresh(self.stored) 

138 

139 self.stored.avatar = stored_avatar 

140 self.commit(merge=True) 

141 

142 self._post_avatar_update(cached_avatar) 

143 

144 def get_cached_avatar(self) -> Optional["CachedAvatar"]: 

145 try: 

146 if self.stored.avatar is None: 

147 return None 

148 except DetachedInstanceError: 

149 self.merge() 

150 if self.stored.avatar is None: 

151 return None 

152 return avatar_cache.get(self.stored.avatar) 

153 

154 def get_avatar(self) -> Optional["PepAvatar"]: 

155 cached_avatar = self.get_cached_avatar() 

156 if cached_avatar is None: 

157 return None 

158 from ..pubsub import PepAvatar 

159 

160 item = PepAvatar() 

161 item.set_avatar_from_cache(cached_avatar) 

162 return item 

163 

164 def _post_avatar_update(self, cached_avatar: Optional["CachedAvatar"]) -> None: 

165 raise NotImplementedError 

166 

167 

168def convert_avatar( 

169 avatar: Avatar | Path | str | None, unique_id: str | None = None 

170) -> Avatar | None: 

171 if isinstance(avatar, Path): 

172 return Avatar(path=avatar, unique_id=unique_id) 

173 if isinstance(avatar, str): 

174 return Avatar(url=avatar) 

175 if avatar is None or all(x is None for x in avatar): 

176 return None 

177 return avatar