Coverage for slidge / core / mixins / presence.py: 83%
142 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
1import re
2from asyncio import Task, sleep
3from datetime import UTC, datetime, timedelta
4from functools import partial
5from typing import TYPE_CHECKING
7from slixmpp import Presence
8from slixmpp.types import PresenceShows, PresenceTypes
9from sqlalchemy.orm.exc import DetachedInstanceError
11from ...db.models import Contact, Participant
12from ...util.types import AnySession, CachedPresence
13from .base import BaseSender
14from .db import DBMixin
16if TYPE_CHECKING:
17 pass
20class _NoChange(Exception):
21 pass
24_FRIEND_REQUEST_PRESENCES = {"subscribe", "unsubscribe", "subscribed", "unsubscribed"}
25_UPDATE_LAST_SEEN_FALLBACK_TASKS = dict[int, Task[None]]()
26_ONE_WEEK_SECONDS = 3600 * 24 * 7
29async def _update_last_seen_fallback(session: AnySession, contact_pk: int) -> None:
30 await sleep(_ONE_WEEK_SECONDS)
31 with session.xmpp.store.session() as orm:
32 stored = orm.get(Contact, contact_pk)
33 if stored is None:
34 return
35 contact = session.contacts.from_store(stored)
36 contact.send_last_presence(force=True, no_cache_online=False)
39def _clear_last_seen_task(contact_pk: int, _task: Task[None]) -> None:
40 try:
41 del _UPDATE_LAST_SEEN_FALLBACK_TASKS[contact_pk]
42 except KeyError:
43 pass
46class PresenceMixin(BaseSender, DBMixin):
47 _ONLY_SEND_PRESENCE_CHANGES = False
49 # this attribute actually only exists for contacts and not participants
50 _updating_info: bool
51 stored: Contact | Participant
53 def __init__(self, *a: object, **k: object) -> None:
54 super().__init__(*a, **k)
55 # this is only used when a presence is set during Contact.update_info(),
56 # when the contact does not have a DB primary key yet, and is written
57 # to DB at the end of update_info()
58 self.cached_presence: CachedPresence | None = None
60 def __is_contact(self) -> bool:
61 return isinstance(self.stored, Contact)
63 def __stored(self) -> Contact | None:
64 if self.__is_contact():
65 assert isinstance(self.stored, Contact)
66 return self.stored
67 else:
68 assert isinstance(self.stored, Participant)
69 try:
70 return self.stored.contact
71 except DetachedInstanceError:
72 with self.xmpp.store.session() as orm:
73 orm.add(self.stored)
74 if self.stored.contact is None:
75 return None
76 orm.refresh(self.stored.contact)
77 orm.merge(self.stored)
78 return self.stored.contact
80 @property
81 def __contact_pk(self) -> int | None:
82 stored = self.__stored()
83 return None if stored is None else stored.id
85 def _get_last_presence(self) -> CachedPresence | None:
86 stored = self.__stored()
87 if stored is None or not stored.cached_presence:
88 return None
89 return CachedPresence(
90 None if stored.last_seen is None else stored.last_seen.replace(tzinfo=UTC),
91 stored.ptype, # type:ignore
92 stored.pstatus,
93 stored.pshow, # type:ignore
94 )
96 def _store_last_presence(self, new: CachedPresence) -> None:
97 if self.__is_contact():
98 contact = self
99 elif (contact := getattr(self, "contact", None)) is None: # type:ignore[assignment]
100 return
101 contact.update_stored_attribute( # type:ignore[attr-defined]
102 cached_presence=True,
103 **new._asdict(),
104 )
106 def _make_presence(
107 self,
108 *,
109 last_seen: datetime | None = None,
110 force: bool = False,
111 bare: bool = False,
112 ptype: PresenceTypes | None = None,
113 pstatus: str | None = None,
114 pshow: PresenceShows | None = None,
115 ) -> Presence:
116 if last_seen and last_seen.tzinfo is None:
117 last_seen = last_seen.astimezone(UTC)
119 old = self._get_last_presence()
121 if ptype not in _FRIEND_REQUEST_PRESENCES:
122 new = CachedPresence(
123 last_seen=last_seen, ptype=ptype, pstatus=pstatus, pshow=pshow
124 )
125 if old != new:
126 if hasattr(self, "muc") and ptype == "unavailable":
127 stored = self.__stored()
128 if stored is not None:
129 stored.cached_presence = False
130 self.commit(merge=True)
131 else:
132 self._store_last_presence(new)
133 if old and not force and self._ONLY_SEND_PRESENCE_CHANGES:
134 if old == new:
135 self.session.log.debug("Presence is the same as cached")
136 raise _NoChange
137 self.session.log.debug(
138 "Presence is not the same as cached: %s vs %s", old, new
139 )
141 p = self.xmpp.make_presence(
142 pfrom=self.jid.bare if bare else self.jid,
143 ptype=ptype,
144 pshow=pshow,
145 pstatus=pstatus,
146 )
147 if last_seen:
148 # it's ugly to check for the presence of this string, but a better fix is more work
149 if not re.match(
150 ".*Last seen .*", p["status"]
151 ) and self.session.user.preferences.get("last_seen_fallback", True):
152 last_seen_fallback, recent = get_last_seen_fallback(last_seen)
153 if p["status"]:
154 p["status"] = p["status"] + " -- " + last_seen_fallback
155 else:
156 p["status"] = last_seen_fallback
157 pk = self.__contact_pk
158 if recent and pk is not None:
159 # if less than a week, we use sth like 'Last seen: Monday, 8:05",
160 # but if lasts more than a week, this is not very informative, so
161 # we need to force resend an updated presence status
162 task = _UPDATE_LAST_SEEN_FALLBACK_TASKS.get(pk)
163 if task is not None:
164 task.cancel()
165 task = self.session.create_task(
166 _update_last_seen_fallback(self.session, pk)
167 )
168 _UPDATE_LAST_SEEN_FALLBACK_TASKS[pk] = task
169 task.add_done_callback(partial(_clear_last_seen_task, pk))
170 p["idle"]["since"] = last_seen
171 return p
173 def send_last_presence(
174 self, force: bool = False, no_cache_online: bool = False
175 ) -> None:
176 if (cache := self._get_last_presence()) is None:
177 if force:
178 if no_cache_online:
179 self.online()
180 else:
181 self.offline()
182 return
183 self._send(
184 self._make_presence(
185 last_seen=cache.last_seen,
186 force=True,
187 ptype=cache.ptype,
188 pshow=cache.pshow,
189 pstatus=cache.pstatus,
190 )
191 )
193 def online(
194 self,
195 status: str | None = None,
196 last_seen: datetime | None = None,
197 ) -> None:
198 """
199 Send an "online" presence from this contact to the user.
201 :param status: Arbitrary text, details of the status, eg: "Listening to Britney Spears"
202 :param last_seen: For :xep:`0319`
203 """
204 try:
205 self._send(self._make_presence(pstatus=status, last_seen=last_seen))
206 except _NoChange:
207 pass
209 def away(
210 self,
211 status: str | None = None,
212 last_seen: datetime | None = None,
213 ) -> None:
214 """
215 Send an "away" presence from this contact to the user.
217 This is a global status, as opposed to :meth:`.LegacyContact.inactive`
218 which concerns a specific conversation, ie a specific "chat window"
220 :param status: Arbitrary text, details of the status, eg: "Gone to fight capitalism"
221 :param last_seen: For :xep:`0319`
222 """
223 try:
224 self._send(
225 self._make_presence(pstatus=status, pshow="away", last_seen=last_seen)
226 )
227 except _NoChange:
228 pass
230 def extended_away(
231 self,
232 status: str | None = None,
233 last_seen: datetime | None = None,
234 ) -> None:
235 """
236 Send an "extended away" presence from this contact to the user.
238 This is a global status, as opposed to :meth:`.LegacyContact.inactive`
239 which concerns a specific conversation, ie a specific "chat window"
241 :param status: Arbitrary text, details of the status, eg: "Gone to fight capitalism"
242 :param last_seen: For :xep:`0319`
243 """
244 try:
245 self._send(
246 self._make_presence(pstatus=status, pshow="xa", last_seen=last_seen)
247 )
248 except _NoChange:
249 pass
251 def busy(
252 self,
253 status: str | None = None,
254 last_seen: datetime | None = None,
255 ) -> None:
256 """
257 Send a "busy" (ie, "dnd") presence from this contact to the user,
259 :param status: eg: "Trying to make sense of XEP-0100"
260 :param last_seen: For :xep:`0319`
261 """
262 try:
263 self._send(
264 self._make_presence(pstatus=status, pshow="dnd", last_seen=last_seen)
265 )
266 except _NoChange:
267 pass
269 def offline(
270 self,
271 status: str | None = None,
272 last_seen: datetime | None = None,
273 ) -> None:
274 """
275 Send an "offline" presence from this contact to the user.
277 :param status: eg: "Trying to make sense of XEP-0100"
278 :param last_seen: For :xep:`0319`
279 """
280 try:
281 self._send(
282 self._make_presence(
283 pstatus=status, ptype="unavailable", last_seen=last_seen
284 )
285 )
286 except _NoChange:
287 pass
290def get_last_seen_fallback(last_seen: datetime) -> tuple[str, bool]:
291 now = datetime.now(tz=UTC)
292 if now - last_seen < timedelta(days=7):
293 return f"Last seen {last_seen:%A %H:%M %p GMT}", True
294 else:
295 return f"Last seen {last_seen:%b %-d %Y}", False