Coverage for slidge / core / mixins / presence.py: 83%
141 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-06 15:18 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-06 15:18 +0000
1import re
2from asyncio import Task, sleep
3from datetime import datetime, timedelta, timezone
4from functools import partial
5from typing import TYPE_CHECKING, Optional
7from slixmpp.types import PresenceShows, PresenceTypes
8from sqlalchemy.orm.exc import DetachedInstanceError
10from ...db.models import Contact, Participant
11from ...util.types import CachedPresence
12from .base import BaseSender
13from .db import DBMixin
15if TYPE_CHECKING:
16 from ..session import BaseSession
class _NoChange(Exception):
    """Raised by ``PresenceMixin._make_presence`` when the presence to send
    is identical to the cached one and therefore must not be sent again."""
# Presence types for which ``PresenceMixin._make_presence`` neither updates
# nor consults the cached presence.
_FRIEND_REQUEST_PRESENCES = {
    "subscribe",
    "subscribed",
    "unsubscribe",
    "unsubscribed",
}
# Pending "refresh the 'Last seen' text" tasks, keyed by contact primary key.
_UPDATE_LAST_SEEN_FALLBACK_TASKS: dict[int, Task] = {}
# Delay before the "Last seen" fallback text of a contact is re-sent.
_ONE_WEEK_SECONDS = 7 * 24 * 3600
async def _update_last_seen_fallback(session: "BaseSession", contact_pk: int) -> None:
    """
    Wait one week, then force the contact to re-send its last presence.

    :param session: The session whose store and contact collection are used.
    :param contact_pk: Primary key of the ``Contact`` row to look up once the
        delay has elapsed.
    """
    await sleep(_ONE_WEEK_SECONDS)
    with session.xmpp.store.session() as orm:
        row = orm.get(Contact, contact_pk)
        if row is None:
            # nothing is stored under this primary key: nothing to refresh
            return
        legacy_contact = session.contacts.from_store(row)
    legacy_contact.send_last_presence(force=True, no_cache_online=False)
def _clear_last_seen_task(contact_pk: int, _task) -> None:
    """
    Done-callback removing a finished task from the pending-task registry.

    The entry is only removed when it still refers to ``_task``. When a task
    is cancelled and immediately replaced by a new one for the same contact,
    the cancelled task's callback runs *after* the replacement has been
    registered; unconditionally deleting the entry would drop the
    replacement, which could then never be cancelled.

    :param contact_pk: Primary key of the contact the task was created for.
    :param _task: The task that just finished (passed by
        ``Task.add_done_callback``).
    """
    if _UPDATE_LAST_SEEN_FALLBACK_TASKS.get(contact_pk) is _task:
        del _UPDATE_LAST_SEEN_FALLBACK_TASKS[contact_pk]
