Coverage for slidge/core/mixins/avatar.py: 90%

100 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-04 08:17 +0000

1from asyncio import Task 

2from pathlib import Path 

3from typing import TYPE_CHECKING, Optional 

4 

5from PIL import UnidentifiedImageError 

6from slixmpp import JID 

7from sqlalchemy.orm.exc import DetachedInstanceError 

8 

9from ...db.avatar import CachedAvatar, avatar_cache 

10from ...db.models import Contact, Room 

11from ...util.types import AnyBaseSession, Avatar 

12from .db import UpdateInfoMixin 

13 

14if TYPE_CHECKING: 

15 from ..pubsub import PepAvatar 

16 

17 

class AvatarMixin(UpdateInfoMixin):
    """
    Mixin for XMPP entities that have avatars that represent them.

    Both :py:class:`slidge.LegacyContact` and :py:class:`slidge.LegacyMUC` use
    :py:class:`.AvatarMixin`.
    """

    jid: JID = NotImplemented
    session: AnyBaseSession = NotImplemented
    # Database row backing this entity; its ``avatar`` attribute is read and
    # written by the methods below.
    stored: Contact | Room

    def __init__(self) -> None:
        super().__init__()
        # Task scheduled by the ``avatar`` property setter, if any.
        self._set_avatar_task: Task | None = None

    def __stored_avatar(self):
        """
        Return ``self.stored.avatar``, merging first if the row is detached.

        If reading the attribute raises ``DetachedInstanceError``,
        :py:meth:`merge` is called and the attribute is read once more.
        """
        try:
            return self.stored.avatar
        except DetachedInstanceError:
            self.merge()
            return self.stored.avatar

    def __delete_source_file(self, avatar: Avatar | None) -> None:
        """
        Remove the file the avatar was provided as, or warn if there is none.
        """
        if avatar is None or avatar.path is None:
            self.session.log.warning(
                "Requested avatar path delete, but no path provided"
            )
            return
        # missing_ok: a file that is already gone must not abort the update.
        avatar.path.unlink(missing_ok=True)

    @property
    def avatar(self) -> Avatar | None:
        """
        This property can be used to set or unset the avatar.

        Unlike the awaitable :py:meth:`.set_avatar`, it schedules the update for
        later execution and is not blocking.

        Reading it returns an ``Avatar`` built from the stored avatar's
        ``legacy_id`` and ``url``, or ``None`` if no avatar is stored.
        """
        stored_avatar = self.__stored_avatar()
        if stored_avatar is None:
            return None
        if stored_avatar.legacy_id is None:
            unique_id = None
        else:
            # Convert the stored ID with the gateway's AVATAR_ID_TYPE.
            unique_id = self.session.xmpp.AVATAR_ID_TYPE(stored_avatar.legacy_id)
        return Avatar(
            unique_id=unique_id,
            url=stored_avatar.url,
        )

    @avatar.setter
    def avatar(self, avatar: Avatar | Path | str | None) -> None:
        avatar = convert_avatar(avatar)
        # Only the latest assignment matters: cancel a previously scheduled
        # update before scheduling the new one.
        if self._set_avatar_task:
            self._set_avatar_task.cancel()
        self.session.log.debug("Setting avatar with property")
        self._set_avatar_task = self.session.create_task(self.set_avatar(avatar))

    async def __has_changed(self, avatar: Avatar | None) -> bool:
        """
        Tell whether ``avatar`` differs from the currently stored avatar.

        Checks are tried in this order: presence, unique IDs (when both are
        known), ``avatar_cache.url_modified`` (when both URLs are equal), and
        finally a byte comparison between the cached file and ``avatar.path``.
        When none of these checks applies, the avatar is considered changed.
        """
        # Read the property once: every access rebuilds an Avatar from the
        # stored row.
        current = self.avatar
        if current is None:
            return avatar is not None
        if avatar is None:
            # An avatar is currently set and the new value unsets it.
            return True

        if current.unique_id is not None and avatar.unique_id is not None:
            return current.unique_id != avatar.unique_id

        if current.url is not None and current.url == avatar.url:
            return await avatar_cache.url_modified(avatar.url)

        if avatar.path is not None:
            cached = self.get_cached_avatar()
            if cached is not None:
                # NOTE: these are synchronous file reads inside a coroutine.
                return cached.path.read_bytes() != avatar.path.read_bytes()

        return True

    async def set_avatar(
        self, avatar: Avatar | Path | str | None = None, delete: bool = False
    ) -> None:
        """
        Set an avatar for this entity

        :param avatar: The avatar. Should ideally come with a legacy network-wide unique
            ID
        :param delete: If the avatar is provided as a Path, whether to delete
            it once used or not. The file is also deleted when the avatar
            turns out to be unchanged and no update is made.
        """
        avatar = convert_avatar(avatar)

        if not await self.__has_changed(avatar):
            # Nothing to update, but still honour the deletion request so the
            # caller's file is not left behind.
            if delete:
                self.__delete_source_file(avatar)
            return

        if avatar is None:
            cached_avatar = None
        else:
            try:
                cached_avatar = await avatar_cache.convert_or_get(avatar)
            except UnidentifiedImageError:
                self.session.log.warning("%s is not a valid image", avatar)
                cached_avatar = None
            except Exception as e:
                # Best effort: any other failure is logged and the entity ends
                # up without an avatar.
                self.session.log.error("Failed to set avatar %s: %s", avatar, e)
                cached_avatar = None

        if delete:
            self.__delete_source_file(avatar)

        if cached_avatar is None:
            self.stored.avatar = None
        else:
            self.stored.avatar = cached_avatar.stored
        self.commit(merge=True)
        self._post_avatar_update(cached_avatar)

    def get_cached_avatar(self) -> Optional["CachedAvatar"]:
        """
        Return the ``avatar_cache`` entry for the stored avatar, or ``None``
        if no avatar is stored.
        """
        stored_avatar = self.__stored_avatar()
        if stored_avatar is None:
            return None
        return avatar_cache.get(stored_avatar)

    def get_avatar(self) -> Optional["PepAvatar"]:
        """
        Return a ``PepAvatar`` filled from the cached avatar, or ``None`` if
        there is no cached avatar.
        """
        cached_avatar = self.get_cached_avatar()
        if cached_avatar is None:
            return None
        # Imported here because the module-level import only happens under
        # TYPE_CHECKING (presumably to avoid a circular import).
        from ..pubsub import PepAvatar

        item = PepAvatar()
        item.set_avatar_from_cache(cached_avatar)
        return item

    def _post_avatar_update(self, cached_avatar: Optional["CachedAvatar"]) -> None:
        """
        Hook called by :py:meth:`set_avatar` after the new avatar is committed.

        Must be overridden by classes using this mixin.

        :param cached_avatar: The new cached avatar, or ``None`` if the avatar
            was unset.
        """
        raise NotImplementedError

154 

155 

def convert_avatar(
    avatar: Avatar | Path | str | None, unique_id: str | None = None
) -> Avatar | None:
    """
    Normalise the accepted avatar representations into an ``Avatar``.

    :param avatar: A ``Path`` (used as the ``path`` of the result), a string
        (used as the ``url`` of the result), an ``Avatar``, or ``None``.
    :param unique_id: Optional ID attached to the result when ``avatar`` is a
        ``Path`` or a string. Ignored when ``avatar`` is already an ``Avatar``.
    :return: An ``Avatar``, or ``None`` if ``avatar`` is ``None`` or an
        ``Avatar`` whose fields are all ``None``.
    """
    if isinstance(avatar, Path):
        return Avatar(path=avatar, unique_id=unique_id)
    if isinstance(avatar, str):
        # Forward unique_id here too; it used to be silently dropped for URLs.
        return Avatar(url=avatar, unique_id=unique_id)
    if avatar is None or all(field is None for field in avatar):
        return None
    return avatar