Coverage for slidge / core / mixins / presence.py: 91%
131 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-20 19:56 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-20 19:56 +0000
1import contextlib
2import re
3from asyncio import Task, sleep
4from datetime import UTC, datetime, timedelta
5from functools import partial
6from typing import TYPE_CHECKING
8from slixmpp import Presence
9from slixmpp.types import PresenceShows, PresenceTypes
10from sqlalchemy.orm.exc import DetachedInstanceError
12from ...db.models import Contact, Participant
13from ...util.types import AnySession, CachedPresence
14from .base import BaseSender
15from .db import DBMixin
17if TYPE_CHECKING:
18 pass
21class _NoChange(Exception):
22 pass
25_FRIEND_REQUEST_PRESENCES = {"subscribe", "unsubscribe", "subscribed", "unsubscribed"}
26_UPDATE_LAST_SEEN_FALLBACK_TASKS = dict[int, Task[None]]()
27_ONE_WEEK_SECONDS = 3600 * 24 * 7
30async def _update_last_seen_fallback(session: AnySession, contact_pk: int) -> None:
31 await sleep(_ONE_WEEK_SECONDS)
32 with session.xmpp.store.session() as orm:
33 stored = orm.get(Contact, contact_pk)
34 if stored is None:
35 return
36 contact = session.contacts.from_store(stored)
37 contact.send_last_presence(force=True, no_cache_online=False)
40def _clear_last_seen_task(contact_pk: int, _task: Task[None]) -> None:
41 with contextlib.suppress(KeyError):
42 del _UPDATE_LAST_SEEN_FALLBACK_TASKS[contact_pk]
45class PresenceMixin(BaseSender, DBMixin):
46 _ONLY_SEND_PRESENCE_CHANGES = False
48 # this attribute actually only exists for contacts and not participants
49 _updating_info: bool
50 stored: Contact | Participant
52 def __init__(self, *a: object, **k: object) -> None:
53 super().__init__(*a, **k)
54 # this is only used when a presence is set during Contact.update_info(),
55 # when the contact does not have a DB primary key yet, and is written
56 # to DB at the end of update_info()
57 self.cached_presence: CachedPresence | None = None
59 def __is_contact(self) -> bool:
60 return isinstance(self.stored, Contact)
62 def __stored(self) -> Contact | None:
63 if self.__is_contact():
64 assert isinstance(self.stored, Contact)
65 return self.stored
66 else:
67 assert isinstance(self.stored, Participant)
68 try:
69 return self.stored.contact
70 except DetachedInstanceError:
71 with self.xmpp.store.session() as orm:
72 orm.add(self.stored)
73 if self.stored.contact is None:
74 return None
75 orm.refresh(self.stored.contact)
76 orm.merge(self.stored)
77 return self.stored.contact
79 @property
80 def __contact_pk(self) -> int | None:
81 stored = self.__stored()
82 return None if stored is None else stored.id
84 def _get_last_presence(self) -> CachedPresence | None:
85 stored = self.__stored()
86 if stored is None or not stored.cached_presence:
87 return None
88 return CachedPresence(
89 None if stored.last_seen is None else stored.last_seen.replace(tzinfo=UTC),
90 stored.ptype, # type:ignore
91 stored.pstatus,
92 stored.pshow, # type:ignore
93 )
95 def _store_last_presence(self, new: CachedPresence) -> None:
96 if self.__is_contact():
97 contact = self
98 elif (contact := getattr(self, "contact", None)) is None: # type:ignore[assignment]
99 return
100 contact.update_stored_attribute( # type:ignore[attr-defined]
101 cached_presence=True,
102 **new._asdict(),
103 )
105 def _make_presence(
106 self,
107 *,
108 last_seen: datetime | None = None,
109 force: bool = False,
110 bare: bool = False,
111 ptype: PresenceTypes | None = None,
112 pstatus: str | None = None,
113 pshow: PresenceShows | None = None,
114 ) -> Presence:
115 if last_seen and last_seen.tzinfo is None:
116 last_seen = last_seen.astimezone(UTC)
118 old = self._get_last_presence()
120 if ptype not in _FRIEND_REQUEST_PRESENCES:
121 new = CachedPresence(
122 last_seen=last_seen, ptype=ptype, pstatus=pstatus, pshow=pshow
123 )
124 if old != new:
125 if hasattr(self, "muc") and ptype == "unavailable":
126 stored = self.__stored()
127 if stored is not None:
128 stored.cached_presence = False
129 self.commit()
130 else:
131 self._store_last_presence(new)
132 if old and not force and self._ONLY_SEND_PRESENCE_CHANGES:
133 if old == new:
134 self.session.log.debug("Presence is the same as cached")
135 raise _NoChange
136 self.session.log.debug(
137 "Presence is not the same as cached: %s vs %s", old, new
138 )
140 p = self.xmpp.make_presence(
141 pfrom=self.jid.bare if bare else self.jid,
142 ptype=ptype,
143 pshow=pshow,
144 pstatus=pstatus,
145 )
146 if last_seen:
147 # it's ugly to check for the presence of this string, but a better fix is more work
148 if not re.match(
149 ".*Last seen .*", p["status"]
150 ) and self.session.user.preferences.get("last_seen_fallback", True):
151 last_seen_fallback, recent = get_last_seen_fallback(last_seen)
152 if p["status"]:
153 p["status"] = p["status"] + " -- " + last_seen_fallback
154 else:
155 p["status"] = last_seen_fallback
156 pk = self.__contact_pk
157 if recent and pk is not None:
158 # if less than a week, we use sth like 'Last seen: Monday, 8:05",
159 # but if lasts more than a week, this is not very informative, so
160 # we need to force resend an updated presence status
161 task = _UPDATE_LAST_SEEN_FALLBACK_TASKS.get(pk)
162 if task is not None:
163 task.cancel()
164 task = self.session.create_task(
165 _update_last_seen_fallback(self.session, pk)
166 )
167 _UPDATE_LAST_SEEN_FALLBACK_TASKS[pk] = task
168 task.add_done_callback(partial(_clear_last_seen_task, pk))
169 p["idle"]["since"] = last_seen
170 return p
172 def send_last_presence(
173 self, force: bool = False, no_cache_online: bool = False
174 ) -> None:
175 if (cache := self._get_last_presence()) is None:
176 if force:
177 if no_cache_online:
178 self.online()
179 else:
180 self.offline()
181 return
182 self._send(
183 self._make_presence(
184 last_seen=cache.last_seen,
185 force=True,
186 ptype=cache.ptype,
187 pshow=cache.pshow,
188 pstatus=cache.pstatus,
189 )
190 )
192 def online(
193 self,
194 status: str | None = None,
195 last_seen: datetime | None = None,
196 ) -> None:
197 """
198 Send an "online" presence from this contact to the user.
200 :param status: Arbitrary text, details of the status, eg: "Listening to Britney Spears"
201 :param last_seen: For :xep:`0319`
202 """
203 with contextlib.suppress(_NoChange):
204 self._send(self._make_presence(pstatus=status, last_seen=last_seen))
206 def away(
207 self,
208 status: str | None = None,
209 last_seen: datetime | None = None,
210 ) -> None:
211 """
212 Send an "away" presence from this contact to the user.
214 This is a global status, as opposed to :meth:`.LegacyContact.inactive`
215 which concerns a specific conversation, ie a specific "chat window"
217 :param status: Arbitrary text, details of the status, eg: "Gone to fight capitalism"
218 :param last_seen: For :xep:`0319`
219 """
220 with contextlib.suppress(_NoChange):
221 self._send(
222 self._make_presence(pstatus=status, pshow="away", last_seen=last_seen)
223 )
225 def extended_away(
226 self,
227 status: str | None = None,
228 last_seen: datetime | None = None,
229 ) -> None:
230 """
231 Send an "extended away" presence from this contact to the user.
233 This is a global status, as opposed to :meth:`.LegacyContact.inactive`
234 which concerns a specific conversation, ie a specific "chat window"
236 :param status: Arbitrary text, details of the status, eg: "Gone to fight capitalism"
237 :param last_seen: For :xep:`0319`
238 """
239 with contextlib.suppress(_NoChange):
240 self._send(
241 self._make_presence(pstatus=status, pshow="xa", last_seen=last_seen)
242 )
244 def busy(
245 self,
246 status: str | None = None,
247 last_seen: datetime | None = None,
248 ) -> None:
249 """
250 Send a "busy" (ie, "dnd") presence from this contact to the user,
252 :param status: eg: "Trying to make sense of XEP-0100"
253 :param last_seen: For :xep:`0319`
254 """
255 with contextlib.suppress(_NoChange):
256 self._send(
257 self._make_presence(pstatus=status, pshow="dnd", last_seen=last_seen)
258 )
260 def offline(
261 self,
262 status: str | None = None,
263 last_seen: datetime | None = None,
264 ) -> None:
265 """
266 Send an "offline" presence from this contact to the user.
268 :param status: eg: "Trying to make sense of XEP-0100"
269 :param last_seen: For :xep:`0319`
270 """
271 with contextlib.suppress(_NoChange):
272 self._send(
273 self._make_presence(
274 pstatus=status, ptype="unavailable", last_seen=last_seen
275 )
276 )
279def get_last_seen_fallback(last_seen: datetime) -> tuple[str, bool]:
280 now = datetime.now(tz=UTC)
281 if now - last_seen < timedelta(days=7):
282 return f"Last seen {last_seen:%A %H:%M %p GMT}", True
283 else:
284 return f"Last seen {last_seen:%b %-d %Y}", False