Coverage for slidge / core / mixins / presence.py: 91%

131 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-20 19:56 +0000

1import contextlib 

2import re 

3from asyncio import Task, sleep 

4from datetime import UTC, datetime, timedelta 

5from functools import partial 

6from typing import TYPE_CHECKING 

7 

8from slixmpp import Presence 

9from slixmpp.types import PresenceShows, PresenceTypes 

10from sqlalchemy.orm.exc import DetachedInstanceError 

11 

12from ...db.models import Contact, Participant 

13from ...util.types import AnySession, CachedPresence 

14from .base import BaseSender 

15from .db import DBMixin 

16 

17if TYPE_CHECKING: 

18 pass 

19 

20 

21class _NoChange(Exception): 

22 pass 

23 

24 

25_FRIEND_REQUEST_PRESENCES = {"subscribe", "unsubscribe", "subscribed", "unsubscribed"} 

26_UPDATE_LAST_SEEN_FALLBACK_TASKS = dict[int, Task[None]]() 

27_ONE_WEEK_SECONDS = 3600 * 24 * 7 

28 

29 

30async def _update_last_seen_fallback(session: AnySession, contact_pk: int) -> None: 

31 await sleep(_ONE_WEEK_SECONDS) 

32 with session.xmpp.store.session() as orm: 

33 stored = orm.get(Contact, contact_pk) 

34 if stored is None: 

35 return 

36 contact = session.contacts.from_store(stored) 

37 contact.send_last_presence(force=True, no_cache_online=False) 

38 

39 

40def _clear_last_seen_task(contact_pk: int, _task: Task[None]) -> None: 

41 with contextlib.suppress(KeyError): 

42 del _UPDATE_LAST_SEEN_FALLBACK_TASKS[contact_pk] 

43 

44 

45class PresenceMixin(BaseSender, DBMixin): 

46 _ONLY_SEND_PRESENCE_CHANGES = False 

47 

48 # this attribute actually only exists for contacts and not participants 

49 _updating_info: bool 

50 stored: Contact | Participant 

51 

52 def __init__(self, *a: object, **k: object) -> None: 

53 super().__init__(*a, **k) 

54 # this is only used when a presence is set during Contact.update_info(), 

55 # when the contact does not have a DB primary key yet, and is written 

56 # to DB at the end of update_info() 

57 self.cached_presence: CachedPresence | None = None 

58 

59 def __is_contact(self) -> bool: 

60 return isinstance(self.stored, Contact) 

61 

62 def __stored(self) -> Contact | None: 

63 if self.__is_contact(): 

64 assert isinstance(self.stored, Contact) 

65 return self.stored 

66 else: 

67 assert isinstance(self.stored, Participant) 

68 try: 

69 return self.stored.contact 

70 except DetachedInstanceError: 

71 with self.xmpp.store.session() as orm: 

72 orm.add(self.stored) 

73 if self.stored.contact is None: 

74 return None 

75 orm.refresh(self.stored.contact) 

76 orm.merge(self.stored) 

77 return self.stored.contact 

78 

79 @property 

80 def __contact_pk(self) -> int | None: 

81 stored = self.__stored() 

82 return None if stored is None else stored.id 

83 

84 def _get_last_presence(self) -> CachedPresence | None: 

85 stored = self.__stored() 

86 if stored is None or not stored.cached_presence: 

87 return None 

88 return CachedPresence( 

89 None if stored.last_seen is None else stored.last_seen.replace(tzinfo=UTC), 

90 stored.ptype, # type:ignore 

91 stored.pstatus, 

92 stored.pshow, # type:ignore 

93 ) 

94 

95 def _store_last_presence(self, new: CachedPresence) -> None: 

96 if self.__is_contact(): 

97 contact = self 

98 elif (contact := getattr(self, "contact", None)) is None: # type:ignore[assignment] 

99 return 

100 contact.update_stored_attribute( # type:ignore[attr-defined] 

101 cached_presence=True, 

102 **new._asdict(), 

103 ) 

104 

105 def _make_presence( 

106 self, 

107 *, 

108 last_seen: datetime | None = None, 

109 force: bool = False, 

110 bare: bool = False, 

111 ptype: PresenceTypes | None = None, 

112 pstatus: str | None = None, 

113 pshow: PresenceShows | None = None, 

114 ) -> Presence: 

115 if last_seen and last_seen.tzinfo is None: 

116 last_seen = last_seen.astimezone(UTC) 

117 

118 old = self._get_last_presence() 

119 

120 if ptype not in _FRIEND_REQUEST_PRESENCES: 

121 new = CachedPresence( 

122 last_seen=last_seen, ptype=ptype, pstatus=pstatus, pshow=pshow 

123 ) 

124 if old != new: 

125 if hasattr(self, "muc") and ptype == "unavailable": 

126 stored = self.__stored() 

127 if stored is not None: 

128 stored.cached_presence = False 

129 self.commit() 

130 else: 

131 self._store_last_presence(new) 

132 if old and not force and self._ONLY_SEND_PRESENCE_CHANGES: 

133 if old == new: 

134 self.session.log.debug("Presence is the same as cached") 

135 raise _NoChange 

136 self.session.log.debug( 

137 "Presence is not the same as cached: %s vs %s", old, new 

138 ) 

139 

