Coverage for slidge/contact/contact.py: 85%

286 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-04 08:17 +0000

1import datetime 

2import logging 

3import warnings 

4from datetime import date 

5from typing import TYPE_CHECKING, Generic, Iterable, Iterator, Optional, Union 

6from xml.etree import ElementTree as ET 

7 

8from slixmpp import JID, Message, Presence 

9from slixmpp.exceptions import IqError, IqTimeout 

10from slixmpp.plugins.xep_0292.stanza import VCard4 

11from slixmpp.types import MessageTypes 

12from sqlalchemy.exc import IntegrityError 

13 

14from ..core.mixins import AvatarMixin, FullCarbonMixin 

15from ..core.mixins.disco import ContactAccountDiscoMixin 

16from ..core.mixins.recipient import ReactionRecipientMixin, ThreadRecipientMixin 

17from ..db.models import Contact, ContactSent, Participant 

18from ..util import SubclassableOnce 

19from ..util.types import ClientType, LegacyUserIdType, MessageOrPresenceTypeVar 

20 

21if TYPE_CHECKING: 

22 from ..core.session import BaseSession 

23 from ..group.participant import LegacyParticipant 

24 

25 

26class LegacyContact( 

27 Generic[LegacyUserIdType], 

28 AvatarMixin, 

29 ContactAccountDiscoMixin, 

30 FullCarbonMixin, 

31 ReactionRecipientMixin, 

32 ThreadRecipientMixin, 

33 metaclass=SubclassableOnce, 

34): 

35 """ 

36 This class centralizes actions in relation to a specific legacy contact. 

37 

38 You shouldn't create instances of contacts manually, but rather rely on 

39 :meth:`.LegacyRoster.by_legacy_id` to ensure that contact instances are 

40 singletons. The :class:`.LegacyRoster` instance of a session is accessible 

41 through the :attr:`.BaseSession.contacts` attribute. 

42 

43 Typically, your plugin should have methods hook to the legacy events and 

44 call appropriate methods here to transmit the "legacy action" to the xmpp 

45 user. This should look like this: 

46 

47 .. code-block:python 

48 

49 class Session(BaseSession): 

50 ... 

51 

52 async def on_cool_chat_network_new_text_message(self, legacy_msg_event): 

53 contact = self.contacts.by_legacy_id(legacy_msg_event.from) 

54 contact.send_text(legacy_msg_event.text) 

55 

56 async def on_cool_chat_network_new_typing_event(self, legacy_typing_event): 

57 contact = self.contacts.by_legacy_id(legacy_msg_event.from) 

58 contact.composing() 

59 ... 

60 

61 Use ``carbon=True`` as a keyword arg for methods to represent an action FROM 

62 the user TO the contact, typically when the user uses an official client to 

63 do an action such as sending a message or marking as message as read. 

64 This will use :xep:`0363` to impersonate the XMPP user in order. 

65 """ 

66 

67 session: "BaseSession" 

68 

69 RESOURCE: str = "slidge" 

70 """ 

71 A full JID, including a resource part is required for chat states (and maybe other stuff) 

72 to work properly. This is the name of the resource the contacts will use. 

73 """ 

74 PROPAGATE_PRESENCE_TO_GROUPS = True 

75 

76 mtype: MessageTypes = "chat" 

77 _can_send_carbon = True 

78 is_group = False 

79 

80 _ONLY_SEND_PRESENCE_CHANGES = True 

81 

82 STRIP_SHORT_DELAY = True 

83 _NON_FRIEND_PRESENCES_FILTER = {"subscribe", "unsubscribed"} 

84 

85 INVITATION_RECIPIENT = True 

86 

87 stored: Contact 

88 model: Contact 

89 

90 def __init__(self, session: "BaseSession", stored: Contact) -> None: 

91 self.session = session 

92 self.xmpp = session.xmpp 

93 self.stored = stored 

94 self._set_logger() 

95 super().__init__() 

96 

97 @property 

98 def jid(self): 

99 jid = JID(self.stored.jid) 

100 jid.resource = self.RESOURCE 

101 return jid 

102 

103 @property 

104 def legacy_id(self): 

