Coverage for slidge/core/mixins/presence.py: 85%

136 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-04 08:17 +0000

1import re 

2from asyncio import Task, sleep 

3from datetime import datetime, timedelta, timezone 

4from functools import partial 

5from typing import TYPE_CHECKING, Optional 

6 

7from slixmpp.types import PresenceShows, PresenceTypes 

8from sqlalchemy.exc import InvalidRequestError 

9from sqlalchemy.orm.exc import DetachedInstanceError 

10 

11from ...db.models import Contact, Participant 

12from ...util.types import CachedPresence 

13from .base import BaseSender 

14from .db import DBMixin 

15 

16if TYPE_CHECKING: 

17 from ..session import BaseSession 

18 

19 

class _NoChange(Exception):
    """Raised by ``PresenceMixin._make_presence`` when the presence equals the cached one."""

22 

23 

# Presence types that are never written to the presence cache.
_FRIEND_REQUEST_PRESENCES = {
    "subscribe",
    "subscribed",
    "unsubscribe",
    "unsubscribed",
}
# Pending "refresh the last-seen text" tasks, keyed by contact primary key.
_UPDATE_LAST_SEEN_FALLBACK_TASKS: dict[int, Task] = {}
# Delay before a "recent" last-seen text is refreshed: 7 days, in seconds.
_ONE_WEEK_SECONDS = 7 * 24 * 60 * 60

27 

28 

async def _update_last_seen_fallback(session: "BaseSession", contact_pk: int) -> None:
    """
    Wait one week, then re-send the last presence of a contact.

    :param session: Session giving access to the store and to the contacts
    :param contact_pk: Primary key of the ``Contact`` row to refresh
    """
    await sleep(_ONE_WEEK_SECONDS)
    # NOTE(review): the extent of this ``with`` block was reconstructed from a
    # listing without indentation -- confirm against the real module.
    with session.xmpp.store.session() as orm:
        row = orm.get(Contact, contact_pk)
        if row is None:
            # no row with this primary key any more: nothing to refresh
            return
        contact = session.contacts.from_store(row)
    contact.send_last_presence(force=True, no_cache_online=False)

37 

38 

def _clear_last_seen_task(contact_pk: int, _task) -> None:
    """
    Done-callback that unregisters a finished last-seen fallback task.

    The entry is removed only if it still refers to ``_task``. When a task is
    cancelled and immediately replaced under the same key, the cancelled
    task's callback runs after the replacement was registered; deleting
    unconditionally would drop the replacement, which could then never be
    cancelled.

    :param contact_pk: Key of the task in ``_UPDATE_LAST_SEEN_FALLBACK_TASKS``
    :param _task: The task that just finished (passed by ``add_done_callback``)
    """
    if _UPDATE_LAST_SEEN_FALLBACK_TASKS.get(contact_pk) is _task:
        del _UPDATE_LAST_SEEN_FALLBACK_TASKS[contact_pk]

44 

45 

