Coverage for slidge / core / mixins / presence.py: 83%

142 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 05:07 +0000

1import re 

2from asyncio import Task, sleep 

3from datetime import UTC, datetime, timedelta 

4from functools import partial 

5from typing import TYPE_CHECKING 

6 

7from slixmpp import Presence 

8from slixmpp.types import PresenceShows, PresenceTypes 

9from sqlalchemy.orm.exc import DetachedInstanceError 

10 

11from ...db.models import Contact, Participant 

12from ...util.types import AnySession, CachedPresence 

13from .base import BaseSender 

14from .db import DBMixin 

15 

16if TYPE_CHECKING: 

17 pass 

18 

19 

20class _NoChange(Exception): 

21 pass 

22 

23 

24_FRIEND_REQUEST_PRESENCES = {"subscribe", "unsubscribe", "subscribed", "unsubscribed"} 

25_UPDATE_LAST_SEEN_FALLBACK_TASKS = dict[int, Task[None]]() 

26_ONE_WEEK_SECONDS = 3600 * 24 * 7 

27 

28 

29async def _update_last_seen_fallback(session: AnySession, contact_pk: int) -> None: 

30 await sleep(_ONE_WEEK_SECONDS) 

31 with session.xmpp.store.session() as orm: 

32 stored = orm.get(Contact, contact_pk) 

33 if stored is None: 

34 return 

35 contact = session.contacts.from_store(stored) 

36 contact.send_last_presence(force=True, no_cache_online=False) 

37 

38 

39def _clear_last_seen_task(contact_pk: int, _task: Task[None]) -> None: 

40 try: 

41 del _UPDATE_LAST_SEEN_FALLBACK_TASKS[contact_pk] 

42 except KeyError: 

43 pass 

44 

45 

46class PresenceMixin(BaseSender, DBMixin): 

47 _ONLY_SEND_PRESENCE_CHANGES = False 

48 

49 # this attribute actually only exists for contacts and not participants 

50 _updating_info: bool 

51 stored: Contact | Participant 

52 

53 def __init__(self, *a: object, **k: object) -> None: 

54 super().__init__(*a, **k) 

55 # this is only used when a presence is set during Contact.update_info(), 

56 # when the contact does not have a DB primary key yet, and is written 

57 # to DB at the end of update_info() 

58 self.cached_presence: CachedPresence | None = None 

59 

60 def __is_contact(self) -> bool: 

61 return isinstance(self.stored, Contact) 

62 

63 def __stored(self) -> Contact | None: 

64 if self.__is_contact(): 

65 assert isinstance(self.stored, Contact) 

66 return self.stored 

67 else: 

68 assert isinstance(self.stored, Participant) 

69 try: 

70 return self.stored.contact 

71 except DetachedInstanceError: 

72 with self.xmpp.store.session() as orm: 

73 orm.add(self.stored) 

74 if self.stored.contact is None: 

75 return None 

76 orm.refresh(self.stored.contact) 

77 orm.merge(self.stored) 

78 return self.stored.contact 

79 

80 @property 

81 def __contact_pk(self) -> int | None: 

82 stored = self.__stored() 

83 return None if stored is None else stored.id 

84 

85 def _get_last_presence(self) -> CachedPresence | None: 

86 stored = self.__stored() 

87 if stored is None or not stored.cached_presence: 

88 return None 

89 return CachedPresence( 

90 None if stored.last_seen is None else stored.last_seen.replace(tzinfo=UTC), 

91 stored.ptype, # type:ignore 

92 stored.pstatus, 

93 stored.pshow, # type:ignore 

94 ) 

95 

96 def _store_last_presence(self, new: CachedPresence) -> None: 

97 if self.__is_contact(): 

98 contact = self 

99 elif (contact := getattr(self, "contact", None)) is None: # type:ignore[assignment] 

100 return 

101 contact.update_stored_attribute( # type:ignore[attr-defined] 

102 cached_presence=True, 

103 **new._asdict(), 

104 ) 

105 

106 def _make_presence( 

107 self, 

108 *, 

109 last_seen: datetime | None = None, 

110 force: bool = False, 

111 bare: bool = False, 

112 ptype: PresenceTypes | None = None, 

113 pstatus: str | None = None, 

114 pshow: PresenceShows | None = None, 

115 ) -> Presence: 

116 if last_seen and last_seen.tzinfo is None: 

117 last_seen = last_seen.astimezone(UTC) 

118 

119 old = self._get_last_presence() 

120 

121 if ptype not in _FRIEND_REQUEST_PRESENCES: 

122 new = CachedPresence( 

123 last_seen=last_seen, ptype=ptype, pstatus=pstatus, pshow=pshow 

124 ) 

125 if old != new: 

126 if hasattr(self, "muc") and ptype == "unavailable": 

127 stored = self.__stored() 

128 if stored is not None: 

129 stored.cached_presence = False 

130 self.commit(merge=True) 

131 else: 

132 self._store_last_presence(new) 

133 if old and not force and self._ONLY_SEND_PRESENCE_CHANGES: 

134 if old == new: 

135 self.session.log.debug("Presence is the same as cached") 

136 raise _NoChange 

137 self.session.log.debug( 

138 "Presence is not the same as cached: %s vs %s", old, new 

139 ) 

140 