class PresenceMixin(BaseSender, DBMixin):
    """
    Mixin adding presence helpers (online, away, busy, offline…).

    The last presence that was built is cached on the related ``Contact``
    row, which makes it possible to re-send it with
    :meth:`send_last_presence`.
    """

    # When true, a presence equal to the cached one is not sent again,
    # unless ``force`` is passed to ``_make_presence``.
    _ONLY_SEND_PRESENCE_CHANGES = False

    # this attribute actually only exists for contacts and not participants
    _updating_info: bool
    stored: Contact | Participant

    def __init__(self, *a, **k) -> None:
        super().__init__(*a, **k)
        # Only used when a presence is set during Contact.update_info(),
        # while the contact has no DB primary key yet; it is written to the
        # DB at the end of update_info().
        self.cached_presence: Optional[CachedPresence] = None

    def __is_contact(self) -> bool:
        """Tell whether ``self.stored`` is a ``Contact`` row."""
        return isinstance(self.stored, Contact)

    def __stored(self) -> Contact | None:
        """
        Return the ``Contact`` row related to this entity.

        For a contact this is ``self.stored`` itself; for a participant it
        is the participant's ``contact`` relationship, which may be ``None``.
        """
        if self.__is_contact():
            assert isinstance(self.stored, Contact)
            return self.stored

        assert isinstance(self.stored, Participant)
        try:
            return self.stored.contact
        except DetachedInstanceError:
            # The participant row is not attached to a session: attach it
            # to a fresh one before reading the relationship.
            with self.xmpp.store.session() as orm:
                orm.add(self.stored)
                if self.stored.contact is None:
                    return None
                orm.refresh(self.stored.contact)
                orm.merge(self.stored)
                return self.stored.contact

    @property
    def __contact_pk(self) -> int | None:
        """Primary key of the related ``Contact`` row, if there is one."""
        row = self.__stored()
        if row is None:
            return None
        return row.id

    def _get_last_presence(self) -> Optional[CachedPresence]:
        """
        Build a ``CachedPresence`` from the related ``Contact`` row.

        :return: ``None`` when there is no related row or when its
            ``cached_presence`` flag is falsy.
        """
        row = self.__stored()
        if row is None or not row.cached_presence:
            return None
        seen = row.last_seen
        if seen is not None:
            # the timestamp read from the row is tagged as UTC
            seen = seen.replace(tzinfo=timezone.utc)
        return CachedPresence(
            seen,
            row.ptype,  # type:ignore
            row.pstatus,
            row.pshow,  # type:ignore
        )

    def _store_last_presence(self, new: CachedPresence) -> None:
        """
        Persist ``new`` as the cached presence of the related contact.

        Does nothing for an entity that is not a contact and has no
        ``contact`` attribute (or has it set to ``None``).
        """
        target = self if self.__is_contact() else getattr(self, "contact", None)
        if target is None:
            return
        target.update_stored_attribute(  # type:ignore[attr-defined]
            cached_presence=True,
            **new._asdict(),
        )

    def _make_presence(
        self,
        *,
        last_seen: Optional[datetime] = None,
        force: bool = False,
        bare: bool = False,
        ptype: Optional[PresenceTypes] = None,
        pstatus: Optional[str] = None,
        pshow: Optional[PresenceShows] = None,
    ):
        """
        Build a presence stanza and update the presence cache.

        :param last_seen: When set, it is written to the stanza's idle
            "since" field and, depending on the user preference
            ``last_seen_fallback``, appended to the status text.
        :param force: Build the stanza even if it equals the cached one.
        :param bare: Use the bare JID as sender instead of the full JID.
        :param ptype: Presence type.
        :param pstatus: Status text.
        :param pshow: Presence "show" value.
        :return: The stanza created by ``self.xmpp.make_presence``.
        :raises _NoChange: If ``_ONLY_SEND_PRESENCE_CHANGES`` is set,
            ``force`` is false and the presence equals the cached one.
        """
        if last_seen and last_seen.tzinfo is None:
            last_seen = last_seen.astimezone(timezone.utc)

        previous = self._get_last_presence()

        if ptype not in _FRIEND_REQUEST_PRESENCES:
            current = CachedPresence(
                last_seen=last_seen, ptype=ptype, pstatus=pstatus, pshow=pshow
            )
            if previous != current:
                if hasattr(self, "muc") and ptype == "unavailable":
                    # entity with a ``muc`` attribute going unavailable:
                    # the cached presence is flagged off instead of replaced
                    row = self.__stored()
                    if row is not None:
                        row.cached_presence = False
                        self.commit(merge=True)
                else:
                    self._store_last_presence(current)
            if previous and not force and self._ONLY_SEND_PRESENCE_CHANGES:
                if previous == current:
                    self.session.log.debug("Presence is the same as cached")
                    raise _NoChange
                self.session.log.debug(
                    "Presence is not the same as cached: %s vs %s", previous, current
                )

        presence = self.xmpp.make_presence(
            pfrom=self.jid.bare if bare else self.jid,
            ptype=ptype,
            pshow=pshow,
            pstatus=pstatus,
        )
        if not last_seen:
            return presence

        # it's ugly to check for the presence of this string, but a better fix is more work
        add_fallback = not re.match(
            ".*Last seen .*", presence["status"]
        ) and self.session.user.preferences.get("last_seen_fallback", True)
        if add_fallback:
            fallback_text, recent = get_last_seen_fallback(last_seen)
            status_text = presence["status"]
            presence["status"] = (
                status_text + " -- " + fallback_text if status_text else fallback_text
            )
            pk = self.__contact_pk
            if recent and pk is not None:
                # if less than a week, we use sth like 'Last seen: Monday, 8:05",
                # but if lasts more than a week, this is not very informative, so
                # we need to force resend an updated presence status
                pending = _UPDATE_LAST_SEEN_FALLBACK_TASKS.get(pk)
                if pending is not None:
                    pending.cancel()
                refresh = self.session.create_task(
                    _update_last_seen_fallback(self.session, pk)
                )
                _UPDATE_LAST_SEEN_FALLBACK_TASKS[pk] = refresh
                refresh.add_done_callback(partial(_clear_last_seen_task, pk))
        presence["idle"]["since"] = last_seen
        return presence

    def send_last_presence(
        self, force: bool = False, no_cache_online: bool = False
    ) -> None:
        """
        Re-send the cached presence.

        :param force: If nothing is cached, send a presence anyway.
        :param no_cache_online: With ``force`` and nothing cached, send an
            "online" presence when true, an "offline" one otherwise.
        """
        cache = self._get_last_presence()
        if cache is None:
            if not force:
                return
            if no_cache_online:
                self.online()
            else:
                self.offline()
            return
        self._send(
            self._make_presence(
                last_seen=cache.last_seen,
                force=True,
                ptype=cache.ptype,
                pshow=cache.pshow,
                pstatus=cache.pstatus,
            )
        )

    def __send_unless_unchanged(self, **presence_kwargs) -> None:
        """Build and send a presence, silently skipping it on ``_NoChange``."""
        try:
            self._send(self._make_presence(**presence_kwargs))
        except _NoChange:
            pass

    def online(
        self,
        status: Optional[str] = None,
        last_seen: Optional[datetime] = None,
    ) -> None:
        """
        Send an "online" presence from this contact to the user.

        :param status: Arbitrary text, details of the status, eg: "Listening to Britney Spears"
        :param last_seen: For :xep:`0319`
        """
        self.__send_unless_unchanged(pstatus=status, last_seen=last_seen)

    def away(
        self,
        status: Optional[str] = None,
        last_seen: Optional[datetime] = None,
    ) -> None:
        """
        Send an "away" presence from this contact to the user.

        This is a global status, as opposed to :meth:`.LegacyContact.inactive`
        which concerns a specific conversation, ie a specific "chat window"

        :param status: Arbitrary text, details of the status, eg: "Gone to fight capitalism"
        :param last_seen: For :xep:`0319`
        """
        self.__send_unless_unchanged(
            pstatus=status, pshow="away", last_seen=last_seen
        )

    def extended_away(
        self,
        status: Optional[str] = None,
        last_seen: Optional[datetime] = None,
    ) -> None:
        """
        Send an "extended away" presence from this contact to the user.

        This is a global status, as opposed to :meth:`.LegacyContact.inactive`
        which concerns a specific conversation, ie a specific "chat window"

        :param status: Arbitrary text, details of the status, eg: "Gone to fight capitalism"
        :param last_seen: For :xep:`0319`
        """
        self.__send_unless_unchanged(pstatus=status, pshow="xa", last_seen=last_seen)

    def busy(
        self,
        status: Optional[str] = None,
        last_seen: Optional[datetime] = None,
    ) -> None:
        """
        Send a "busy" (ie, "dnd") presence from this contact to the user.

        :param status: eg: "Trying to make sense of XEP-0100"
        :param last_seen: For :xep:`0319`
        """
        self.__send_unless_unchanged(pstatus=status, pshow="dnd", last_seen=last_seen)

    def offline(
        self,
        status: Optional[str] = None,
        last_seen: Optional[datetime] = None,
    ) -> None:
        """
        Send an "offline" presence from this contact to the user.

        :param status: eg: "Trying to make sense of XEP-0100"
        :param last_seen: For :xep:`0319`
        """
        self.__send_unless_unchanged(
            pstatus=status, ptype="unavailable", last_seen=last_seen
        )
def get_last_seen_fallback(last_seen: datetime) -> tuple[str, bool]:
    """
    Build the human-readable "Last seen …" text for a timestamp.

    The timestamp is converted to UTC first, so that the "GMT" label of the
    text is truthful whatever timezone ``last_seen`` carries. A naive
    ``last_seen`` is interpreted as local time by ``datetime.astimezone``
    instead of raising ``TypeError`` on the comparison with "now".

    :param last_seen: When the contact was last seen.
    :return: A ``(text, recent)`` tuple. ``recent`` is true when
        ``last_seen`` is less than seven days old; the text then shows the
        weekday and time, otherwise it shows the date.
    """
    seen_utc = last_seen.astimezone(timezone.utc)
    age = datetime.now(tz=timezone.utc) - seen_utc
    if age < timedelta(days=7):
        # NOTE(review): "%H" (24-hour clock) next to "%p" (AM/PM) looks
        # redundant; kept as-is to leave the displayed text unchanged.
        return f"Last seen {seen_utc:%A %H:%M %p GMT}", True
    # ``%-d`` is a platform-specific strftime extension; ``.day`` produces
    # the same non-padded day number everywhere.
    return f"Last seen {seen_utc:%b} {seen_utc.day} {seen_utc:%Y}", False