class PresenceMixin(BaseSender, DBMixin):
    """
    Mixin that builds, caches and sends presences for an entity.

    The last presence is kept on the ``Contact`` row behind ``self.stored``
    (the row itself, or ``Participant.contact``), which lets
    :meth:`send_last_presence` replay it later.
    """

    # When true, _make_presence() raises _NoChange instead of returning a
    # presence equal to the cached one (unless force=True is passed).
    _ONLY_SEND_PRESENCE_CHANGES = False

    stored: Contact | Participant

    def __init__(self, *a, **k) -> None:
        super().__init__(*a, **k)
        # this is only used when a presence is set during Contact.update_info(),
        # when the contact does not have a DB primary key yet, and is written
        # to DB at the end of update_info()
        self.cached_presence: Optional[CachedPresence] = None

    def __stored(self) -> Contact | None:
        """Return the ``Contact`` row holding the cached presence, if any."""
        if isinstance(self.stored, Contact):
            return self.stored
        try:
            return self.stored.contact
        except DetachedInstanceError:
            # presumably merge() re-attaches self.stored to an ORM session so
            # that the relationship can be loaded -- see DBMixin
            self.merge()
            return self.stored.contact

    @property
    def __contact_pk(self) -> int | None:
        """Primary key of the backing ``Contact`` row, or ``None`` without one."""
        stored = self.__stored()
        return None if stored is None else stored.id

    def _get_last_presence(self) -> Optional[CachedPresence]:
        """
        Read the cached presence from the backing ``Contact`` row.

        :return: The cached presence, or ``None`` if there is no backing row
            or the row is not flagged as holding a cached presence.
        """
        stored = self.__stored()
        if stored is None or not stored.cached_presence:
            return None
        if stored.last_seen is None:
            last_seen = None
        else:
            # the stored value is tagged as UTC when read back
            last_seen = stored.last_seen.replace(tzinfo=timezone.utc)
        return CachedPresence(
            last_seen,
            stored.ptype,  # type:ignore
            stored.pstatus,
            stored.pshow,  # type:ignore
        )

    def _store_last_presence(self, new: CachedPresence) -> None:
        """
        Copy ``new`` onto the backing ``Contact`` row and commit it.

        :param new: The presence to persist; each of its fields is written to
            the attribute of the same name on the row.
        """
        stored = self.__stored()
        if stored is None:
            return
        stored.cached_presence = True
        for k, v in new._asdict().items():
            setattr(stored, k, v)
        try:
            self.commit()
        except InvalidRequestError:
            # retry, asking commit() to merge first
            self.commit(merge=True)

    def _make_presence(
        self,
        *,
        last_seen: Optional[datetime] = None,
        force: bool = False,
        bare: bool = False,
        ptype: Optional[PresenceTypes] = None,
        pstatus: Optional[str] = None,
        pshow: Optional[PresenceShows] = None,
    ):
        """
        Build a presence stanza and update the presence cache.

        :param last_seen: For :xep:`0319`; a naive datetime is converted with
            ``astimezone()``, ie, it is interpreted as local time.
        :param force: Build the presence even if it equals the cached one.
        :param bare: Send from the bare JID instead of the full JID.
        :param ptype: Presence type.
        :param pstatus: Free-text status.
        :param pshow: Presence "show" value.
        :return: The presence stanza, ready to be sent.
        :raises _NoChange: If ``_ONLY_SEND_PRESENCE_CHANGES`` is set, ``force``
            is false and the presence equals the cached one.
        """
        if last_seen and last_seen.tzinfo is None:
            last_seen = last_seen.astimezone(timezone.utc)

        old = self._get_last_presence()

        # subscription-related presences are never cached nor deduplicated
        if ptype not in _FRIEND_REQUEST_PRESENCES:
            new = CachedPresence(
                last_seen=last_seen, ptype=ptype, pstatus=pstatus, pshow=pshow
            )
            if old != new:
                if hasattr(self, "muc") and ptype == "unavailable":
                    # entities with a "muc" attribute going unavailable:
                    # invalidate the cache instead of storing the presence
                    stored = self.__stored()
                    if stored is not None:
                        stored.cached_presence = False
                        self.commit(merge=True)
                else:
                    self._store_last_presence(new)
            if old and not force and self._ONLY_SEND_PRESENCE_CHANGES:
                if old == new:
                    self.session.log.debug("Presence is the same as cached")
                    raise _NoChange
                self.session.log.debug(
                    "Presence is not the same as cached: %s vs %s", old, new
                )

        p = self.xmpp.make_presence(
            pfrom=self.jid.bare if bare else self.jid,
            ptype=ptype,
            pshow=pshow,
            pstatus=pstatus,
        )
        if last_seen:
            # it's ugly to check for the presence of this string, but a better
            # fix is more work. A substring test is used rather than a regex:
            # "." does not match newlines, so ".*Last seen .*" missed the
            # marker in multi-line statuses and the fallback was added twice.
            current_status = p["status"]
            if "Last seen " not in current_status and (
                self.session.user.preferences.get("last_seen_fallback", True)
            ):
                last_seen_fallback, recent = get_last_seen_fallback(last_seen)
                if current_status:
                    p["status"] = current_status + " -- " + last_seen_fallback
                else:
                    p["status"] = last_seen_fallback
                pk = self.__contact_pk
                if recent and pk is not None:
                    # if less than a week, we use sth like 'Last seen: Monday, 8:05",
                    # but if it lasts more than a week, this is not very
                    # informative, so we need to force resend an updated
                    # presence status
                    task = _UPDATE_LAST_SEEN_FALLBACK_TASKS.get(pk)
                    if task is not None:
                        task.cancel()
                    task = self.session.create_task(
                        _update_last_seen_fallback(self.session, pk)
                    )
                    _UPDATE_LAST_SEEN_FALLBACK_TASKS[pk] = task
                    task.add_done_callback(partial(_clear_last_seen_task, pk))
            p["idle"]["since"] = last_seen
        return p

    def send_last_presence(
        self, force: bool = False, no_cache_online: bool = False
    ) -> None:
        """
        Re-send the cached presence.

        :param force: If there is no cached presence, send one anyway.
        :param no_cache_online: With ``force`` and no cached presence, send an
            "online" presence if true, an "offline" presence otherwise.
        """
        if (cache := self._get_last_presence()) is None:
            if force:
                if no_cache_online:
                    self.online()
                else:
                    self.offline()
            return
        # force=True: the cached presence always equals itself, so it must
        # bypass the "only send changes" check
        self._send(
            self._make_presence(
                last_seen=cache.last_seen,
                force=True,
                ptype=cache.ptype,
                pshow=cache.pshow,
                pstatus=cache.pstatus,
            )
        )

    def online(
        self,
        status: Optional[str] = None,
        last_seen: Optional[datetime] = None,
    ) -> None:
        """
        Send an "online" presence from this contact to the user.

        :param status: Arbitrary text, details of the status, eg: "Listening to Britney Spears"
        :param last_seen: For :xep:`0319`
        """
        try:
            self._send(self._make_presence(pstatus=status, last_seen=last_seen))
        except _NoChange:
            pass

    def away(
        self,
        status: Optional[str] = None,
        last_seen: Optional[datetime] = None,
    ) -> None:
        """
        Send an "away" presence from this contact to the user.

        This is a global status, as opposed to :meth:`.LegacyContact.inactive`
        which concerns a specific conversation, ie a specific "chat window"

        :param status: Arbitrary text, details of the status, eg: "Gone to fight capitalism"
        :param last_seen: For :xep:`0319`
        """
        try:
            self._send(
                self._make_presence(pstatus=status, pshow="away", last_seen=last_seen)
            )
        except _NoChange:
            pass

    def extended_away(
        self,
        status: Optional[str] = None,
        last_seen: Optional[datetime] = None,
    ) -> None:
        """
        Send an "extended away" presence from this contact to the user.

        This is a global status, as opposed to :meth:`.LegacyContact.inactive`
        which concerns a specific conversation, ie a specific "chat window"

        :param status: Arbitrary text, details of the status, eg: "Gone to fight capitalism"
        :param last_seen: For :xep:`0319`
        """
        try:
            self._send(
                self._make_presence(pstatus=status, pshow="xa", last_seen=last_seen)
            )
        except _NoChange:
            pass

    def busy(
        self,
        status: Optional[str] = None,
        last_seen: Optional[datetime] = None,
    ) -> None:
        """
        Send a "busy" (ie, "dnd") presence from this contact to the user,

        :param status: eg: "Trying to make sense of XEP-0100"
        :param last_seen: For :xep:`0319`
        """
        try:
            self._send(
                self._make_presence(pstatus=status, pshow="dnd", last_seen=last_seen)
            )
        except _NoChange:
            pass

    def offline(
        self,
        status: Optional[str] = None,
        last_seen: Optional[datetime] = None,
    ) -> None:
        """
        Send an "offline" presence from this contact to the user.

        :param status: eg: "Trying to make sense of XEP-0100"
        :param last_seen: For :xep:`0319`
        """
        try:
            self._send(
                self._make_presence(
                    pstatus=status, ptype="unavailable", last_seen=last_seen
                )
            )
        except _NoChange:
            pass

279 

280 

def get_last_seen_fallback(last_seen: datetime) -> tuple[str, bool]:
    """
    Build a human-readable "Last seen ..." text for a status message.

    ``last_seen`` is converted to UTC first, so that the "GMT" label of the
    text is correct whatever timezone the argument carries. A naive datetime
    is interpreted as local time, as ``datetime.astimezone()`` does.

    :param last_seen: When the entity was last seen.
    :return: ``(text, recent)``. ``recent`` is true when ``last_seen`` is less
        than 7 days old; the text then shows a weekday and a time. Otherwise
        the text shows a calendar date.
    """
    last_seen = last_seen.astimezone(timezone.utc)
    now = datetime.now(tz=timezone.utc)
    if now - last_seen < timedelta(days=7):
        return f"Last seen {last_seen:%A %H:%M GMT}", True
    # the day number comes from .day because the "%-d" directive is a
    # platform-specific strftime extension
    return f"Last seen {last_seen:%b} {last_seen.day} {last_seen:%Y}", False