Coverage for slidge / core / mixins / presence.py: 83%

141 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-06 15:18 +0000

1import re 

2from asyncio import Task, sleep 

3from datetime import datetime, timedelta, timezone 

4from functools import partial 

5from typing import TYPE_CHECKING, Optional 

6 

7from slixmpp.types import PresenceShows, PresenceTypes 

8from sqlalchemy.orm.exc import DetachedInstanceError 

9 

10from ...db.models import Contact, Participant 

11from ...util.types import CachedPresence 

12from .base import BaseSender 

13from .db import DBMixin 

14 

15if TYPE_CHECKING: 

16 from ..session import BaseSession 

17 

18 

class _NoChange(Exception):
    """
    Internal signal: the presence being built is identical to the cached one.

    Raised by ``PresenceMixin._make_presence`` and swallowed by the public
    status helpers (``online()``, ``away()``, ...) so that nothing is sent.
    """

21 

22 

# Presence types for which the presence cache is neither read nor updated
# when a presence is built.
_FRIEND_REQUEST_PRESENCES = {
    "subscribe",
    "subscribed",
    "unsubscribe",
    "unsubscribed",
}

# Pending "refresh the last-seen text" tasks, keyed by contact primary key.
_UPDATE_LAST_SEEN_FALLBACK_TASKS: dict[int, Task] = {}

# Delay before a "Last seen <weekday> <time>" status text gets refreshed.
_ONE_WEEK_SECONDS = 7 * 24 * 60 * 60

26 

27 

async def _update_last_seen_fallback(session: "BaseSession", contact_pk: int) -> None:
    """
    Sleep for one week, then force a re-send of a contact's last presence.

    :param session: Session used to reach the DB store and the contacts
        collection.
    :param contact_pk: Primary key of the contact row to look up once the
        delay has elapsed.
    """
    await sleep(_ONE_WEEK_SECONDS)
    with session.xmpp.store.session() as orm:
        row = orm.get(Contact, contact_pk)
        if row is None:
            # No row with this primary key at this point: nothing to re-send.
            return
        contact = session.contacts.from_store(row)
    # NOTE(review): the indentation of this file was flattened; confirm
    # whether this call is meant to run inside or after the ``with`` block.
    contact.send_last_presence(force=True, no_cache_online=False)

36 

37 

def _clear_last_seen_task(contact_pk: int, _task) -> None:
    """
    Done-callback that unregisters a finished last-seen refresh task.

    :param contact_pk: Key under which the task was registered in
        ``_UPDATE_LAST_SEEN_FALLBACK_TASKS``.
    :param _task: The task that just completed (passed by asyncio).
    """
    # Done-callbacks are not run synchronously by ``Task.cancel()``. When a
    # task is cancelled and immediately replaced under the same key, this
    # callback fires for the *old* task after the *new* one is registered.
    # Only drop the entry if it still refers to the task that finished,
    # otherwise the replacement would become untracked and could never be
    # cancelled.
    if _UPDATE_LAST_SEEN_FALLBACK_TASKS.get(contact_pk) is _task:
        _UPDATE_LAST_SEEN_FALLBACK_TASKS.pop(contact_pk, None)

43 

44 