105 return self.xmpp.LEGACY_CONTACT_ID_TYPE(self.stored.legacy_id) 

106 

107 async def get_vcard(self, fetch: bool = True) -> VCard4 | None: 

108 if fetch and not self.stored.vcard_fetched: 

109 await self.fetch_vcard() 

110 if self.stored.vcard is None: 

111 return None 

112 

113 return VCard4(xml=ET.fromstring(self.stored.vcard)) 

114 

115 @property 

116 def is_friend(self) -> bool: 

117 return self.stored.is_friend 

118 

119 @is_friend.setter 

120 def is_friend(self, value: bool) -> None: 

121 if value == self.is_friend: 

122 return 

123 self.stored.is_friend = value 

124 self.commit() 

125 

126 @property 

127 def added_to_roster(self) -> bool: 

128 return self.stored.added_to_roster 

129 

130 @added_to_roster.setter 

131 def added_to_roster(self, value: bool) -> None: 

132 if value == self.added_to_roster: 

133 return 

134 self.stored.added_to_roster = value 

135 self.commit() 

136 

137 @property 

138 def participants(self) -> Iterator["LegacyParticipant"]: 

139 with self.xmpp.store.session() as orm: 

140 self.stored = orm.merge(self.stored) 

141 participants = self.stored.participants 

142 for p in participants: 

143 with self.xmpp.store.session() as orm: 

144 orm.add(p) 

145 muc = self.session.bookmarks.from_store(p.room) 

146 yield muc.participant_from_store(p, contact=self) 

147 

148 @property 

149 def user_jid(self): 

150 return self.session.user_jid 

151 

152 @property # type:ignore 

153 def DISCO_TYPE(self) -> ClientType: 

154 return self.client_type 

155 

156 @DISCO_TYPE.setter 

157 def DISCO_TYPE(self, value: ClientType) -> None: 

158 self.client_type = value 

159 

160 @property 

161 def client_type(self) -> ClientType: 

162 """ 

163 The client type of this contact, cf https://xmpp.org/registrar/disco-categories.html#client 

164 

165 Default is "pc". 

166 """ 

167 return self.stored.client_type 

168 

169 @client_type.setter 

170 def client_type(self, value: ClientType) -> None: 

171 if self.stored.client_type == value: 

172 return 

173 self.stored.client_type = value 

174 self.commit() 

175 

176 def _set_logger(self) -> None: 

177 self.log = logging.getLogger(f"{self.user_jid.bare}:contact:{self}") 

178 

179 def __repr__(self) -> str: 

180 return f"<Contact #{self.stored.id} '{self.name}' ({self.legacy_id} - {self.jid.user})'>" 

181 

182 def __get_subscription_string(self) -> str: 

183 if self.is_friend: 

184 return "both" 

185 return "none" 

186 

187 def __propagate_to_participants(self, stanza: Presence) -> None: 

188 if not self.PROPAGATE_PRESENCE_TO_GROUPS: 

189 return 

190 

191 ptype = stanza["type"] 

192 if ptype in ("available", "chat"): 

193 func_name = "online" 

194 elif ptype in ("xa", "unavailable"): 

195 # we map unavailable to extended_away, because offline is 

196 # "participant leaves the MUC" 

197 # TODO: improve this with a clear distinction between participant 

198 # and member list 

199 func_name = "extended_away" 

200 elif ptype == "busy": 

201 func_name = "busy" 

202 elif ptype == "away": 

203 func_name = "away" 

204 else: 

205 return 

206 

207 last_seen: Optional[datetime.datetime] = ( 

208 stanza["idle"]["since"] if stanza.get_plugin("idle", check=True) else None 

209 ) 

210 

211 kw = dict(status=stanza["status"], last_seen=last_seen) 

212 

213 for part in self.participants: 

214 func = getattr(part, func_name) 

215 func(**kw) 

216 

217 def _send( 

218 self, 

219 stanza: MessageOrPresenceTypeVar, 

220 carbon: bool = False, 

221 nick: bool = False, 

222 **send_kwargs, 

223 ) -> MessageOrPresenceTypeVar: 

224 if carbon and isinstance(stanza, Message): 

225 stanza["to"] = self.jid.bare 

