Coverage for slidge/core/mixins/presence.py: 83%

142 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-26 19:34 +0000

1import re 

2from asyncio import Task, sleep 

3from datetime import datetime, timedelta, timezone 

4from functools import partial 

5from typing import TYPE_CHECKING, Optional 

6 

7from slixmpp.types import PresenceShows, PresenceTypes 

8from sqlalchemy.exc import InvalidRequestError 

9from sqlalchemy.orm.exc import DetachedInstanceError 

10 

11from ...db.models import Contact, Participant 

12from ...util.types import CachedPresence 

13from .base import BaseSender 

14from .db import DBMixin 

15 

16if TYPE_CHECKING: 

17 from ..session import BaseSession 

18 

19 

20class _NoChange(Exception): 

21 pass 

22 

23 

24_FRIEND_REQUEST_PRESENCES = {"subscribe", "unsubscribe", "subscribed", "unsubscribed"} 

25_UPDATE_LAST_SEEN_FALLBACK_TASKS = dict[int, Task]() 

26_ONE_WEEK_SECONDS = 3600 * 24 * 7 

27 

28 

29async def _update_last_seen_fallback(session: "BaseSession", contact_pk: int) -> None: 

30 await sleep(_ONE_WEEK_SECONDS) 

31 with session.xmpp.store.session() as orm: 

32 stored = orm.get(Contact, contact_pk) 

33 if stored is None: 

34 return 

35 contact = session.contacts.from_store(stored) 

36 contact.send_last_presence(force=True, no_cache_online=False) 

37 

38 

39def _clear_last_seen_task(contact_pk: int, _task) -> None: 

40 try: 

41 del _UPDATE_LAST_SEEN_FALLBACK_TASKS[contact_pk] 

42 except KeyError: 

43 pass 

44 

45 

46class PresenceMixin(BaseSender, DBMixin): 

47 _ONLY_SEND_PRESENCE_CHANGES = False 

48 

49 # this attribute actually only exists for contacts and not participants 

50 _updating_info: bool 

51 stored: Contact | Participant 

52 

53 def __init__(self, *a, **k) -> None: 

54 super().__init__(*a, **k) 

55 # this is only used when a presence is set during Contact.update_info(), 

56 # when the contact does not have a DB primary key yet, and is written 

57 # to DB at the end of update_info() 

58 self.cached_presence: Optional[CachedPresence] = None 

59 

60 def __is_contact(self) -> bool: 

61 return isinstance(self.stored, Contact) 

62 

63 def __stored(self) -> Contact | None: 

64 if self.__is_contact(): 

65 assert isinstance(self.stored, Contact) 

66 return self.stored 

67 else: 

68 assert isinstance(self.stored, Participant) 

69 try: 

70 return self.stored.contact 

71 except DetachedInstanceError: 

72 with self.xmpp.store.session() as orm: 

73 orm.add(self.stored) 

74 if self.stored.contact is None: 

75 return None 

76 orm.refresh(self.stored.contact) 

77 orm.merge(self.stored) 

78 return self.stored.contact 

79 

80 @property 

81 def __contact_pk(self) -> int | None: 

82 stored = self.__stored() 

83 return None if stored is None else stored.id 

84 

85 def _get_last_presence(self) -> Optional[CachedPresence]: 

86 stored = self.__stored() 

87 if stored is None or not stored.cached_presence: 

88 return None 

89 return CachedPresence( 

90 None 

91 if stored.last_seen is None 

92 else stored.last_seen.replace(tzinfo=timezone.utc), 

93 stored.ptype, # type:ignore 

94 stored.pstatus, 

95 stored.pshow, # type:ignore 

96 ) 

97 

98 def _store_last_presence(self, new: CachedPresence) -> None: 

99 if self.__is_contact(): 

100 contact = self 

101 elif (contact := getattr(self, "contact", None)) is None: # type:ignore[assignment] 

102 return 

103 contact.update_stored_attribute( # type:ignore[attr-defined] 

104 cached_presence=True, 

105 **new._asdict(), 

106 ) 

107 

108 def _make_presence( 

109 self, 

110 *, 

111 last_seen: Optional[datetime] = None, 

112 force: bool = False, 

113 bare: bool = False, 

114 ptype: Optional[PresenceTypes] = None, 

115 pstatus: Optional[str] = None, 

116 pshow: Optional[PresenceShows] = None, 

117 ): 

118 if last_seen and last_seen.tzinfo is None: 

119 last_seen = last_seen.astimezone(timezone.utc) 

120 

121 old = self._get_last_presence() 

122 

123 if ptype not in _FRIEND_REQUEST_PRESENCES: 

124 new = CachedPresence( 

125 last_seen=last_seen, ptype=ptype, pstatus=pstatus, pshow=pshow 

126 ) 

127 if old != new: 

128 if hasattr(self, "muc") and ptype == "unavailable": 

129 stored = self.__stored() 

130 if stored is not None: 

131 stored.cached_presence = False 

132 self.commit(merge=True) 

133 else: 

134 self._store_last_presence(new) 

135 if old and not force and self._ONLY_SEND_PRESENCE_CHANGES: 

136 if old == new: 

137 self.session.log.debug("Presence is the same as cached") 

138 raise _NoChange 

139 self.session.log.debug( 

140 "Presence is not the same as cached: %s vs %s", old, new 

141 ) 

142 

