Coverage for slidge/core/mixins/avatar.py: 90%
100 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
1from asyncio import Task
2from pathlib import Path
3from typing import TYPE_CHECKING, Optional
5from PIL import UnidentifiedImageError
6from slixmpp import JID
7from sqlalchemy.orm.exc import DetachedInstanceError
9from ...db.avatar import CachedAvatar, avatar_cache
10from ...db.models import Contact, Room
11from ...util.types import AnyBaseSession, Avatar
12from .db import UpdateInfoMixin
14if TYPE_CHECKING:
15 from ..pubsub import PepAvatar
18class AvatarMixin(UpdateInfoMixin):
19 """
20 Mixin for XMPP entities that have avatars that represent them.
22 Both :py:class:`slidge.LegacyContact` and :py:class:`slidge.LegacyMUC` use
23 :py:class:`.AvatarMixin`.
24 """
26 jid: JID = NotImplemented
27 session: AnyBaseSession = NotImplemented
28 stored: Contact | Room
30 def __init__(self) -> None:
31 super().__init__()
32 self._set_avatar_task: Task | None = None
34 @property
35 def avatar(self) -> Avatar | None:
36 """
37 This property can be used to set or unset the avatar.
39 Unlike the awaitable :method:`.set_avatar`, it schedules the update for
40 later execution and is not blocking
41 """
42 try:
43 if self.stored.avatar is None:
44 return None
45 except DetachedInstanceError:
46 self.merge()
47 if self.stored.avatar is None:
48 return None
49 if self.stored.avatar.legacy_id is None:
50 unique_id = None
51 else:
52 unique_id = self.session.xmpp.AVATAR_ID_TYPE(self.stored.avatar.legacy_id)
53 return Avatar(
54 unique_id=unique_id,
55 url=self.stored.avatar.url,
56 )
58 @avatar.setter
59 def avatar(self, avatar: Avatar | Path | str | None) -> None:
60 avatar = convert_avatar(avatar)
61 if self._set_avatar_task:
62 self._set_avatar_task.cancel()
63 self.session.log.debug("Setting avatar with property")
64 self._set_avatar_task = self.session.create_task(self.set_avatar(avatar))
66 async def __has_changed(self, avatar: Avatar | None) -> bool:
67 if self.avatar is None:
68 return avatar is not None
69 if avatar is None:
70 return self.avatar is not None
72 if self.avatar.unique_id is not None and avatar.unique_id is not None:
73 return self.avatar.unique_id != avatar.unique_id
75 if (
76 self.avatar.url is not None
77 and avatar.url is not None
78 and self.avatar.url == avatar.url
79 ):
80 return await avatar_cache.url_modified(avatar.url)
82 if avatar.path is not None:
83 cached = self.get_cached_avatar()
84 if cached is not None:
85 return cached.path.read_bytes() != avatar.path.read_bytes()
87 return True
89 async def set_avatar(
90 self, avatar: Avatar | Path | str | None = None, delete: bool = False
91 ) -> None:
92 """
93 Set an avatar for this entity
95 :param avatar: The avatar. Should ideally come with a legacy network-wide unique
96 ID
97 :param delete: If the avatar is provided as a Path, whether to delete
98 it once used or not.
99 """
100 avatar = convert_avatar(avatar)
102 if not await self.__has_changed(avatar):
103 return
105 if avatar is None:
106 cached_avatar = None
107 else:
108 try:
109 cached_avatar = await avatar_cache.convert_or_get(avatar)
110 except UnidentifiedImageError:
111 self.session.log.warning("%s is not a valid image", avatar)
112 cached_avatar = None
113 except Exception as e:
114 self.session.log.error("Failed to set avatar %s: %s", avatar, e)
115 cached_avatar = None
117 if delete:
118 if avatar is None or avatar.path is None:
119 self.session.log.warning(
120 "Requested avatar path delete, but no path provided"
121 )
122 else:
123 avatar.path.unlink()
125 if cached_avatar is None:
126 self.stored.avatar = None
127 else:
128 self.stored.avatar = cached_avatar.stored
129 self.commit(merge=True)
130 self._post_avatar_update(cached_avatar)
132 def get_cached_avatar(self) -> Optional["CachedAvatar"]:
133 try:
134 if self.stored.avatar is None:
135 return None
136 except DetachedInstanceError:
137 self.merge()
138 if self.stored.avatar is None:
139 return None
140 return avatar_cache.get(self.stored.avatar)
142 def get_avatar(self) -> Optional["PepAvatar"]:
143 cached_avatar = self.get_cached_avatar()
144 if cached_avatar is None:
145 return None
146 from ..pubsub import PepAvatar
148 item = PepAvatar()
149 item.set_avatar_from_cache(cached_avatar)
150 return item
152 def _post_avatar_update(self, cached_avatar: Optional["CachedAvatar"]) -> None:
153 raise NotImplementedError
156def convert_avatar(
157 avatar: Avatar | Path | str | None, unique_id: str | None = None
158) -> Avatar | None:
159 if isinstance(avatar, Path):
160 return Avatar(path=avatar, unique_id=unique_id)
161 if isinstance(avatar, str):
162 return Avatar(url=avatar)
163 if avatar is None or all(x is None for x in avatar):
164 return None
165 return avatar