class PresenceMixin(BaseSender, DBMixin):
    """
    Mixin that builds, caches and sends presence stanzas for an entity.

    ``stored`` is either a :class:`Contact` row or a :class:`Participant`
    row; for a participant, the presence cache is read from its linked
    contact, if it has one.
    """

    # When true, a presence equal to the cached one is not re-sent unless
    # ``force`` is passed to ``_make_presence()``.
    _ONLY_SEND_PRESENCE_CHANGES = False

    # this attribute actually only exists for contacts and not participants
    _updating_info: bool
    stored: Contact | Participant

    def __init__(self, *a, **k) -> None:
        super().__init__(*a, **k)
        # this is only used when a presence is set during Contact.update_info(),
        # when the contact does not have a DB primary key yet, and is written
        # to DB at the end of update_info()
        self.cached_presence: Optional[CachedPresence] = None

    def __is_contact(self) -> bool:
        """Return True if ``self.stored`` is a :class:`Contact` row."""
        return isinstance(self.stored, Contact)

    def __stored(self) -> Contact | None:
        """
        Return the contact row holding the presence cache for this entity.

        :return: ``self.stored`` for a contact; for a participant, its linked
            contact, or ``None`` if it has none.
        """
        if self.__is_contact():
            assert isinstance(self.stored, Contact)
            return self.stored

        assert isinstance(self.stored, Participant)
        try:
            return self.stored.contact
        except DetachedInstanceError:
            # The participant row is no longer attached to an ORM session:
            # re-attach it so that the relationship can be loaded.
            with self.xmpp.store.session() as orm:
                orm.add(self.stored)
                if self.stored.contact is None:
                    return None
                orm.refresh(self.stored.contact)
                orm.merge(self.stored)
                return self.stored.contact

    @property
    def __contact_pk(self) -> int | None:
        """Primary key of the row returned by ``__stored()``, if any."""
        stored = self.__stored()
        return None if stored is None else stored.id

    def _get_last_presence(self) -> Optional[CachedPresence]:
        """
        Read the cached presence from the store.

        :return: The cached presence, or ``None`` if there is no contact row
            or its ``cached_presence`` flag is not set.
        """
        stored = self.__stored()
        if stored is None or not stored.cached_presence:
            return None
        return CachedPresence(
            # the stored value is read back as a UTC timestamp
            None
            if stored.last_seen is None
            else stored.last_seen.replace(tzinfo=timezone.utc),
            stored.ptype,  # type:ignore
            stored.pstatus,
            stored.pshow,  # type:ignore
        )

    def _store_last_presence(self, new: CachedPresence) -> None:
        """
        Write ``new`` to the presence cache of the relevant contact.

        Does nothing when this entity is not a contact and has no ``contact``
        attribute (or that attribute is ``None``).

        :param new: Presence to cache.
        """
        if self.__is_contact():
            contact = self
        elif (contact := getattr(self, "contact", None)) is None:  # type:ignore[assignment]
            return
        contact.update_stored_attribute(  # type:ignore[attr-defined]
            cached_presence=True,
            **new._asdict(),
        )

    def __schedule_last_seen_refresh(self, pk: int) -> None:
        """(Re)start the task that re-sends the last presence in a week."""
        previous = _UPDATE_LAST_SEEN_FALLBACK_TASKS.get(pk)
        if previous is not None:
            previous.cancel()
        task = self.session.create_task(
            _update_last_seen_fallback(self.session, pk)
        )
        _UPDATE_LAST_SEEN_FALLBACK_TASKS[pk] = task
        task.add_done_callback(partial(_clear_last_seen_task, pk))

    def _make_presence(
        self,
        *,
        last_seen: Optional[datetime] = None,
        force: bool = False,
        bare: bool = False,
        ptype: Optional[PresenceTypes] = None,
        pstatus: Optional[str] = None,
        pshow: Optional[PresenceShows] = None,
    ):
        """
        Build a presence stanza and keep the presence cache in sync.

        :param last_seen: For :xep:`0319`. A naive value is interpreted as
            local time; any value is converted to UTC before being cached
            or formatted.
        :param force: Build the presence even if it equals the cached one.
        :param bare: Use the bare JID as the sender instead of the full JID.
        :param ptype: Presence type.
        :param pstatus: Free-form status text.
        :param pshow: Presence "show" value.
        :return: The presence stanza.
        :raises _NoChange: If ``_ONLY_SEND_PRESENCE_CHANGES`` is set, a
            presence is cached, ``force`` is false and the new presence
            equals the cached one.
        """
        if last_seen is not None:
            # Always normalise to UTC, not only naive values: the cache is
            # read back as UTC by _get_last_presence() and the fallback
            # status text is labelled "GMT".
            last_seen = last_seen.astimezone(timezone.utc)

        old = self._get_last_presence()

        if ptype not in _FRIEND_REQUEST_PRESENCES:
            new = CachedPresence(
                last_seen=last_seen, ptype=ptype, pstatus=pstatus, pshow=pshow
            )
            if old != new:
                if hasattr(self, "muc") and ptype == "unavailable":
                    # an entity with a ``muc`` attribute going unavailable
                    # invalidates the cache instead of overwriting it
                    stored = self.__stored()
                    if stored is not None:
                        stored.cached_presence = False
                        self.commit(merge=True)
                else:
                    self._store_last_presence(new)
            if old and not force and self._ONLY_SEND_PRESENCE_CHANGES:
                if old == new:
                    self.session.log.debug("Presence is the same as cached")
                    raise _NoChange
                self.session.log.debug(
                    "Presence is not the same as cached: %s vs %s", old, new
                )

        p = self.xmpp.make_presence(
            pfrom=self.jid.bare if bare else self.jid,
            ptype=ptype,
            pshow=pshow,
            pstatus=pstatus,
        )
        if last_seen is not None:
            # it's ugly to check for the presence of this string, but a better fix is more work
            if not re.match(
                ".*Last seen .*", p["status"]
            ) and self.session.user.preferences.get("last_seen_fallback", True):
                last_seen_fallback, recent = get_last_seen_fallback(last_seen)
                if p["status"]:
                    p["status"] = p["status"] + " -- " + last_seen_fallback
                else:
                    p["status"] = last_seen_fallback
                pk = self.__contact_pk
                if recent and pk is not None:
                    # For less than a week ago, the text only names a weekday
                    # and a time, which stops being informative once a week
                    # has passed, so a forced re-send is scheduled to refresh
                    # the status text.
                    self.__schedule_last_seen_refresh(pk)
            p["idle"]["since"] = last_seen
        return p

    def send_last_presence(
        self, force: bool = False, no_cache_online: bool = False
    ) -> None:
        """
        Re-send the cached presence.

        :param force: If nothing is cached, send a presence anyway.
        :param no_cache_online: With ``force`` and nothing cached, send
            "online" if true, "offline" otherwise.
        """
        cache = self._get_last_presence()
        if cache is None:
            if force:
                if no_cache_online:
                    self.online()
                else:
                    self.offline()
            return
        self._send(
            self._make_presence(
                last_seen=cache.last_seen,
                force=True,
                ptype=cache.ptype,
                pshow=cache.pshow,
                pstatus=cache.pstatus,
            )
        )

    def __send_if_changed(
        self,
        *,
        status: Optional[str],
        last_seen: Optional[datetime],
        ptype: Optional[PresenceTypes] = None,
        pshow: Optional[PresenceShows] = None,
    ) -> None:
        """Build and send a presence; do nothing if ``_NoChange`` is raised."""
        try:
            self._send(
                self._make_presence(
                    pstatus=status, ptype=ptype, pshow=pshow, last_seen=last_seen
                )
            )
        except _NoChange:
            pass

    def online(
        self,
        status: Optional[str] = None,
        last_seen: Optional[datetime] = None,
    ) -> None:
        """
        Send an "online" presence from this contact to the user.

        :param status: Arbitrary text, details of the status, eg: "Listening to Britney Spears"
        :param last_seen: For :xep:`0319`
        """
        self.__send_if_changed(status=status, last_seen=last_seen)

    def away(
        self,
        status: Optional[str] = None,
        last_seen: Optional[datetime] = None,
    ) -> None:
        """
        Send an "away" presence from this contact to the user.

        This is a global status, as opposed to :meth:`.LegacyContact.inactive`
        which concerns a specific conversation, ie a specific "chat window"

        :param status: Arbitrary text, details of the status, eg: "Gone to fight capitalism"
        :param last_seen: For :xep:`0319`
        """
        self.__send_if_changed(status=status, last_seen=last_seen, pshow="away")

    def extended_away(
        self,
        status: Optional[str] = None,
        last_seen: Optional[datetime] = None,
    ) -> None:
        """
        Send an "extended away" presence from this contact to the user.

        This is a global status, as opposed to :meth:`.LegacyContact.inactive`
        which concerns a specific conversation, ie a specific "chat window"

        :param status: Arbitrary text, details of the status, eg: "Gone to fight capitalism"
        :param last_seen: For :xep:`0319`
        """
        self.__send_if_changed(status=status, last_seen=last_seen, pshow="xa")

    def busy(
        self,
        status: Optional[str] = None,
        last_seen: Optional[datetime] = None,
    ) -> None:
        """
        Send a "busy" (ie, "dnd") presence from this contact to the user.

        :param status: eg: "Trying to make sense of XEP-0100"
        :param last_seen: For :xep:`0319`
        """
        self.__send_if_changed(status=status, last_seen=last_seen, pshow="dnd")

    def offline(
        self,
        status: Optional[str] = None,
        last_seen: Optional[datetime] = None,
    ) -> None:
        """
        Send an "offline" presence from this contact to the user.

        :param status: eg: "Trying to make sense of XEP-0100"
        :param last_seen: For :xep:`0319`
        """
        self.__send_if_changed(
            status=status, last_seen=last_seen, ptype="unavailable"
        )

