Coverage for slidge / core / mixins / presence.py: 83%

141 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-02-15 09:02 +0000

1import re 

2from asyncio import Task, sleep 

3from datetime import UTC, datetime, timedelta 

4from functools import partial 

5from typing import TYPE_CHECKING 

6 

7from slixmpp.types import PresenceShows, PresenceTypes 

8from sqlalchemy.orm.exc import DetachedInstanceError 

9 

10from ...db.models import Contact, Participant 

11from ...util.types import CachedPresence 

12from .base import BaseSender 

13from .db import DBMixin 

14 

15if TYPE_CHECKING: 

16 from ..session import BaseSession 

17 

18 

19class _NoChange(Exception): 

20 pass 

21 

22 

23_FRIEND_REQUEST_PRESENCES = {"subscribe", "unsubscribe", "subscribed", "unsubscribed"} 

24_UPDATE_LAST_SEEN_FALLBACK_TASKS = dict[int, Task]() 

25_ONE_WEEK_SECONDS = 3600 * 24 * 7 

26 

27 

28async def _update_last_seen_fallback(session: "BaseSession", contact_pk: int) -> None: 

29 await sleep(_ONE_WEEK_SECONDS) 

30 with session.xmpp.store.session() as orm: 

31 stored = orm.get(Contact, contact_pk) 

32 if stored is None: 

33 return 

34 contact = session.contacts.from_store(stored) 

35 contact.send_last_presence(force=True, no_cache_online=False) 

36 

37 

38def _clear_last_seen_task(contact_pk: int, _task) -> None: 

39 try: 

40 del _UPDATE_LAST_SEEN_FALLBACK_TASKS[contact_pk] 

41 except KeyError: 

42 pass 

43 

44 

45class PresenceMixin(BaseSender, DBMixin): 

46 _ONLY_SEND_PRESENCE_CHANGES = False 

47 

48 # this attribute actually only exists for contacts and not participants 

49 _updating_info: bool 

50 stored: Contact | Participant 

51 

52 def __init__(self, *a, **k) -> None: 

53 super().__init__(*a, **k) 

54 # this is only used when a presence is set during Contact.update_info(), 

55 # when the contact does not have a DB primary key yet, and is written 

56 # to DB at the end of update_info() 

57 self.cached_presence: CachedPresence | None = None 

58 

59 def __is_contact(self) -> bool: 

60 return isinstance(self.stored, Contact) 

61 

62 def __stored(self) -> Contact | None: 

63 if self.__is_contact(): 

64 assert isinstance(self.stored, Contact) 

65 return self.stored 

66 else: 

67 assert isinstance(self.stored, Participant) 

68 try: 

69 return self.stored.contact 

70 except DetachedInstanceError: 

71 with self.xmpp.store.session() as orm: 

72 orm.add(self.stored) 

73 if self.stored.contact is None: 

74 return None 

75 orm.refresh(self.stored.contact) 

76 orm.merge(self.stored) 

77 return self.stored.contact 

78 

79 @property 

80 def __contact_pk(self) -> int | None: 

81 stored = self.__stored() 

82 return None if stored is None else stored.id 

83 

84 def _get_last_presence(self) -> CachedPresence | None: 

85 stored = self.__stored() 

86 if stored is None or not stored.cached_presence: 

87 return None 

88 return CachedPresence( 

89 None if stored.last_seen is None else stored.last_seen.replace(tzinfo=UTC), 

90 stored.ptype, # type:ignore 

91 stored.pstatus, 

92 stored.pshow, # type:ignore 

93 ) 

94 

95 def _store_last_presence(self, new: CachedPresence) -> None: 

96 if self.__is_contact(): 

97 contact = self 

98 elif (contact := getattr(self, "contact", None)) is None: # type:ignore[assignment] 

99 return 

100 contact.update_stored_attribute( # type:ignore[attr-defined] 

101 cached_presence=True, 

102 **new._asdict(), 

103 ) 

104 

105 def _make_presence( 

106 self, 

107 *, 

108 last_seen: datetime | None = None, 

109 force: bool = False, 

110 bare: bool = False, 

111 ptype: PresenceTypes | None = None, 

112 pstatus: str | None = None, 

113 pshow: PresenceShows | None = None, 

114 ): 

115 if last_seen and last_seen.tzinfo is None: 

116 last_seen = last_seen.astimezone(UTC) 

117 

118 old = self._get_last_presence() 

119 

120 if ptype not in _FRIEND_REQUEST_PRESENCES: 

121 new = CachedPresence( 

122 last_seen=last_seen, ptype=ptype, pstatus=pstatus, pshow=pshow 

123 ) 

124 if old != new: 

125 if hasattr(self, "muc") and ptype == "unavailable": 

126 stored = self.__stored() 

127 if stored is not None: 

128 stored.cached_presence = False 

129 self.commit(merge=True) 

130 else: 

131 self._store_last_presence(new) 

132 if old and not force and self._ONLY_SEND_PRESENCE_CHANGES: 

133 if old == new: 

134 self.session.log.debug("Presence is the same as cached") 

135 raise _NoChange 

136 self.session.log.debug( 

137 "Presence is not the same as cached: %s vs %s", old, new 

138 ) 

139 