226 stanza["from"] = self.user_jid 

227 self._privileged_send(stanza) 

228 return stanza # type:ignore 

229 

230 if isinstance(stanza, Presence): 

231 if not self._updating_info: 

232 self.__propagate_to_participants(stanza) 

233 if ( 

234 not self.is_friend 

235 and stanza["type"] not in self._NON_FRIEND_PRESENCES_FILTER 

236 ): 

237 return stanza # type:ignore 

238 if self.name and (nick or not self.is_friend): 

239 n = self.xmpp.plugin["xep_0172"].stanza.UserNick() 

240 n["nick"] = self.name 

241 stanza.append(n) 

242 if ( 

243 not self._updating_info 

244 and self.xmpp.MARK_ALL_MESSAGES 

245 and is_markable(stanza) 

246 ): 

247 try: 

248 with self.xmpp.store.session(expire_on_commit=False) as orm: 

249 self.stored = orm.merge(self.stored) 

250 new = ContactSent(contact=self.stored, msg_id=stanza["id"]) 

251 orm.add(new) 

252 self.stored.sent_order.append(new) 

253 orm.commit() 

254 except IntegrityError: 

255 self.log.warning("Contact has already sent message %s", stanza["id"]) 

256 stanza["to"] = self.user_jid 

257 stanza.send() 

258 return stanza 

259 

260 def get_msg_xmpp_id_up_to(self, horizon_xmpp_id: str) -> list[str]: 

261 """ 

262 Return XMPP msg ids sent by this contact up to a given XMPP msg id. 

263 

264 Plugins have no reason to use this, but it is used by slidge core 

265 for legacy networks that need to mark all messages as read (most XMPP 

266 clients only send a read marker for the latest message). 

267 

268 This has side effects, if the horizon XMPP id is found, messages up to 

269 this horizon are not cleared, to avoid sending the same read mark twice. 

270 

271 :param horizon_xmpp_id: The latest message 

272 :return: A list of XMPP ids or None if horizon_xmpp_id was not found 

273 """ 

274 with self.xmpp.store.session() as orm: 

275 assert self.stored.id is not None 

276 ids = self.xmpp.store.contacts.pop_sent_up_to( 

277 orm, self.stored.id, horizon_xmpp_id 

278 ) 

279 orm.commit() 

280 return ids 

281 

282 @property 

283 def name(self) -> str: 

284 """ 

285 Friendly name of the contact, as it should appear in the user's roster 

286 """ 

287 return self.stored.nick or "" 

288 

289 @name.setter 

290 def name(self, n: Optional[str]) -> None: 

291 if self.stored.nick == n: 

292 return 

293 self.stored.nick = n 

294 self._set_logger() 

295 if self.is_friend and self.added_to_roster: 

296 self.xmpp.pubsub.broadcast_nick( 

297 user_jid=self.user_jid, jid=self.jid.bare, nick=n 

298 ) 

299 self.commit() 

300 for p in self.participants: 

301 p.nickname = n 

302 

303 def _post_avatar_update(self, cached_avatar) -> None: 

304 if self.is_friend and self.added_to_roster: 

305 self.session.create_task( 

306 self.session.xmpp.pubsub.broadcast_avatar( 

307 self.jid.bare, self.session.user_jid, cached_avatar 

308 ) 

309 ) 

310 for p in self.participants: 

311 self.log.debug("Propagating new avatar to %s", p.muc) 

312 p.send_last_presence(force=True, no_cache_online=True) 

313 

314 def set_vcard( 

315 self, 

316 /, 

317 full_name: Optional[str] = None, 

318 given: Optional[str] = None, 

319 surname: Optional[str] = None, 

320 birthday: Optional[date] = None, 

321 phone: Optional[str] = None, 

322 phones: Iterable[str] = (), 

323 note: Optional[str] = None, 

324 url: Optional[str] = None, 

325 email: Optional[str] = None, 

326 country: Optional[str] = None, 

327 locality: Optional[str] = None, 

328 ) -> None: 

329 vcard = VCard4() 

330 vcard.add_impp(f"xmpp:{self.jid.bare}") 

331 

332 if n := self.name: 

333 vcard.add_nickname(n) 