143 p = self.xmpp.make_presence( 

144 pfrom=self.jid.bare if bare else self.jid, 

145 ptype=ptype, 

146 pshow=pshow, 

147 pstatus=pstatus, 

148 ) 

149 if last_seen: 

150 # it's ugly to check for the presence of this string, but a better fix is more work 

151 if not re.match( 

152 ".*Last seen .*", p["status"] 

153 ) and self.session.user.preferences.get("last_seen_fallback", True): 

154 last_seen_fallback, recent = get_last_seen_fallback(last_seen) 

155 if p["status"]: 

156 p["status"] = p["status"] + " -- " + last_seen_fallback 

157 else: 

158 p["status"] = last_seen_fallback 

159 pk = self.__contact_pk 

160 if recent and pk is not None: 

161 # if less than a week, we use sth like 'Last seen: Monday, 8:05", 

162 # but if lasts more than a week, this is not very informative, so 

163 # we need to force resend an updated presence status 

164 task = _UPDATE_LAST_SEEN_FALLBACK_TASKS.get(pk) 

165 if task is not None: 

166 task.cancel() 

167 task = self.session.create_task( 

168 _update_last_seen_fallback(self.session, pk) 

169 ) 

170 _UPDATE_LAST_SEEN_FALLBACK_TASKS[pk] = task 

171 task.add_done_callback(partial(_clear_last_seen_task, pk)) 

172 p["idle"]["since"] = last_seen 

173 return p 

174 

175 def send_last_presence( 

176 self, force: bool = False, no_cache_online: bool = False 

177 ) -> None: 

178 if (cache := self._get_last_presence()) is None: 

179 if force: 

180 if no_cache_online: 

181 self.online() 

182 else: 

183 self.offline() 

184 return 

185 self._send( 

186 self._make_presence( 

187 last_seen=cache.last_seen, 

188 force=True, 

189 ptype=cache.ptype, 

190 pshow=cache.pshow, 

191 pstatus=cache.pstatus, 

192 ) 

193 ) 

194 

195 def online( 

196 self, 

197 status: Optional[str] = None, 

198 last_seen: Optional[datetime] = None, 

199 ) -> None: 

200 """ 

201 Send an "online" presence from this contact to the user. 

202 

203 :param status: Arbitrary text, details of the status, eg: "Listening to Britney Spears" 

204 :param last_seen: For :xep:`0319` 

205 """ 

206 try: 

207 self._send(self._make_presence(pstatus=status, last_seen=last_seen)) 

208 except _NoChange: 

209 pass 

210 

211 def away( 

212 self, 

213 status: Optional[str] = None, 

214 last_seen: Optional[datetime] = None, 

215 ) -> None: 

216 """ 

217 Send an "away" presence from this contact to the user. 

218 

219 This is a global status, as opposed to :meth:`.LegacyContact.inactive` 

220 which concerns a specific conversation, ie a specific "chat window" 

221 

222 :param status: Arbitrary text, details of the status, eg: "Gone to fight capitalism" 

223 :param last_seen: For :xep:`0319` 

224 """ 

225 try: 

226 self._send( 

227 self._make_presence(pstatus=status, pshow="away", last_seen=last_seen) 

228 ) 

229 except _NoChange: 

230 pass 

231 

232 def extended_away( 

233 self, 

234 status: Optional[str] = None, 

235 last_seen: Optional[datetime] = None, 

236 ) -> None: 

237 """ 

238 Send an "extended away" presence from this contact to the user. 

239 

240 This is a global status, as opposed to :meth:`.LegacyContact.inactive` 

241 which concerns a specific conversation, ie a specific "chat window" 

242 

243 :param status: Arbitrary text, details of the status, eg: "Gone to fight capitalism" 

244 :param last_seen: For :xep:`0319` 

245 """ 

246 try: 

247 self._send( 

248 self._make_presence(pstatus=status, pshow="xa", last_seen=last_seen) 

249 ) 

250 except _NoChange: 

251 pass 

252 

253 def busy( 

254 self, 

255 status: Optional[str] = None, 

256 last_seen: Optional[datetime] = None, 

257 ) -> None: 

258 """ 

259 Send a "busy" (ie, "dnd") presence from this contact to the user, 

260 

261 :param status: eg: "Trying to make sense of XEP-0100" 

262 :param last_seen: For :xep:`0319` 

263 """ 

264 try: 

265 self._send( 

266 self._make_presence(pstatus=status, pshow="dnd", last_seen=last_seen) 

267 ) 

268 except _NoChange: 

269 pass 

270 

271 def offline( 

272 self, 

273 status: Optional[str] = None, 

274 last_seen: Optional[datetime] = None, 

275 ) -> None: 

276 """ 

277 Send an "offline" presence from this contact to the user. 

278 

279 :param status: eg: "Trying to make sense of XEP-0100" 

280 :param last_seen: For :xep:`0319` 

281 """ 

282 try: 

283 self._send( 

284 self._make_presence( 

285 pstatus=status, ptype="unavailable", last_seen=last_seen 

286 ) 

287 ) 

288 except _NoChange: 

289 pass 

290 

291 

292def get_last_seen_fallback(last_seen: datetime) -> tuple[str, bool]: 

293 now = datetime.now(tz=timezone.utc) 

294 if now - last_seen < timedelta(days=7): 

295 return f"Last seen {last_seen:%A %H:%M %p GMT}", True 

296 else: 

297 return f"Last seen {last_seen:%b %-d %Y}", False