140 p = self.xmpp.make_presence( 

141 pfrom=self.jid.bare if bare else self.jid, 

142 ptype=ptype, 

143 pshow=pshow, 

144 pstatus=pstatus, 

145 ) 

146 if last_seen: 

147 # it's ugly to check for the presence of this string, but a better fix is more work 

148 if not re.match( 

149 ".*Last seen .*", p["status"] 

150 ) and self.session.user.preferences.get("last_seen_fallback", True): 

151 last_seen_fallback, recent = get_last_seen_fallback(last_seen) 

152 if p["status"]: 

153 p["status"] = p["status"] + " -- " + last_seen_fallback 

154 else: 

155 p["status"] = last_seen_fallback 

156 pk = self.__contact_pk 

157 if recent and pk is not None: 

158 # if less than a week, we use sth like 'Last seen: Monday, 8:05", 

159 # but if lasts more than a week, this is not very informative, so 

160 # we need to force resend an updated presence status 

161 task = _UPDATE_LAST_SEEN_FALLBACK_TASKS.get(pk) 

162 if task is not None: 

163 task.cancel() 

164 task = self.session.create_task( 

165 _update_last_seen_fallback(self.session, pk) 

166 ) 

167 _UPDATE_LAST_SEEN_FALLBACK_TASKS[pk] = task 

168 task.add_done_callback(partial(_clear_last_seen_task, pk)) 

169 p["idle"]["since"] = last_seen 

170 return p 

171 

172 def send_last_presence( 

173 self, force: bool = False, no_cache_online: bool = False 

174 ) -> None: 

175 if (cache := self._get_last_presence()) is None: 

176 if force: 

177 if no_cache_online: 

178 self.online() 

179 else: 

180 self.offline() 

181 return 

182 self._send( 

183 self._make_presence( 

184 last_seen=cache.last_seen, 

185 force=True, 

186 ptype=cache.ptype, 

187 pshow=cache.pshow, 

188 pstatus=cache.pstatus, 

189 ) 

190 ) 

191 

192 def online( 

193 self, 

194 status: str | None = None, 

195 last_seen: datetime | None = None, 

196 ) -> None: 

197 """ 

198 Send an "online" presence from this contact to the user. 

199 

200 :param status: Arbitrary text, details of the status, eg: "Listening to Britney Spears" 

201 :param last_seen: For :xep:`0319` 

202 """ 

203 try: 

204 self._send(self._make_presence(pstatus=status, last_seen=last_seen)) 

205 except _NoChange: 

206 pass 

207 

208 def away( 

209 self, 

210 status: str | None = None, 

211 last_seen: datetime | None = None, 

212 ) -> None: 

213 """ 

214 Send an "away" presence from this contact to the user. 

215 

216 This is a global status, as opposed to :meth:`.LegacyContact.inactive` 

217 which concerns a specific conversation, ie a specific "chat window" 

218 

219 :param status: Arbitrary text, details of the status, eg: "Gone to fight capitalism" 

220 :param last_seen: For :xep:`0319` 

221 """ 

222 try: 

223 self._send( 

224 self._make_presence(pstatus=status, pshow="away", last_seen=last_seen) 

225 ) 

226 except _NoChange: 

227 pass 

228 

229 def extended_away( 

230 self, 

231 status: str | None = None, 

232 last_seen: datetime | None = None, 

233 ) -> None: 

234 """ 

235 Send an "extended away" presence from this contact to the user. 

236 

237 This is a global status, as opposed to :meth:`.LegacyContact.inactive` 

238 which concerns a specific conversation, ie a specific "chat window" 

239 

240 :param status: Arbitrary text, details of the status, eg: "Gone to fight capitalism" 

241 :param last_seen: For :xep:`0319` 

242 """ 

243 try: 

244 self._send( 

245 self._make_presence(pstatus=status, pshow="xa", last_seen=last_seen) 

246 ) 

247 except _NoChange: 

248 pass 

249 

250 def busy( 

251 self, 

252 status: str | None = None, 

253 last_seen: datetime | None = None, 

254 ) -> None: 

255 """ 

256 Send a "busy" (ie, "dnd") presence from this contact to the user, 

257 

258 :param status: eg: "Trying to make sense of XEP-0100" 

259 :param last_seen: For :xep:`0319` 

260 """ 

261 try: 

262 self._send( 

263 self._make_presence(pstatus=status, pshow="dnd", last_seen=last_seen) 

264 ) 

265 except _NoChange: 

266 pass 

267 

268 def offline( 

269 self, 

270 status: str | None = None, 

271 last_seen: datetime | None = None, 

272 ) -> None: 

273 """ 

274 Send an "offline" presence from this contact to the user. 

275 

276 :param status: eg: "Trying to make sense of XEP-0100" 

277 :param last_seen: For :xep:`0319` 

278 """ 

279 try: 

280 self._send( 

281 self._make_presence( 

282 pstatus=status, ptype="unavailable", last_seen=last_seen 

283 ) 

284 ) 

285 except _NoChange: 

286 pass 

287 

288 

289def get_last_seen_fallback(last_seen: datetime) -> tuple[str, bool]: 

290 now = datetime.now(tz=UTC) 

291 if now - last_seen < timedelta(days=7): 

292 return f"Last seen {last_seen:%A %H:%M %p GMT}", True 

293 else: 

294 return f"Last seen {last_seen:%b %-d %Y}", False