334 if full_name: 

335 vcard["full_name"] = full_name 

336 elif n: 

337 vcard["full_name"] = n 

338 

339 if given: 

340 vcard["given"] = given 

341 if surname: 

342 vcard["surname"] = surname 

343 if birthday: 

344 vcard["birthday"] = birthday 

345 

346 if note: 

347 vcard.add_note(note) 

348 if url: 

349 vcard.add_url(url) 

350 if email: 

351 vcard.add_email(email) 

352 if phone: 

353 vcard.add_tel(phone) 

354 for p in phones: 

355 vcard.add_tel(p) 

356 if country and locality: 

357 vcard.add_address(country, locality) 

358 elif country: 

359 vcard.add_address(country, locality) 

360 

361 self.stored.vcard = str(vcard) 

362 self.stored.vcard_fetched = True 

363 self.session.create_task( 

364 self.xmpp.pubsub.broadcast_vcard_event(self.jid, self.user_jid, vcard) 

365 ) 

366 

367 self.commit() 

368 

369 def get_roster_item(self): 

370 item = { 

371 "subscription": self.__get_subscription_string(), 

372 "groups": [self.xmpp.ROSTER_GROUP], 

373 } 

374 if (n := self.name) is not None: 

375 item["name"] = n 

376 return {self.jid.bare: item} 

377 

378 async def add_to_roster(self, force: bool = False) -> None: 

379 """ 

380 Add this contact to the user roster using :xep:`0356` 

381 

382 :param force: add even if the contact was already added successfully 

383 """ 

384 if self.added_to_roster and not force: 

385 return 

386 if not self.session.user.preferences.get("roster_push", True): 

387 log.debug("Roster push request by plugin ignored (--no-roster-push)") 

388 return 

389 try: 

390 await self.xmpp["xep_0356"].set_roster( 

391 jid=self.user_jid, roster_items=self.get_roster_item() 

392 ) 

393 except PermissionError: 

394 from slidge import __version__ 

395 

396 warnings.warn( 

397 "Slidge does not have the privilege to manage rosters. See " 

398 f"https://slidge.im/docs/slidge/{__version__}/admin/privilege.html" 

399 ) 

400 self.send_friend_request( 

401 f"I'm already your friend on {self.xmpp.COMPONENT_TYPE}, but " 

402 "slidge is not allowed to manage your roster." 

403 ) 

404 return 

405 except (IqError, IqTimeout) as e: 

406 self.log.warning("Could not add to roster", exc_info=e) 

407 else: 

408 # we only broadcast pubsub events for contacts added to the roster 

409 # so if something was set before, we need to push it now 

410 self.added_to_roster = True 

411 self.send_last_presence() 

412 

413 async def __broadcast_pubsub_items(self) -> None: 

414 if not self.is_friend: 

415 return 

416 if not self.added_to_roster: 

417 return 

418 cached_avatar = self.get_cached_avatar() 

419 if cached_avatar is not None: 

420 await self.xmpp.pubsub.broadcast_avatar( 

421 self.jid.bare, self.session.user_jid, cached_avatar 

422 ) 

423 nick = self.name 

424 

425 if nick is not None: 

426 self.xmpp.pubsub.broadcast_nick( 

427 self.session.user_jid, 

428 self.jid.bare, 

429 nick, 

430 ) 

431 

432 def send_friend_request(self, text: Optional[str] = None) -> None: 

433 presence = self._make_presence(ptype="subscribe", pstatus=text, bare=True) 

434 self._send(presence, nick=True) 

435 

436 async def accept_friend_request(self, text: Optional[str] = None) -> None: 

437 """ 

438 Call this to signify that this Contact has accepted to be a friend 

439 of the user. 

440 

441 :param text: Optional message from the friend to the user 

442 """ 

443 self.is_friend = True 

444 self.added_to_roster = True 

445 self.log.debug("Accepting friend request") 

446 presence = self._make_presence(ptype="subscribed", pstatus=text, bare=True) 

447 self._send(presence, nick=True) 

448 self.send_last_presence() 

449 await self.__broadcast_pubsub_items() 

450 self.log.debug("Accepted friend request") 

451 

