Coverage for slidge/core/mixins/presence.py: 85%
136 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
1import re
2from asyncio import Task, sleep
3from datetime import datetime, timedelta, timezone
4from functools import partial
5from typing import TYPE_CHECKING, Optional
7from slixmpp.types import PresenceShows, PresenceTypes
8from sqlalchemy.exc import InvalidRequestError
9from sqlalchemy.orm.exc import DetachedInstanceError
11from ...db.models import Contact, Participant
12from ...util.types import CachedPresence
13from .base import BaseSender
14from .db import DBMixin
16if TYPE_CHECKING:
17 from ..session import BaseSession
20class _NoChange(Exception):
21 pass
24_FRIEND_REQUEST_PRESENCES = {"subscribe", "unsubscribe", "subscribed", "unsubscribed"}
25_UPDATE_LAST_SEEN_FALLBACK_TASKS = dict[int, Task]()
26_ONE_WEEK_SECONDS = 3600 * 24 * 7
29async def _update_last_seen_fallback(session: "BaseSession", contact_pk: int) -> None:
30 await sleep(_ONE_WEEK_SECONDS)
31 with session.xmpp.store.session() as orm:
32 stored = orm.get(Contact, contact_pk)
33 if stored is None:
34 return
35 contact = session.contacts.from_store(stored)
36 contact.send_last_presence(force=True, no_cache_online=False)
39def _clear_last_seen_task(contact_pk: int, _task) -> None:
40 try:
41 del _UPDATE_LAST_SEEN_FALLBACK_TASKS[contact_pk]
42 except KeyError:
43 pass
46class PresenceMixin(BaseSender, DBMixin):
47 _ONLY_SEND_PRESENCE_CHANGES = False
49 stored: Contact | Participant
51 def __init__(self, *a, **k) -> None:
52 super().__init__(*a, **k)
53 # this is only used when a presence is set during Contact.update_info(),
54 # when the contact does not have a DB primary key yet, and is written
55 # to DB at the end of update_info()
56 self.cached_presence: Optional[CachedPresence] = None
58 def __stored(self) -> Contact | None:
59 if isinstance(self.stored, Contact):
60 return self.stored
61 else:
62 try:
63 return self.stored.contact
64 except DetachedInstanceError:
65 self.merge()
66 return self.stored.contact
68 @property
69 def __contact_pk(self) -> int | None:
70 stored = self.__stored()
71 return None if stored is None else stored.id
73 def _get_last_presence(self) -> Optional[CachedPresence]:
74 stored = self.__stored()
75 if stored is None or not stored.cached_presence:
76 return None
77 return CachedPresence(
78 None
79 if stored.last_seen is None
80 else stored.last_seen.replace(tzinfo=timezone.utc),
81 stored.ptype, # type:ignore
82 stored.pstatus,
83 stored.pshow, # type:ignore
84 )
86 def _store_last_presence(self, new: CachedPresence) -> None:
87 stored = self.__stored()
88 if stored is not None:
89 stored.cached_presence = True
90 for k, v in new._asdict().items():
91 setattr(stored, k, v)
92 try:
93 self.commit()
94 except InvalidRequestError:
95 self.commit(merge=True)
97 def _make_presence(
98 self,
99 *,
100 last_seen: Optional[datetime] = None,
101 force: bool = False,
102 bare: bool = False,
103 ptype: Optional[PresenceTypes] = None,
104 pstatus: Optional[str] = None,
105 pshow: Optional[PresenceShows] = None,
106 ):
107 if last_seen and last_seen.tzinfo is None:
108 last_seen = last_seen.astimezone(timezone.utc)
110 old = self._get_last_presence()
112 if ptype not in _FRIEND_REQUEST_PRESENCES:
113 new = CachedPresence(
114 last_seen=last_seen, ptype=ptype, pstatus=pstatus, pshow=pshow
115 )
116 if old != new:
117 if hasattr(self, "muc") and ptype == "unavailable":
118 stored = self.__stored()
119 if stored is not None:
120 stored.cached_presence = False
121 self.commit(merge=True)
122 else:
123 self._store_last_presence(new)
124 if old and not force and self._ONLY_SEND_PRESENCE_CHANGES:
125 if old == new:
126 self.session.log.debug("Presence is the same as cached")
127 raise _NoChange
128 self.session.log.debug(
129 "Presence is not the same as cached: %s vs %s", old, new
130 )
132 p = self.xmpp.make_presence(
133 pfrom=self.jid.bare if bare else self.jid,
134 ptype=ptype,
135 pshow=pshow,
136 pstatus=pstatus,
137 )
138 if last_seen:
139 # it's ugly to check for the presence of this string, but a better fix is more work
140 if not re.match(
141 ".*Last seen .*", p["status"]
142 ) and self.session.user.preferences.get("last_seen_fallback", True):
143 last_seen_fallback, recent = get_last_seen_fallback(last_seen)
144 if p["status"]:
145 p["status"] = p["status"] + " -- " + last_seen_fallback
146 else:
147 p["status"] = last_seen_fallback
148 pk = self.__contact_pk
149 if recent and pk is not None:
150 # if less than a week, we use sth like 'Last seen: Monday, 8:05",
151 # but if lasts more than a week, this is not very informative, so
152 # we need to force resend an updated presence status
153 task = _UPDATE_LAST_SEEN_FALLBACK_TASKS.get(pk)
154 if task is not None:
155 task.cancel()
156 task = self.session.create_task(
157 _update_last_seen_fallback(self.session, pk)
158 )
159 _UPDATE_LAST_SEEN_FALLBACK_TASKS[pk] = task
160 task.add_done_callback(partial(_clear_last_seen_task, pk))
161 p["idle"]["since"] = last_seen
162 return p
164 def send_last_presence(
165 self, force: bool = False, no_cache_online: bool = False
166 ) -> None:
167 if (cache := self._get_last_presence()) is None:
168 if force:
169 if no_cache_online:
170 self.online()
171 else:
172 self.offline()
173 return
174 self._send(
175 self._make_presence(
176 last_seen=cache.last_seen,
177 force=True,
178 ptype=cache.ptype,
179 pshow=cache.pshow,
180 pstatus=cache.pstatus,
181 )
182 )
184 def online(
185 self,
186 status: Optional[str] = None,
187 last_seen: Optional[datetime] = None,
188 ) -> None:
189 """
190 Send an "online" presence from this contact to the user.
192 :param status: Arbitrary text, details of the status, eg: "Listening to Britney Spears"
193 :param last_seen: For :xep:`0319`
194 """
195 try:
196 self._send(self._make_presence(pstatus=status, last_seen=last_seen))
197 except _NoChange:
198 pass
200 def away(
201 self,
202 status: Optional[str] = None,
203 last_seen: Optional[datetime] = None,
204 ) -> None:
205 """
206 Send an "away" presence from this contact to the user.
208 This is a global status, as opposed to :meth:`.LegacyContact.inactive`
209 which concerns a specific conversation, ie a specific "chat window"
211 :param status: Arbitrary text, details of the status, eg: "Gone to fight capitalism"
212 :param last_seen: For :xep:`0319`
213 """
214 try:
215 self._send(
216 self._make_presence(pstatus=status, pshow="away", last_seen=last_seen)
217 )
218 except _NoChange:
219 pass
221 def extended_away(
222 self,
223 status: Optional[str] = None,
224 last_seen: Optional[datetime] = None,
225 ) -> None:
226 """
227 Send an "extended away" presence from this contact to the user.
229 This is a global status, as opposed to :meth:`.LegacyContact.inactive`
230 which concerns a specific conversation, ie a specific "chat window"
232 :param status: Arbitrary text, details of the status, eg: "Gone to fight capitalism"
233 :param last_seen: For :xep:`0319`
234 """
235 try:
236 self._send(
237 self._make_presence(pstatus=status, pshow="xa", last_seen=last_seen)
238 )
239 except _NoChange:
240 pass
242 def busy(
243 self,
244 status: Optional[str] = None,
245 last_seen: Optional[datetime] = None,
246 ) -> None:
247 """
248 Send a "busy" (ie, "dnd") presence from this contact to the user,
250 :param status: eg: "Trying to make sense of XEP-0100"
251 :param last_seen: For :xep:`0319`
252 """
253 try:
254 self._send(
255 self._make_presence(pstatus=status, pshow="dnd", last_seen=last_seen)
256 )
257 except _NoChange:
258 pass
260 def offline(
261 self,
262 status: Optional[str] = None,
263 last_seen: Optional[datetime] = None,
264 ) -> None:
265 """
266 Send an "offline" presence from this contact to the user.
268 :param status: eg: "Trying to make sense of XEP-0100"
269 :param last_seen: For :xep:`0319`
270 """
271 try:
272 self._send(
273 self._make_presence(
274 pstatus=status, ptype="unavailable", last_seen=last_seen
275 )
276 )
277 except _NoChange:
278 pass
281def get_last_seen_fallback(last_seen: datetime):
282 now = datetime.now(tz=timezone.utc)
283 if now - last_seen < timedelta(days=7):
284 return f"Last seen {last_seen:%A %H:%M GMT}", True
285 else:
286 return f"Last seen {last_seen:%b %-d %Y}", False