Coverage for slidge/core/mixins/avatar.py: 90%
104 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
1from asyncio import Task
2from pathlib import Path
3from typing import TYPE_CHECKING, Optional
5from PIL import UnidentifiedImageError
6from slixmpp import JID
7from sqlalchemy.orm.exc import DetachedInstanceError
9from ...db.avatar import CachedAvatar, avatar_cache
10from ...db.models import Contact, Room
11from ...util.types import AnyBaseSession, Avatar
12from .db import UpdateInfoMixin
14if TYPE_CHECKING:
15 from ..pubsub import PepAvatar
18class AvatarMixin(UpdateInfoMixin):
19 """
20 Mixin for XMPP entities that have avatars that represent them.
22 Both :py:class:`slidge.LegacyContact` and :py:class:`slidge.LegacyMUC` use
23 :py:class:`.AvatarMixin`.
24 """
26 jid: JID = NotImplemented
27 session: AnyBaseSession = NotImplemented
28 stored: Contact | Room
30 def __init__(self) -> None:
31 super().__init__()
32 self._set_avatar_task: Task | None = None
34 @property
35 def avatar(self) -> Avatar | None:
36 """
37 This property can be used to set or unset the avatar.
39 Unlike the awaitable :method:`.set_avatar`, it schedules the update for
40 later execution and is not blocking
41 """
42 try:
43 if self.stored.avatar is None:
44 return None
45 except DetachedInstanceError:
46 self.merge()
47 if self.stored.avatar is None:
48 return None
49 if self.stored.avatar.legacy_id is None:
50 unique_id = None
51 else:
52 unique_id = self.session.xmpp.AVATAR_ID_TYPE(self.stored.avatar.legacy_id)
53 return Avatar(
54 unique_id=unique_id,
55 url=self.stored.avatar.url,
56 )
58 @avatar.setter
59 def avatar(self, avatar: Avatar | Path | str | None) -> None:
60 avatar = convert_avatar(avatar)
61 if self._set_avatar_task:
62 self._set_avatar_task.cancel()
63 self.session.log.debug("Setting avatar with property")
64 self._set_avatar_task = self.session.create_task(self.set_avatar(avatar))
66 async def __has_changed(self, avatar: Avatar | None) -> bool:
67 if self.avatar is None:
68 return avatar is not None
69 if avatar is None:
70 return self.avatar is not None
72 if self.avatar.unique_id is not None and avatar.unique_id is not None:
73 return self.avatar.unique_id != avatar.unique_id
75 if (
76 self.avatar.url is not None
77 and avatar.url is not None
78 and self.avatar.url == avatar.url
79 ):
80 return await avatar_cache.url_modified(avatar.url)
82 if avatar.path is not None:
83 cached = self.get_cached_avatar()
84 if cached is not None:
85 return cached.path.read_bytes() != avatar.path.read_bytes()
87 return True
89 async def set_avatar(
90 self, avatar: Avatar | Path | str | None = None, delete: bool = False
91 ) -> None:
92 """
93 Set an avatar for this entity
95 :param avatar: The avatar. Should ideally come with a legacy network-wide unique
96 ID
97 :param delete: If the avatar is provided as a Path, whether to delete
98 it once used or not.
99 """
100 avatar = convert_avatar(avatar)
102 if not await self.__has_changed(avatar):
103 return
105 if avatar is None:
106 cached_avatar = None
107 else:
108 try:
109 cached_avatar = await avatar_cache.convert_or_get(avatar)
110 except UnidentifiedImageError:
111 self.session.log.warning("%s is not a valid image", avatar)
112 cached_avatar = None
113 except Exception as e:
114 self.session.log.error("Failed to set avatar %s: %s", avatar, e)
115 cached_avatar = None
117 if delete:
118 if avatar is None or avatar.path is None:
119 self.session.log.warning(
120 "Requested avatar path delete, but no path provided"
121 )
122 else:
123 avatar.path.unlink()
125 stored_avatar = None if cached_avatar is None else cached_avatar.stored
126 if not self._updating_info:
127 with self.xmpp.store.session() as orm:
128 with orm.no_autoflush:
129 self.stored = orm.merge(self.stored)
130 orm.refresh(self.stored)
132 self.stored.avatar = stored_avatar
133 self.commit(merge=True)
135 self._post_avatar_update(cached_avatar)
137 def get_cached_avatar(self) -> Optional["CachedAvatar"]:
138 try:
139 if self.stored.avatar is None:
140 return None
141 except DetachedInstanceError:
142 self.merge()
143 if self.stored.avatar is None:
144 return None
145 return avatar_cache.get(self.stored.avatar)
147 def get_avatar(self) -> Optional["PepAvatar"]:
148 cached_avatar = self.get_cached_avatar()
149 if cached_avatar is None:
150 return None
151 from ..pubsub import PepAvatar
153 item = PepAvatar()
154 item.set_avatar_from_cache(cached_avatar)
155 return item
157 def _post_avatar_update(self, cached_avatar: Optional["CachedAvatar"]) -> None:
158 raise NotImplementedError
161def convert_avatar(
162 avatar: Avatar | Path | str | None, unique_id: str | None = None
163) -> Avatar | None:
164 if isinstance(avatar, Path):
165 return Avatar(path=avatar, unique_id=unique_id)
166 if isinstance(avatar, str):
167 return Avatar(url=avatar)
168 if avatar is None or all(x is None for x in avatar):
169 return None
170 return avatar