Coverage for slidge/core/mixins/presence.py: 83%
142 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
1import re
2from asyncio import Task, sleep
3from datetime import datetime, timedelta, timezone
4from functools import partial
5from typing import TYPE_CHECKING, Optional
7from slixmpp.types import PresenceShows, PresenceTypes
8from sqlalchemy.exc import InvalidRequestError
9from sqlalchemy.orm.exc import DetachedInstanceError
11from ...db.models import Contact, Participant
12from ...util.types import CachedPresence
13from .base import BaseSender
14from .db import DBMixin
16if TYPE_CHECKING:
17 from ..session import BaseSession
20class _NoChange(Exception):
21 pass
24_FRIEND_REQUEST_PRESENCES = {"subscribe", "unsubscribe", "subscribed", "unsubscribed"}
25_UPDATE_LAST_SEEN_FALLBACK_TASKS = dict[int, Task]()
26_ONE_WEEK_SECONDS = 3600 * 24 * 7
29async def _update_last_seen_fallback(session: "BaseSession", contact_pk: int) -> None:
30 await sleep(_ONE_WEEK_SECONDS)
31 with session.xmpp.store.session() as orm:
32 stored = orm.get(Contact, contact_pk)
33 if stored is None:
34 return
35 contact = session.contacts.from_store(stored)
36 contact.send_last_presence(force=True, no_cache_online=False)
39def _clear_last_seen_task(contact_pk: int, _task) -> None:
40 try:
41 del _UPDATE_LAST_SEEN_FALLBACK_TASKS[contact_pk]
42 except KeyError:
43 pass
46class PresenceMixin(BaseSender, DBMixin):
47 _ONLY_SEND_PRESENCE_CHANGES = False
49 # this attribute actually only exists for contacts and not participants
50 _updating_info: bool
51 stored: Contact | Participant
53 def __init__(self, *a, **k) -> None:
54 super().__init__(*a, **k)
55 # this is only used when a presence is set during Contact.update_info(),
56 # when the contact does not have a DB primary key yet, and is written
57 # to DB at the end of update_info()
58 self.cached_presence: Optional[CachedPresence] = None
60 def __is_contact(self) -> bool:
61 return isinstance(self.stored, Contact)
63 def __stored(self) -> Contact | None:
64 if self.__is_contact():
65 assert isinstance(self.stored, Contact)
66 return self.stored
67 else:
68 assert isinstance(self.stored, Participant)
69 try:
70 return self.stored.contact
71 except DetachedInstanceError:
72 with self.xmpp.store.session() as orm:
73 orm.add(self.stored)
74 if self.stored.contact is None:
75 return None
76 orm.refresh(self.stored.contact)
77 orm.merge(self.stored)
78 return self.stored.contact
80 @property
81 def __contact_pk(self) -> int | None:
82 stored = self.__stored()
83 return None if stored is None else stored.id
85 def _get_last_presence(self) -> Optional[CachedPresence]:
86 stored = self.__stored()
87 if stored is None or not stored.cached_presence:
88 return None
89 return CachedPresence(
90 None
91 if stored.last_seen is None
92 else stored.last_seen.replace(tzinfo=timezone.utc),
93 stored.ptype, # type:ignore
94 stored.pstatus,
95 stored.pshow, # type:ignore
96 )
98 def _store_last_presence(self, new: CachedPresence) -> None:
99 if self.__is_contact():
100 contact = self
101 elif (contact := getattr(self, "contact", None)) is None: # type:ignore[assignment]
102 return
103 contact.update_stored_attribute( # type:ignore[attr-defined]
104 cached_presence=True,
105 **new._asdict(),
106 )
108 def _make_presence(
109 self,
110 *,
111 last_seen: Optional[datetime] = None,
112 force: bool = False,
113 bare: bool = False,
114 ptype: Optional[PresenceTypes] = None,
115 pstatus: Optional[str] = None,
116 pshow: Optional[PresenceShows] = None,
117 ):
118 if last_seen and last_seen.tzinfo is None:
119 last_seen = last_seen.astimezone(timezone.utc)
121 old = self._get_last_presence()
123 if ptype not in _FRIEND_REQUEST_PRESENCES:
124 new = CachedPresence(
125 last_seen=last_seen, ptype=ptype, pstatus=pstatus, pshow=pshow
126 )
127 if old != new:
128 if hasattr(self, "muc") and ptype == "unavailable":
129 stored = self.__stored()
130 if stored is not None:
131 stored.cached_presence = False
132 self.commit(merge=True)
133 else:
134 self._store_last_presence(new)
135 if old and not force and self._ONLY_SEND_PRESENCE_CHANGES:
136 if old == new:
137 self.session.log.debug("Presence is the same as cached")
138 raise _NoChange
139 self.session.log.debug(
140 "Presence is not the same as cached: %s vs %s", old, new
141 )
143 p = self.xmpp.make_presence(
144 pfrom=self.jid.bare if bare else self.jid,
145 ptype=ptype,
146 pshow=pshow,
147 pstatus=pstatus,
148 )
149 if last_seen:
150 # it's ugly to check for the presence of this string, but a better fix is more work
151 if not re.match(
152 ".*Last seen .*", p["status"]
153 ) and self.session.user.preferences.get("last_seen_fallback", True):
154 last_seen_fallback, recent = get_last_seen_fallback(last_seen)
155 if p["status"]:
156 p["status"] = p["status"] + " -- " + last_seen_fallback
157 else:
158 p["status"] = last_seen_fallback
159 pk = self.__contact_pk
160 if recent and pk is not None:
161 # if less than a week, we use sth like 'Last seen: Monday, 8:05",
162 # but if lasts more than a week, this is not very informative, so
163 # we need to force resend an updated presence status
164 task = _UPDATE_LAST_SEEN_FALLBACK_TASKS.get(pk)
165 if task is not None:
166 task.cancel()
167 task = self.session.create_task(
168 _update_last_seen_fallback(self.session, pk)
169 )
170 _UPDATE_LAST_SEEN_FALLBACK_TASKS[pk] = task
171 task.add_done_callback(partial(_clear_last_seen_task, pk))
172 p["idle"]["since"] = last_seen
173 return p
175 def send_last_presence(
176 self, force: bool = False, no_cache_online: bool = False
177 ) -> None:
178 if (cache := self._get_last_presence()) is None:
179 if force:
180 if no_cache_online:
181 self.online()
182 else:
183 self.offline()
184 return
185 self._send(
186 self._make_presence(
187 last_seen=cache.last_seen,
188 force=True,
189 ptype=cache.ptype,
190 pshow=cache.pshow,
191 pstatus=cache.pstatus,
192 )
193 )
195 def online(
196 self,
197 status: Optional[str] = None,
198 last_seen: Optional[datetime] = None,
199 ) -> None:
200 """
201 Send an "online" presence from this contact to the user.
203 :param status: Arbitrary text, details of the status, eg: "Listening to Britney Spears"
204 :param last_seen: For :xep:`0319`
205 """
206 try:
207 self._send(self._make_presence(pstatus=status, last_seen=last_seen))
208 except _NoChange:
209 pass
211 def away(
212 self,
213 status: Optional[str] = None,
214 last_seen: Optional[datetime] = None,
215 ) -> None:
216 """
217 Send an "away" presence from this contact to the user.
219 This is a global status, as opposed to :meth:`.LegacyContact.inactive`
220 which concerns a specific conversation, ie a specific "chat window"
222 :param status: Arbitrary text, details of the status, eg: "Gone to fight capitalism"
223 :param last_seen: For :xep:`0319`
224 """
225 try:
226 self._send(
227 self._make_presence(pstatus=status, pshow="away", last_seen=last_seen)
228 )
229 except _NoChange:
230 pass
232 def extended_away(
233 self,
234 status: Optional[str] = None,
235 last_seen: Optional[datetime] = None,
236 ) -> None:
237 """
238 Send an "extended away" presence from this contact to the user.
240 This is a global status, as opposed to :meth:`.LegacyContact.inactive`
241 which concerns a specific conversation, ie a specific "chat window"
243 :param status: Arbitrary text, details of the status, eg: "Gone to fight capitalism"
244 :param last_seen: For :xep:`0319`
245 """
246 try:
247 self._send(
248 self._make_presence(pstatus=status, pshow="xa", last_seen=last_seen)
249 )
250 except _NoChange:
251 pass
253 def busy(
254 self,
255 status: Optional[str] = None,
256 last_seen: Optional[datetime] = None,
257 ) -> None:
258 """
259 Send a "busy" (ie, "dnd") presence from this contact to the user,
261 :param status: eg: "Trying to make sense of XEP-0100"
262 :param last_seen: For :xep:`0319`
263 """
264 try:
265 self._send(
266 self._make_presence(pstatus=status, pshow="dnd", last_seen=last_seen)
267 )
268 except _NoChange:
269 pass
271 def offline(
272 self,
273 status: Optional[str] = None,
274 last_seen: Optional[datetime] = None,
275 ) -> None:
276 """
277 Send an "offline" presence from this contact to the user.
279 :param status: eg: "Trying to make sense of XEP-0100"
280 :param last_seen: For :xep:`0319`
281 """
282 try:
283 self._send(
284 self._make_presence(
285 pstatus=status, ptype="unavailable", last_seen=last_seen
286 )
287 )
288 except _NoChange:
289 pass
292def get_last_seen_fallback(last_seen: datetime) -> tuple[str, bool]:
293 now = datetime.now(tz=timezone.utc)
294 if now - last_seen < timedelta(days=7):
295 return f"Last seen {last_seen:%A %H:%M %p GMT}", True
296 else:
297 return f"Last seen {last_seen:%b %-d %Y}", False