289 

290 

def get_last_seen_fallback(last_seen: datetime) -> tuple[str, bool]:
    """
    Build a human-readable "Last seen ..." text for a timestamp.

    :param last_seen: When the entity was last seen. It is converted to UTC
        before formatting; a naive value is interpreted as local time.
    :return: ``(text, recent)``. ``recent`` is true when ``last_seen`` is
        less than seven days old, in which case ``text`` holds a weekday
        and a time; otherwise ``text`` holds a calendar date.
    """
    # The text is labelled "GMT", so format the UTC wall-clock time and not
    # the wall-clock time of whatever timezone the caller's value carries.
    last_seen = last_seen.astimezone(timezone.utc)
    now = datetime.now(tz=timezone.utc)
    if now - last_seen < timedelta(days=7):
        # NOTE(review): %H is a 24-hour clock, so the %p (AM/PM) suffix looks
        # redundant; kept as-is in case callers rely on the exact text.
        return f"Last seen {last_seen:%A %H:%M %p GMT}", True
    # ``%-d`` (day without zero padding) is a platform-specific strftime
    # extension; use the ``day`` attribute instead.
    return f"Last seen {last_seen:%b} {last_seen.day} {last_seen:%Y}", False
296 return f"Last seen {last_seen:%b %-d %Y}", False