452 def reject_friend_request(self, text: Optional[str] = None) -> None: 

453 """ 

454 Call this to signify that this Contact has refused to be a contact 

455 of the user (or that they don't want to be friends anymore) 

456 

457 :param text: Optional message from the non-friend to the user 

458 """ 

459 presence = self._make_presence(ptype="unsubscribed", pstatus=text, bare=True) 

460 self.offline() 

461 self._send(presence, nick=True) 

462 self.is_friend = False 

463 

464 async def on_friend_request(self, text: str = "") -> None: 

465 """ 

466 Called when receiving a "subscribe" presence, ie, "I would like to add 

467 you to my contacts/friends", from the user to this contact. 

468 

469 In XMPP terms: "I would like to receive your presence updates" 

470 

471 This is only called if self.is_friend = False. If self.is_friend = True, 

472 slidge will automatically "accept the friend request", ie, reply with 

473 a "subscribed" presence. 

474 

475 When called, a 'friend request event' should be sent to the legacy 

476 service, and when the contact responds, you should either call 

477 self.accept_subscription() or self.reject_subscription() 

478 """ 

479 pass 

480 

481 async def on_friend_delete(self, text: str = "") -> None: 

482 """ 

483 Called when receiving an "unsubscribed" presence, ie, "I would like to 

484 remove you to my contacts/friends" or "I refuse your friend request" 

485 from the user to this contact. 

486 

487 In XMPP terms: "You won't receive my presence updates anymore (or you 

488 never have)". 

489 """ 

490 pass 

491 

492 async def on_friend_accept(self) -> None: 

493 """ 

494 Called when receiving a "subscribed" presence, ie, "I accept to be 

495 your/confirm that you are my friend" from the user to this contact. 

496 

497 In XMPP terms: "You will receive my presence updates". 

498 """ 

499 pass 

500 

501 def unsubscribe(self) -> None: 

502 """ 

503 (internal use by slidge) 

504 

505 Send an "unsubscribe", "unsubscribed", "unavailable" presence sequence 

506 from this contact to the user, ie, "this contact has removed you from 

507 their 'friends'". 

508 """ 

509 for ptype in "unsubscribe", "unsubscribed", "unavailable": 

510 self.xmpp.send_presence(pfrom=self.jid, pto=self.user_jid.bare, ptype=ptype) 

511 

512 async def update_info(self) -> None: 

513 """ 

514 Fetch information about this contact from the legacy network 

515 

516 This is awaited on Contact instantiation, and should be overridden to 

517 update the nickname, avatar, vcard [...] of this contact, by making 

518 "legacy API calls". 

519 

520 To take advantage of the slidge avatar cache, you can check the .avatar 

521 property to retrieve the "legacy file ID" of the cached avatar. If there 

522 is no change, you should not call 

523 :py:meth:`slidge.core.mixins.avatar.AvatarMixin.set_avatar` or attempt 

524 to modify the ``.avatar`` property. 

525 """ 

526 pass 

527 

528 async def fetch_vcard(self) -> None: 

529 """ 

530 It the legacy network doesn't like that you fetch too many profiles on startup, 

531 it's also possible to fetch it here, which will be called when XMPP clients 

532 of the user request the vcard, if it hasn't been fetched before 

533 :return: 

534 """ 

535 pass 

536 

537 def _make_presence( 

538 self, 

539 *, 

540 last_seen: Optional[datetime.datetime] = None, 

541 status_codes: Optional[set[int]] = None, 

542 user_full_jid: Optional[JID] = None, 

543 **presence_kwargs, 

544 ): 

545 p = super()._make_presence(last_seen=last_seen, **presence_kwargs) 

546 caps = self.xmpp.plugin["xep_0115"] 

547 if p.get_from().resource and self.stored.caps_ver: 

548 p["caps"]["node"] = caps.caps_node 

549 p["caps"]["hash"] = caps.hash 

550 p["caps"]["ver"] = self.stored.caps_ver 

551 return p 

552 

553 

554def is_markable(stanza: Union[Message, Presence]): 

555 if isinstance(stanza, Presence): 

556 return False 

557 return bool(stanza["body"]) 

558 

559 

560log = logging.getLogger(__name__)