Coverage for slidge/core/mixins/presence.py: 86%
97 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
1import re
2from asyncio import Task, sleep
3from datetime import datetime, timedelta, timezone
4from typing import Optional
6from slixmpp.types import PresenceShows, PresenceTypes
8from ...util.types import CachedPresence
9from .. import config
10from .base import BaseSender
13class _NoChange(Exception):
14 pass
17_FRIEND_REQUEST_PRESENCES = {"subscribe", "unsubscribe", "subscribed", "unsubscribed"}
20class PresenceMixin(BaseSender):
21 _ONLY_SEND_PRESENCE_CHANGES = False
22 contact_pk: Optional[int] = None
24 def __init__(self, *a, **k):
25 super().__init__(*a, **k)
26 # FIXME: this should not be an attribute of this mixin to allow garbage
27 # collection of instances
28 self.__update_last_seen_fallback_task: Optional[Task] = None
29 # this is only used when a presence is set during Contact.update_info(),
30 # when the contact does not have a DB primary key yet, and is written
31 # to DB at the end of update_info()
32 self.cached_presence: Optional[CachedPresence] = None
34 async def __update_last_seen_fallback(self):
35 await sleep(3600 * 7)
36 self.send_last_presence(force=True, no_cache_online=False)
38 def _get_last_presence(self) -> Optional[CachedPresence]:
39 if self.contact_pk is None:
40 return None
41 return self.xmpp.store.contacts.get_presence(self.contact_pk)
43 def _store_last_presence(self, new: CachedPresence):
44 if self.contact_pk is None:
45 self.cached_presence = new
46 return
47 self.xmpp.store.contacts.set_presence(self.contact_pk, new)
49 def _make_presence(
50 self,
51 *,
52 last_seen: Optional[datetime] = None,
53 force=False,
54 bare=False,
55 ptype: Optional[PresenceTypes] = None,
56 pstatus: Optional[str] = None,
57 pshow: Optional[PresenceShows] = None,
58 ):
59 if last_seen and last_seen.tzinfo is None:
60 last_seen = last_seen.astimezone(timezone.utc)
62 old = self._get_last_presence()
64 if ptype not in _FRIEND_REQUEST_PRESENCES:
65 new = CachedPresence(
66 last_seen=last_seen, ptype=ptype, pstatus=pstatus, pshow=pshow
67 )
68 if old != new:
69 if hasattr(self, "muc") and ptype == "unavailable":
70 if self.contact_pk is not None:
71 self.xmpp.store.contacts.reset_presence(self.contact_pk)
72 else:
73 self._store_last_presence(new)
74 if old and not force and self._ONLY_SEND_PRESENCE_CHANGES:
75 if old == new:
76 self.session.log.debug("Presence is the same as cached")
77 raise _NoChange
78 self.session.log.debug(
79 "Presence is not the same as cached: %s vs %s", old, new
80 )
82 p = self.xmpp.make_presence(
83 pfrom=self.jid.bare if bare else self.jid,
84 ptype=ptype,
85 pshow=pshow,
86 pstatus=pstatus,
87 )
88 if last_seen:
89 # it's ugly to check for the presence of this string, but a better fix is more work
90 if config.LAST_SEEN_FALLBACK and not re.match(
91 ".*Last seen .*", p["status"]
92 ):
93 last_seen_fallback, recent = get_last_seen_fallback(last_seen)
94 if p["status"]:
95 p["status"] = p["status"] + " -- " + last_seen_fallback
96 else:
97 p["status"] = last_seen_fallback
98 if recent:
99 # if less than a week, we use sth like 'Last seen: Monday, 8:05",
100 # but if lasts more than a week, this is not very informative, so
101 # we need to force resend an updated presence status
102 if self.__update_last_seen_fallback_task:
103 self.__update_last_seen_fallback_task.cancel()
104 self.__update_last_seen_fallback_task = self.xmpp.loop.create_task(
105 self.__update_last_seen_fallback()
106 )
107 p["idle"]["since"] = last_seen
108 return p
110 def send_last_presence(self, force=False, no_cache_online=False):
111 if (cache := self._get_last_presence()) is None:
112 if force:
113 if no_cache_online:
114 self.online()
115 else:
116 self.offline()
117 return
118 self._send(
119 self._make_presence(
120 last_seen=cache.last_seen,
121 force=True,
122 ptype=cache.ptype,
123 pshow=cache.pshow,
124 pstatus=cache.pstatus,
125 )
126 )
128 def online(
129 self,
130 status: Optional[str] = None,
131 last_seen: Optional[datetime] = None,
132 ):
133 """
134 Send an "online" presence from this contact to the user.
136 :param status: Arbitrary text, details of the status, eg: "Listening to Britney Spears"
137 :param last_seen: For :xep:`0319`
138 """
139 try:
140 self._send(self._make_presence(pstatus=status, last_seen=last_seen))
141 except _NoChange:
142 pass
144 def away(
145 self,
146 status: Optional[str] = None,
147 last_seen: Optional[datetime] = None,
148 ):
149 """
150 Send an "away" presence from this contact to the user.
152 This is a global status, as opposed to :meth:`.LegacyContact.inactive`
153 which concerns a specific conversation, ie a specific "chat window"
155 :param status: Arbitrary text, details of the status, eg: "Gone to fight capitalism"
156 :param last_seen: For :xep:`0319`
157 """
158 try:
159 self._send(
160 self._make_presence(pstatus=status, pshow="away", last_seen=last_seen)
161 )
162 except _NoChange:
163 pass
165 def extended_away(
166 self,
167 status: Optional[str] = None,
168 last_seen: Optional[datetime] = None,
169 ):
170 """
171 Send an "extended away" presence from this contact to the user.
173 This is a global status, as opposed to :meth:`.LegacyContact.inactive`
174 which concerns a specific conversation, ie a specific "chat window"
176 :param status: Arbitrary text, details of the status, eg: "Gone to fight capitalism"
177 :param last_seen: For :xep:`0319`
178 """
179 try:
180 self._send(
181 self._make_presence(pstatus=status, pshow="xa", last_seen=last_seen)
182 )
183 except _NoChange:
184 pass
186 def busy(
187 self,
188 status: Optional[str] = None,
189 last_seen: Optional[datetime] = None,
190 ):
191 """
192 Send a "busy" (ie, "dnd") presence from this contact to the user,
194 :param status: eg: "Trying to make sense of XEP-0100"
195 :param last_seen: For :xep:`0319`
196 """
197 try:
198 self._send(
199 self._make_presence(pstatus=status, pshow="dnd", last_seen=last_seen)
200 )
201 except _NoChange:
202 pass
204 def offline(
205 self,
206 status: Optional[str] = None,
207 last_seen: Optional[datetime] = None,
208 ):
209 """
210 Send an "offline" presence from this contact to the user.
212 :param status: eg: "Trying to make sense of XEP-0100"
213 :param last_seen: For :xep:`0319`
214 """
215 try:
216 self._send(
217 self._make_presence(
218 pstatus=status, ptype="unavailable", last_seen=last_seen
219 )
220 )
221 except _NoChange:
222 pass
225def get_last_seen_fallback(last_seen: datetime):
226 now = datetime.now(tz=timezone.utc)
227 if now - last_seen < timedelta(days=7):
228 return f"Last seen {last_seen:%A %H:%M GMT}", True
229 else:
230 return f"Last seen {last_seen:%b %-d %Y}", False