Coverage for slidge/core/mixins/presence.py: 86%

97 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-07 05:11 +0000

1import re 

2from asyncio import Task, sleep 

3from datetime import datetime, timedelta, timezone 

4from typing import Optional 

5 

6from slixmpp.types import PresenceShows, PresenceTypes 

7 

8from ...util.types import CachedPresence 

9from .. import config 

10from .base import BaseSender 

11 

12 

class _NoChange(Exception):
    """
    Internal signal: the presence about to be sent equals the cached one.

    Raised by ``PresenceMixin._make_presence()`` and swallowed by the public
    presence helpers (``online()``, ``away()``, ...), so nothing is sent.
    """

15 

16 

# Presence types that deal with subscriptions rather than availability.
# PresenceMixin._make_presence() never writes these to the presence cache.
_FRIEND_REQUEST_PRESENCES = {
    "subscribe",
    "subscribed",
    "unsubscribe",
    "unsubscribed",
}

18 

19 

class PresenceMixin(BaseSender):
    """
    Presence-sending helpers.

    Builds presence stanzas, remembers the last one that was built (in the
    contact store when ``contact_pk`` is set, in ``cached_presence``
    otherwise) and exposes shortcuts for the usual availability states.
    """

    # When True, _make_presence() raises _NoChange for a presence equal to the
    # cached one, unless the caller passes force=True.
    _ONLY_SEND_PRESENCE_CHANGES = False
    contact_pk: Optional[int] = None

    def __init__(self, *a, **k):
        super().__init__(*a, **k)
        # Only used when a presence is set during Contact.update_info(), while
        # the contact has no DB primary key yet; it is written to the DB at
        # the end of update_info().
        self.cached_presence: Optional[CachedPresence] = None
        # FIXME: this should not be an attribute of this mixin, so that
        #        instances can be garbage collected.
        self.__update_last_seen_fallback_task: Optional[Task] = None

    async def __update_last_seen_fallback(self):
        """Sleep for seven hours, then re-send the last cached presence."""
        await sleep(7 * 3600)
        self.send_last_presence(force=True, no_cache_online=False)

    def _get_last_presence(self) -> Optional[CachedPresence]:
        """
        Fetch the presence stored for this contact.

        :return: The stored presence, or ``None`` when ``contact_pk`` is unset.
        """
        pk = self.contact_pk
        if pk is not None:
            return self.xmpp.store.contacts.get_presence(pk)
        return None

    def _store_last_presence(self, new: CachedPresence):
        """
        Remember ``new`` as the latest presence.

        It goes to the contact store when ``contact_pk`` is set, and to
        ``self.cached_presence`` otherwise.

        :param new: The presence to remember.
        """
        pk = self.contact_pk
        if pk is not None:
            self.xmpp.store.contacts.set_presence(pk, new)
        else:
            self.cached_presence = new

    def __send_unless_unchanged(self, **presence_kwargs):
        """Build and send a presence; silently do nothing on ``_NoChange``."""
        try:
            self._send(self._make_presence(**presence_kwargs))
        except _NoChange:
            pass

    def _make_presence(
        self,
        *,
        last_seen: Optional[datetime] = None,
        force=False,
        bare=False,
        ptype: Optional[PresenceTypes] = None,
        pstatus: Optional[str] = None,
        pshow: Optional[PresenceShows] = None,
    ):
        """
        Build a presence stanza and update the presence cache.

        :param last_seen: Last interaction time; a naive datetime is converted
            with ``astimezone(timezone.utc)``.
        :param force: Skip the "same as cached" check.
        :param bare: Send from the bare JID instead of the full JID.
        :param ptype: Presence type.
        :param pstatus: Free-form status text.
        :param pshow: Presence "show" value.
        :return: The presence stanza, not sent yet.
        :raises _NoChange: If ``_ONLY_SEND_PRESENCE_CHANGES`` is set, a cached
            presence exists, ``force`` is false and nothing changed.
        """
        if last_seen and last_seen.tzinfo is None:
            last_seen = last_seen.astimezone(timezone.utc)

        previous = self._get_last_presence()

        if ptype not in _FRIEND_REQUEST_PRESENCES:
            current = CachedPresence(
                last_seen=last_seen, ptype=ptype, pstatus=pstatus, pshow=pshow
            )
            if previous != current:
                muc_unavailable = hasattr(self, "muc") and ptype == "unavailable"
                if not muc_unavailable:
                    self._store_last_presence(current)
                elif self.contact_pk is not None:
                    # "unavailable" from an entity with a ``muc`` attribute:
                    # the stored presence is reset instead of overwritten.
                    self.xmpp.store.contacts.reset_presence(self.contact_pk)
            if previous and not force and self._ONLY_SEND_PRESENCE_CHANGES:
                if previous == current:
                    self.session.log.debug("Presence is the same as cached")
                    raise _NoChange
                self.session.log.debug(
                    "Presence is not the same as cached: %s vs %s", previous, current
                )

        sender = self.jid.bare if bare else self.jid
        presence = self.xmpp.make_presence(
            pfrom=sender, ptype=ptype, pshow=pshow, pstatus=pstatus
        )
        if not last_seen:
            return presence

        # Looking for this string in the status is ugly, but a better fix is
        # more work.
        if config.LAST_SEEN_FALLBACK and not re.match(
            ".*Last seen .*", presence["status"]
        ):
            fallback_text, recent = get_last_seen_fallback(last_seen)
            status = presence["status"]
            presence["status"] = (
                (status + " -- " + fallback_text) if status else fallback_text
            )
            if recent:
                # Under a week old, the text only names a weekday and a time,
                # which stops being informative once it gets older, so an
                # updated presence has to be re-sent later on.
                pending = self.__update_last_seen_fallback_task
                if pending:
                    pending.cancel()
                self.__update_last_seen_fallback_task = self.xmpp.loop.create_task(
                    self.__update_last_seen_fallback()
                )
        presence["idle"]["since"] = last_seen
        return presence

    def send_last_presence(self, force=False, no_cache_online=False):
        """
        Re-send the cached presence.

        :param force: With no cached presence, send one anyway.
        :param no_cache_online: With no cached presence and ``force`` set,
            send "online" if true and "offline" otherwise.
        """
        cached = self._get_last_presence()
        if cached is not None:
            self._send(
                self._make_presence(
                    last_seen=cached.last_seen,
                    force=True,
                    ptype=cached.ptype,
                    pshow=cached.pshow,
                    pstatus=cached.pstatus,
                )
            )
            return
        if not force:
            return
        if no_cache_online:
            self.online()
        else:
            self.offline()

    def online(
        self,
        status: Optional[str] = None,
        last_seen: Optional[datetime] = None,
    ):
        """
        Tell the user that this contact is online.

        :param status: Free-form details about the status.
        :param last_seen: For :xep:`0319`
        """
        self.__send_unless_unchanged(pstatus=status, last_seen=last_seen)

    def away(
        self,
        status: Optional[str] = None,
        last_seen: Optional[datetime] = None,
    ):
        """
        Tell the user that this contact is away.

        This status is global, unlike :meth:`.LegacyContact.inactive` which
        is about one specific conversation ("chat window").

        :param status: Free-form details about the status.
        :param last_seen: For :xep:`0319`
        """
        self.__send_unless_unchanged(
            pstatus=status, pshow="away", last_seen=last_seen
        )

    def extended_away(
        self,
        status: Optional[str] = None,
        last_seen: Optional[datetime] = None,
    ):
        """
        Tell the user that this contact is in "extended away" ("xa") state.

        This status is global, unlike :meth:`.LegacyContact.inactive` which
        is about one specific conversation ("chat window").

        :param status: Free-form details about the status.
        :param last_seen: For :xep:`0319`
        """
        self.__send_unless_unchanged(pstatus=status, pshow="xa", last_seen=last_seen)

    def busy(
        self,
        status: Optional[str] = None,
        last_seen: Optional[datetime] = None,
    ):
        """
        Tell the user that this contact is busy ("dnd").

        :param status: Free-form details about the status.
        :param last_seen: For :xep:`0319`
        """
        self.__send_unless_unchanged(
            pstatus=status, pshow="dnd", last_seen=last_seen
        )

    def offline(
        self,
        status: Optional[str] = None,
        last_seen: Optional[datetime] = None,
    ):
        """
        Tell the user that this contact is offline ("unavailable" presence).

        :param status: Free-form details about the status.
        :param last_seen: For :xep:`0319`
        """
        self.__send_unless_unchanged(
            pstatus=status, ptype="unavailable", last_seen=last_seen
        )

223 

224 

def get_last_seen_fallback(last_seen: datetime):
    """
    Build the human-readable "Last seen ..." text used as a status fallback.

    ``last_seen`` is first converted to UTC. This keeps the "GMT" label of
    the short format truthful for aware datetimes in another zone. It also
    lets naive datetimes through: ``astimezone()`` reads them as local time.

    :param last_seen: When the contact was last seen.
    :return: A ``(text, recent)`` tuple. ``recent`` is ``True`` when
        ``last_seen`` is less than seven days old; the text then holds a
        weekday and a time. Otherwise the text holds a full date.
    """
    last_seen = last_seen.astimezone(timezone.utc)
    now = datetime.now(tz=timezone.utc)
    if now - last_seen < timedelta(days=7):
        return f"Last seen {last_seen:%A %H:%M GMT}", True
    # "%-d" (day without zero padding) is a glibc extension that raises
    # ValueError on platforms lacking it; ``.day`` gives the same output
    # everywhere.
    return f"Last seen {last_seen:%b} {last_seen.day} {last_seen:%Y}", False