141 p = self.xmpp.make_presence( 

142 pfrom=self.jid.bare if bare else self.jid, 

143 ptype=ptype, 

144 pshow=pshow, 

145 pstatus=pstatus, 

146 ) 

147 if last_seen: 

148 # it's ugly to check for the presence of this string, but a better fix is more work 

149 if not re.match( 

150 ".*Last seen .*", p["status"] 

151 ) and self.session.user.preferences.get("last_seen_fallback", True): 

152 last_seen_fallback, recent = get_last_seen_fallback(last_seen) 

153 if p["status"]: 

154 p["status"] = p["status"] + " -- " + last_seen_fallback 

155 else: 

156 p["status"] = last_seen_fallback 

157 pk = self.__contact_pk 

158 if recent and pk is not None: 

159 # if less than a week, we use sth like 'Last seen: Monday, 8:05", 

160 # but if lasts more than a week, this is not very informative, so 

161 # we need to force resend an updated presence status 

162 task = _UPDATE_LAST_SEEN_FALLBACK_TASKS.get(pk) 

163 if task is not None: 

164 task.cancel() 

165 task = self.session.create_task( 

166 _update_last_seen_fallback(self.session, pk) 

167 ) 

168 _UPDATE_LAST_SEEN_FALLBACK_TASKS[pk] = task 

169 task.add_done_callback(partial(_clear_last_seen_task, pk)) 

170 p["idle"]["since"] = last_seen 

171 return p 

172 

173 def send_last_presence( 

174 self, force: bool = False, no_cache_online: bool = False 

175 ) -> None: 

176 if (cache := self._get_last_presence()) is None: 

177 if force: 

178 if no_cache_online: 

179 self.online() 

180 else: 

181 self.offline() 

182 return 

183 self._send( 

184 self._make_presence( 

185 last_seen=cache.last_seen, 

186 force=True, 

187 ptype=cache.ptype, 

188 pshow=cache.pshow, 

189 pstatus=cache.pstatus, 

190 ) 

191 ) 

192 

193 def online( 

194 self, 

195 status: str | None = None, 

196 last_seen: datetime | None = None, 

197 ) -> None: 

198 """ 

199 Send an "online" presence from this contact to the user. 

200 

201 :param status: Arbitrary text, details of the status, eg: "Listening to Britney Spears" 

202 :param last_seen: For :xep:`0319` 

203 """ 

204 try: 

205 self._send(self._make_presence(pstatus=status, last_seen=last_seen)) 

206 except _NoChange: 

207 pass 

208 

209 def away( 

210 self, 

211 status: str | None = None, 

212 last_seen: datetime | None = None, 

213 ) -> None: 

214 """ 

215 Send an "away" presence from this contact to the user. 

216 

217 This is a global status, as opposed to :meth:`.LegacyContact.inactive` 

218 which concerns a specific conversation, ie a specific "chat window" 

219 

220 :param status: Arbitrary text, details of the status, eg: "Gone to fight capitalism" 

221 :param last_seen: For :xep:`0319` 

222 """ 

223 try: 

224 self._send( 

225 self._make_presence(pstatus=status, pshow="away", last_seen=last_seen) 

226 ) 

227 except _NoChange: 

228 pass 

229 

230 def extended_away( 

231 self, 

232 status: str | None = None, 

233 last_seen: datetime | None = None, 

234 ) -> None: 

235 """ 

236 Send an "extended away" presence from this contact to the user. 

237 

238 This is a global status, as opposed to :meth:`.LegacyContact.inactive` 

239 which concerns a specific conversation, ie a specific "chat window" 

240 

241 :param status: Arbitrary text, details of the status, eg: "Gone to fight capitalism" 

242 :param last_seen: For :xep:`0319` 

243 """ 

244 try: 

245 self._send( 

246 self._make_presence(pstatus=status, pshow="xa", last_seen=last_seen) 

247 ) 

248 except _NoChange: 

249 pass 

250 

251 def busy( 

252 self, 

253 status: str | None = None, 

254 last_seen: datetime | None = None, 

255 ) -> None: 

256 """ 

257 Send a "busy" (ie, "dnd") presence from this contact to the user, 

258 

259 :param status: eg: "Trying to make sense of XEP-0100" 

260 :param last_seen: For :xep:`0319` 

261 """ 

262 try: 

263 self._send( 

264 self._make_presence(pstatus=status, pshow="dnd", last_seen=last_seen) 

265 ) 

266 except _NoChange: 

267 pass 

268 

269 def offline( 

270 self, 

271 status: str | None = None, 

272 last_seen: datetime | None = None, 

273 ) -> None: 

274 """ 

275 Send an "offline" presence from this contact to the user. 

276 

277 :param status: eg: "Trying to make sense of XEP-0100" 

278 :param last_seen: For :xep:`0319` 

279 """ 

280 try: 

281 self._send( 

282 self._make_presence( 

283 pstatus=status, ptype="unavailable", last_seen=last_seen 

284 ) 

285 ) 

286 except _NoChange: 

287 pass 

288 

289 

290def get_last_seen_fallback(last_seen: datetime) -> tuple[str, bool]: 

291 now = datetime.now(tz=UTC) 

292 if now - last_seen < timedelta(days=7): 

293 return f"Last seen {last_seen:%A %H:%M %p GMT}", True 

294 else: 

295 return f"Last seen {last_seen:%b %-d %Y}", False