140 p = self.xmpp.make_presence( 

141 pfrom=self.jid.bare if bare else self.jid, 

142 ptype=ptype, 

143 pshow=pshow, 

144 pstatus=pstatus, 

145 ) 

146 if last_seen: 

147 # it's ugly to check for the presence of this string, but a better fix is more work 

148 if not re.match( 

149 ".*Last seen .*", p["status"] 

150 ) and self.session.user.preferences.get("last_seen_fallback", True): 

151 last_seen_fallback, recent = get_last_seen_fallback(last_seen) 

152 if p["status"]: 

153 p["status"] = p["status"] + " -- " + last_seen_fallback 

154 else: 

155 p["status"] = last_seen_fallback 

156 pk = self.__contact_pk 

157 if recent and pk is not None: 

158 # if less than a week, we use sth like 'Last seen: Monday, 8:05", 

159 # but if lasts more than a week, this is not very informative, so 

160 # we need to force resend an updated presence status 

161 task = _UPDATE_LAST_SEEN_FALLBACK_TASKS.get(pk) 

162 if task is not None: 

163 task.cancel() 

164 task = self.session.create_task( 

165 _update_last_seen_fallback(self.session, pk) 

166 ) 

167 _UPDATE_LAST_SEEN_FALLBACK_TASKS[pk] = task 

168 task.add_done_callback(partial(_clear_last_seen_task, pk)) 

169 p["idle"]["since"] = last_seen 

170 return p 

171 

172 def send_last_presence( 

173 self, force: bool = False, no_cache_online: bool = False 

174 ) -> None: 

175 if (cache := self._get_last_presence()) is None: 

176 if force: 

177 if no_cache_online: 

178 self.online() 

179 else: 

180 self.offline() 

181 return 

182 self._send( 

183 self._make_presence( 

184 last_seen=cache.last_seen, 

185 force=True, 

186 ptype=cache.ptype, 

187 pshow=cache.pshow, 

188 pstatus=cache.pstatus, 

189 ) 

190 ) 

191 

192 def online( 

193 self, 

194 status: str | None = None, 

195 last_seen: datetime | None = None, 

196 ) -> None: 

197 """ 

198 Send an "online" presence from this contact to the user. 

199 

200 :param status: Arbitrary text, details of the status, eg: "Listening to Britney Spears" 

201 :param last_seen: For :xep:`0319` 

202 """ 

203 with contextlib.suppress(_NoChange): 

204 self._send(self._make_presence(pstatus=status, last_seen=last_seen)) 

205 

206 def away( 

207 self, 

208 status: str | None = None, 

209 last_seen: datetime | None = None, 

210 ) -> None: 

211 """ 

212 Send an "away" presence from this contact to the user. 

213 

214 This is a global status, as opposed to :meth:`.LegacyContact.inactive` 

215 which concerns a specific conversation, ie a specific "chat window" 

216 

217 :param status: Arbitrary text, details of the status, eg: "Gone to fight capitalism" 

218 :param last_seen: For :xep:`0319` 

219 """ 

220 with contextlib.suppress(_NoChange): 

221 self._send( 

222 self._make_presence(pstatus=status, pshow="away", last_seen=last_seen) 

223 ) 

224 

225 def extended_away( 

226 self, 

227 status: str | None = None, 

228 last_seen: datetime | None = None, 

229 ) -> None: 

230 """ 

231 Send an "extended away" presence from this contact to the user. 

232 

233 This is a global status, as opposed to :meth:`.LegacyContact.inactive` 

234 which concerns a specific conversation, ie a specific "chat window" 

235 

236 :param status: Arbitrary text, details of the status, eg: "Gone to fight capitalism" 

237 :param last_seen: For :xep:`0319` 

238 """ 

239 with contextlib.suppress(_NoChange): 

240 self._send( 

241 self._make_presence(pstatus=status, pshow="xa", last_seen=last_seen) 

242 ) 

243 

244 def busy( 

245 self, 

246 status: str | None = None, 

247 last_seen: datetime | None = None, 

248 ) -> None: 

249 """ 

250 Send a "busy" (ie, "dnd") presence from this contact to the user, 

251 

252 :param status: eg: "Trying to make sense of XEP-0100" 

253 :param last_seen: For :xep:`0319` 

254 """ 

255 with contextlib.suppress(_NoChange): 

256 self._send( 

257 self._make_presence(pstatus=status, pshow="dnd", last_seen=last_seen) 

258 ) 

259 

260 def offline( 

261 self, 

262 status: str | None = None, 

263 last_seen: datetime | None = None, 

264 ) -> None: 

265 """ 

266 Send an "offline" presence from this contact to the user. 

267 

268 :param status: eg: "Trying to make sense of XEP-0100" 

269 :param last_seen: For :xep:`0319` 

270 """ 

271 with contextlib.suppress(_NoChange): 

272 self._send( 

273 self._make_presence( 

274 pstatus=status, ptype="unavailable", last_seen=last_seen 

275 ) 

276 ) 

277 

278 

279def get_last_seen_fallback(last_seen: datetime) -> tuple[str, bool]: 

280 now = datetime.now(tz=UTC) 

281 if now - last_seen < timedelta(days=7): 

282 return f"Last seen {last_seen:%A %H:%M %p GMT}", True 

283 else: 

284 return f"Last seen {last_seen:%b %-d %Y}", False