Coverage for slidge/contact/contact.py: 86%

334 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-07 05:11 +0000

1import datetime 

2import logging 

3import warnings 

4from datetime import date 

5from typing import TYPE_CHECKING, Generic, Iterable, Optional, Self, Union 

6from xml.etree import ElementTree as ET 

7 

8from slixmpp import JID, Message, Presence 

9from slixmpp.exceptions import IqError, IqTimeout 

10from slixmpp.plugins.xep_0292.stanza import VCard4 

11from slixmpp.types import MessageTypes 

12 

13from ..core import config 

14from ..core.mixins import AvatarMixin, FullCarbonMixin, StoredAttributeMixin 

15from ..core.mixins.db import UpdateInfoMixin 

16from ..core.mixins.disco import ContactAccountDiscoMixin 

17from ..core.mixins.recipient import ReactionRecipientMixin, ThreadRecipientMixin 

18from ..db.models import Contact 

19from ..util import SubclassableOnce 

20from ..util.types import ClientType, LegacyUserIdType, MessageOrPresenceTypeVar 

21 

22if TYPE_CHECKING: 

23 from ..core.session import BaseSession 

24 from ..group.participant import LegacyParticipant 

25 

26 

27class LegacyContact( 

28 Generic[LegacyUserIdType], 

29 StoredAttributeMixin, 

30 AvatarMixin, 

31 ContactAccountDiscoMixin, 

32 FullCarbonMixin, 

33 ReactionRecipientMixin, 

34 ThreadRecipientMixin, 

35 UpdateInfoMixin, 

36 metaclass=SubclassableOnce, 

37): 

38 """ 

39 This class centralizes actions in relation to a specific legacy contact. 

40 

41 You shouldn't create instances of contacts manually, but rather rely on 

42 :meth:`.LegacyRoster.by_legacy_id` to ensure that contact instances are 

43 singletons. The :class:`.LegacyRoster` instance of a session is accessible 

44 through the :attr:`.BaseSession.contacts` attribute. 

45 

46 Typically, your plugin should have methods hook to the legacy events and 

47 call appropriate methods here to transmit the "legacy action" to the xmpp 

48 user. This should look like this: 

49 

50 .. code-block:python 

51 

52 class Session(BaseSession): 

53 ... 

54 

55 async def on_cool_chat_network_new_text_message(self, legacy_msg_event): 

56 contact = self.contacts.by_legacy_id(legacy_msg_event.from) 

57 contact.send_text(legacy_msg_event.text) 

58 

59 async def on_cool_chat_network_new_typing_event(self, legacy_typing_event): 

60 contact = self.contacts.by_legacy_id(legacy_msg_event.from) 

61 contact.composing() 

62 ... 

63 

64 Use ``carbon=True`` as a keyword arg for methods to represent an action FROM 

65 the user TO the contact, typically when the user uses an official client to 

66 do an action such as sending a message or marking as message as read. 

67 This will use :xep:`0363` to impersonate the XMPP user in order. 

68 """ 

69 

70 session: "BaseSession" 

71 

72 RESOURCE: str = "slidge" 

73 """ 

74 A full JID, including a resource part is required for chat states (and maybe other stuff) 

75 to work properly. This is the name of the resource the contacts will use. 

76 """ 

77 PROPAGATE_PRESENCE_TO_GROUPS = True 

78 

79 mtype: MessageTypes = "chat" 

80 _can_send_carbon = True 

81 is_group = False 

82 

83 _ONLY_SEND_PRESENCE_CHANGES = True 

84 

85 STRIP_SHORT_DELAY = True 

86 _NON_FRIEND_PRESENCES_FILTER = {"subscribe", "unsubscribed"} 

87 

88 _avatar_bare_jid = True 

89 

90 INVITATION_RECIPIENT = True 

91 

92 def __init__( 

93 self, 

94 session: "BaseSession", 

95 legacy_id: LegacyUserIdType, 

96 jid_username: str, 

97 ): 

98 """ 

99 :param session: The session this contact is part of 

100 :param legacy_id: The contact's legacy ID 

101 :param jid_username: User part of this contact's 'puppet' JID. 

102 NB: case-insensitive, and some special characters are not allowed 

103 """ 

104 super().__init__() 

105 self.session = session 

106 self.legacy_id: LegacyUserIdType = legacy_id 

107 """ 

108 The legacy identifier of the :term:`Legacy Contact`. 

109 By default, this is the :term:`JID Local Part` of this 

110 :term:`XMPP Entity`. 

111 

112 Controlling what values are valid and how they are translated from a 

113 :term:`JID Local Part` is done in :meth:`.jid_username_to_legacy_id`. 

114 Reciprocally, in :meth:`legacy_id_to_jid_username` the inverse 

115 transformation is defined. 

116 """ 

117 self.jid_username = jid_username 

118 

119 self._name: Optional[str] = None 

120 

121 self.xmpp = session.xmpp 

122 self.jid = JID(self.jid_username + "@" + self.xmpp.boundjid.bare) 

123 self.jid.resource = self.RESOURCE 

124 self.log = logging.getLogger(self.jid.bare) 

125 self._set_logger_name() 

126 self._is_friend: bool = False 

127 self._added_to_roster = False 

128 self._caps_ver: str | None = None 

129 self._vcard_fetched = False 

130 self._vcard: str | None = None 

131 self._client_type: ClientType = "pc" 

132 

133 async def get_vcard(self, fetch=True) -> VCard4 | None: 

134 if fetch and not self._vcard_fetched: 

135 await self.fetch_vcard() 

136 if self._vcard is None: 

137 return None 

138 

139 return VCard4(xml=ET.fromstring(self._vcard)) 

140 

141 @property 

142 def is_friend(self): 

143 return self._is_friend 

144 

145 @is_friend.setter 

146 def is_friend(self, value: bool): 

147 if value == self._is_friend: 

148 return 

149 self._is_friend = value 

150 if self._updating_info: 

151 return 

152 self.__ensure_pk() 

153 assert self.contact_pk is not None 

154 self.xmpp.store.contacts.set_friend(self.contact_pk, value) 

155 

156 @property 

157 def added_to_roster(self): 

158 return self._added_to_roster 

159 

160 @added_to_roster.setter 

161 def added_to_roster(self, value: bool): 

162 if value == self._added_to_roster: 

163 return 

164 self._added_to_roster = value 

165 if self._updating_info: 

166 return 

167 if self.contact_pk is None: 

168 # during LegacyRoster.fill() 

169 return 

170 self.xmpp.store.contacts.set_added_to_roster(self.contact_pk, value) 

171 

172 @property 

173 def participants(self) -> list["LegacyParticipant"]: 

174 if self.contact_pk is None: 

175 return [] 

176 

177 self.__ensure_pk() 

178 from ..group.participant import LegacyParticipant 

179 

180 return [ 

181 LegacyParticipant.get_self_or_unique_subclass().from_store( 

182 self.session, stored, contact=self 

183 ) 

184 for stored in self.xmpp.store.participants.get_for_contact(self.contact_pk) 

185 ] 

186 

187 @property 

188 def user_jid(self): 

189 return self.session.user_jid 

190 

191 @property # type:ignore 

192 def DISCO_TYPE(self) -> ClientType: 

193 return self._client_type 

194 

195 @DISCO_TYPE.setter 

196 def DISCO_TYPE(self, value: ClientType) -> None: 

197 self.client_type = value 

198 

199 @property 

200 def client_type(self) -> ClientType: 

201 """ 

202 The client type of this contact, cf https://xmpp.org/registrar/disco-categories.html#client 

203 

204 Default is "pc". 

205 """ 

206 return self._client_type 

207 

208 @client_type.setter 

209 def client_type(self, value: ClientType) -> None: 

210 self._client_type = value 

211 if self._updating_info: 

212 return 

213 self.__ensure_pk() 

214 assert self.contact_pk is not None 

215 self.xmpp.store.contacts.set_client_type(self.contact_pk, value) 

216 

217 def _set_logger_name(self): 

218 self.log.name = f"{self.user_jid.bare}:contact:{self}" 

219 

220 def __repr__(self): 

221 return f"<Contact #{self.contact_pk} '{self.name}' ({self.legacy_id} - {self.jid.local})'>" 

222 

223 def __ensure_pk(self): 

224 if self.contact_pk is not None: 

225 return 

226 # This happens for legacy modules that don't follow the Roster.fill / 

227 # populate contact attributes in Contact.update_info() method. 

228 # This results in (even) less optimised SQL writes and read, but 

229 # we allow it because it fits some legacy network libs better. 

230 with self.xmpp.store.session() as orm: 

231 orm.commit() 

232 stored = self.xmpp.store.contacts.get_by_legacy_id( 

233 self.user_pk, str(self.legacy_id) 

234 ) 

235 if stored is None: 

236 self.contact_pk = self.xmpp.store.contacts.update(self, commit=True) 

237 else: 

238 self.contact_pk = stored.id 

239 assert self.contact_pk is not None 

240 

241 def __get_subscription_string(self): 

242 if self.is_friend: 

243 return "both" 

244 return "none" 

245 

246 def __propagate_to_participants(self, stanza: Presence): 

247 if not self.PROPAGATE_PRESENCE_TO_GROUPS: 

248 return 

249 

250 ptype = stanza["type"] 

251 if ptype in ("available", "chat"): 

252 func_name = "online" 

253 elif ptype in ("xa", "unavailable"): 

254 # we map unavailable to extended_away, because offline is 

255 # "participant leaves the MUC" 

256 # TODO: improve this with a clear distinction between participant 

257 # and member list 

258 func_name = "extended_away" 

259 elif ptype == "busy": 

260 func_name = "busy" 

261 elif ptype == "away": 

262 func_name = "away" 

263 else: 

264 return 

265 

266 last_seen: Optional[datetime.datetime] = ( 

267 stanza["idle"]["since"] if stanza.get_plugin("idle", check=True) else None 

268 ) 

269 

270 kw = dict(status=stanza["status"], last_seen=last_seen) 

271 

272 for part in self.participants: 

273 func = getattr(part, func_name) 

274 func(**kw) 

275 

276 def _send( 

277 self, stanza: MessageOrPresenceTypeVar, carbon=False, nick=False, **send_kwargs 

278 ) -> MessageOrPresenceTypeVar: 

279 if carbon and isinstance(stanza, Message): 

280 stanza["to"] = self.jid.bare 

281 stanza["from"] = self.user_jid 

282 self._privileged_send(stanza) 

283 return stanza # type:ignore 

284 

285 if isinstance(stanza, Presence): 

286 if not self._updating_info: 

287 self.__propagate_to_participants(stanza) 

288 if ( 

289 not self.is_friend 

290 and stanza["type"] not in self._NON_FRIEND_PRESENCES_FILTER 

291 ): 

292 return stanza # type:ignore 

293 if self.name and (nick or not self.is_friend): 

294 n = self.xmpp.plugin["xep_0172"].stanza.UserNick() 

295 n["nick"] = self.name 

296 stanza.append(n) 

297 if ( 

298 not self._updating_info 

299 and self.xmpp.MARK_ALL_MESSAGES 

300 and is_markable(stanza) 

301 ): 

302 self.__ensure_pk() 

303 assert self.contact_pk is not None 

304 self.xmpp.store.contacts.add_to_sent(self.contact_pk, stanza["id"]) 

305 stanza["to"] = self.user_jid 

306 stanza.send() 

307 return stanza 

308 

309 def get_msg_xmpp_id_up_to(self, horizon_xmpp_id: str) -> list[str]: 

310 """ 

311 Return XMPP msg ids sent by this contact up to a given XMPP msg id. 

312 

313 Plugins have no reason to use this, but it is used by slidge core 

314 for legacy networks that need to mark all messages as read (most XMPP 

315 clients only send a read marker for the latest message). 

316 

317 This has side effects, if the horizon XMPP id is found, messages up to 

318 this horizon are not cleared, to avoid sending the same read mark twice. 

319 

320 :param horizon_xmpp_id: The latest message 

321 :return: A list of XMPP ids or None if horizon_xmpp_id was not found 

322 """ 

323 self.__ensure_pk() 

324 assert self.contact_pk is not None 

325 return self.xmpp.store.contacts.pop_sent_up_to(self.contact_pk, horizon_xmpp_id) 

326 

327 @property 

328 def name(self): 

329 """ 

330 Friendly name of the contact, as it should appear in the user's roster 

331 """ 

332 return self._name 

333 

334 @name.setter 

335 def name(self, n: Optional[str]): 

336 if self._name == n: 

337 return 

338 self._name = n 

339 self._set_logger_name() 

340 if self.is_friend and self.added_to_roster: 

341 self.xmpp.pubsub.broadcast_nick( 

342 user_jid=self.user_jid, jid=self.jid.bare, nick=n 

343 ) 

344 if self._updating_info: 

345 # means we're in update_info(), so no participants, and no need 

346 # to write to DB now, it will be called in Roster.__finish_init_contact 

347 return 

348 for p in self.participants: 

349 p.nickname = n 

350 self.__ensure_pk() 

351 assert self.contact_pk is not None 

352 self.xmpp.store.contacts.update_nick(self.contact_pk, n) 

353 

354 def _get_cached_avatar_id(self) -> Optional[str]: 

355 if self.contact_pk is None: 

356 return None 

357 return self.xmpp.store.contacts.get_avatar_legacy_id(self.contact_pk) 

358 

359 def _post_avatar_update(self): 

360 self.__ensure_pk() 

361 assert self.contact_pk is not None 

362 self.xmpp.store.contacts.set_avatar( 

363 self.contact_pk, 

364 self._avatar_pk, 

365 None if self.avatar_id is None else str(self.avatar_id), 

366 ) 

367 for p in self.participants: 

368 self.log.debug("Propagating new avatar to %s", p.muc) 

369 p.send_last_presence(force=True, no_cache_online=True) 

370 

371 def set_vcard( 

372 self, 

373 /, 

374 full_name: Optional[str] = None, 

375 given: Optional[str] = None, 

376 surname: Optional[str] = None, 

377 birthday: Optional[date] = None, 

378 phone: Optional[str] = None, 

379 phones: Iterable[str] = (), 

380 note: Optional[str] = None, 

381 url: Optional[str] = None, 

382 email: Optional[str] = None, 

383 country: Optional[str] = None, 

384 locality: Optional[str] = None, 

385 ): 

386 vcard = VCard4() 

387 vcard.add_impp(f"xmpp:{self.jid.bare}") 

388 

389 if n := self.name: 

390 vcard.add_nickname(n) 

391 if full_name: 

392 vcard["full_name"] = full_name 

393 elif n: 

394 vcard["full_name"] = n 

395 

396 if given: 

397 vcard["given"] = given 

398 if surname: 

399 vcard["surname"] = surname 

400 if birthday: 

401 vcard["birthday"] = birthday 

402 

403 if note: 

404 vcard.add_note(note) 

405 if url: 

406 vcard.add_url(url) 

407 if email: 

408 vcard.add_email(email) 

409 if phone: 

410 vcard.add_tel(phone) 

411 for p in phones: 

412 vcard.add_tel(p) 

413 if country and locality: 

414 vcard.add_address(country, locality) 

415 elif country: 

416 vcard.add_address(country, locality) 

417 

418 self._vcard = str(vcard) 

419 self._vcard_fetched = True 

420 self.session.create_task( 

421 self.xmpp.pubsub.broadcast_vcard_event(self.jid, self.user_jid, vcard) 

422 ) 

423 

424 if self._updating_info: 

425 return 

426 

427 assert self.contact_pk is not None 

428 self.xmpp.store.contacts.set_vcard(self.contact_pk, self._vcard) 

429 

430 def get_roster_item(self): 

431 item = { 

432 "subscription": self.__get_subscription_string(), 

433 "groups": [self.xmpp.ROSTER_GROUP], 

434 } 

435 if (n := self.name) is not None: 

436 item["name"] = n 

437 return {self.jid.bare: item} 

438 

439 async def add_to_roster(self, force=False): 

440 """ 

441 Add this contact to the user roster using :xep:`0356` 

442 

443 :param force: add even if the contact was already added successfully 

444 """ 

445 if self.added_to_roster and not force: 

446 return 

447 if config.NO_ROSTER_PUSH: 

448 log.debug("Roster push request by plugin ignored (--no-roster-push)") 

449 return 

450 try: 

451 await self._set_roster( 

452 jid=self.user_jid, roster_items=self.get_roster_item() 

453 ) 

454 except PermissionError: 

455 warnings.warn( 

456 "Slidge does not have privileges to add contacts to the roster. Refer" 

457 " to https://slidge.im/core/admin/privilege.html for" 

458 " more info." 

459 ) 

460 if config.ROSTER_PUSH_PRESENCE_SUBSCRIPTION_REQUEST_FALLBACK: 

461 self.send_friend_request( 

462 f"I'm already your friend on {self.xmpp.COMPONENT_TYPE}, but " 

463 "slidge is not allowed to manage your roster." 

464 ) 

465 return 

466 except (IqError, IqTimeout) as e: 

467 self.log.warning("Could not add to roster", exc_info=e) 

468 else: 

469 # we only broadcast pubsub events for contacts added to the roster 

470 # so if something was set before, we need to push it now 

471 self.added_to_roster = True 

472 self.send_last_presence() 

473 

474 async def __broadcast_pubsub_items(self): 

475 if not self.is_friend: 

476 return 

477 if not self.added_to_roster: 

478 return 

479 cached_avatar = self.get_cached_avatar() 

480 if cached_avatar is not None: 

481 await self.xmpp.pubsub.broadcast_avatar( 

482 self.jid.bare, self.session.user_jid, cached_avatar 

483 ) 

484 nick = self.name 

485 

486 if nick is not None: 

487 self.xmpp.pubsub.broadcast_nick( 

488 self.session.user_jid, 

489 self.jid.bare, 

490 nick, 

491 ) 

492 

493 async def _set_roster(self, **kw): 

494 try: 

495 await self.xmpp["xep_0356"].set_roster(**kw) 

496 except PermissionError: 

497 await self.xmpp["xep_0356_old"].set_roster(**kw) 

498 

499 def send_friend_request(self, text: Optional[str] = None): 

500 presence = self._make_presence(ptype="subscribe", pstatus=text, bare=True) 

501 self._send(presence, nick=True) 

502 

503 async def accept_friend_request(self, text: Optional[str] = None): 

504 """ 

505 Call this to signify that this Contact has accepted to be a friend 

506 of the user. 

507 

508 :param text: Optional message from the friend to the user 

509 """ 

510 self.is_friend = True 

511 self.added_to_roster = True 

512 self.__ensure_pk() 

513 self.log.debug("Accepting friend request") 

514 presence = self._make_presence(ptype="subscribed", pstatus=text, bare=True) 

515 self._send(presence, nick=True) 

516 self.send_last_presence() 

517 await self.__broadcast_pubsub_items() 

518 self.log.debug("Accepted friend request") 

519 

520 def reject_friend_request(self, text: Optional[str] = None): 

521 """ 

522 Call this to signify that this Contact has refused to be a contact 

523 of the user (or that they don't want to be friends anymore) 

524 

525 :param text: Optional message from the non-friend to the user 

526 """ 

527 presence = self._make_presence(ptype="unsubscribed", pstatus=text, bare=True) 

528 self.offline() 

529 self._send(presence, nick=True) 

530 self.is_friend = False 

531 

532 async def on_friend_request(self, text=""): 

533 """ 

534 Called when receiving a "subscribe" presence, ie, "I would like to add 

535 you to my contacts/friends", from the user to this contact. 

536 

537 In XMPP terms: "I would like to receive your presence updates" 

538 

539 This is only called if self.is_friend = False. If self.is_friend = True, 

540 slidge will automatically "accept the friend request", ie, reply with 

541 a "subscribed" presence. 

542 

543 When called, a 'friend request event' should be sent to the legacy 

544 service, and when the contact responds, you should either call 

545 self.accept_subscription() or self.reject_subscription() 

546 """ 

547 pass 

548 

549 async def on_friend_delete(self, text=""): 

550 """ 

551 Called when receiving an "unsubscribed" presence, ie, "I would like to 

552 remove you to my contacts/friends" or "I refuse your friend request" 

553 from the user to this contact. 

554 

555 In XMPP terms: "You won't receive my presence updates anymore (or you 

556 never have)". 

557 """ 

558 pass 

559 

560 async def on_friend_accept(self): 

561 """ 

562 Called when receiving a "subscribed" presence, ie, "I accept to be 

563 your/confirm that you are my friend" from the user to this contact. 

564 

565 In XMPP terms: "You will receive my presence updates". 

566 """ 

567 pass 

568 

569 def unsubscribe(self): 

570 """ 

571 (internal use by slidge) 

572 

573 Send an "unsubscribe", "unsubscribed", "unavailable" presence sequence 

574 from this contact to the user, ie, "this contact has removed you from 

575 their 'friends'". 

576 """ 

577 for ptype in "unsubscribe", "unsubscribed", "unavailable": 

578 self.xmpp.send_presence(pfrom=self.jid, pto=self.user_jid.bare, ptype=ptype) # type: ignore 

579 

580 async def update_info(self): 

581 """ 

582 Fetch information about this contact from the legacy network 

583 

584 This is awaited on Contact instantiation, and should be overridden to 

585 update the nickname, avatar, vcard [...] of this contact, by making 

586 "legacy API calls". 

587 

588 To take advantage of the slidge avatar cache, you can check the .avatar 

589 property to retrieve the "legacy file ID" of the cached avatar. If there 

590 is no change, you should not call 

591 :py:meth:`slidge.core.mixins.avatar.AvatarMixin.set_avatar` or attempt 

592 to modify the ``.avatar`` property. 

593 """ 

594 pass 

595 

596 async def fetch_vcard(self): 

597 """ 

598 It the legacy network doesn't like that you fetch too many profiles on startup, 

599 it's also possible to fetch it here, which will be called when XMPP clients 

600 of the user request the vcard, if it hasn't been fetched before 

601 :return: 

602 """ 

603 pass 

604 

605 def _make_presence( 

606 self, 

607 *, 

608 last_seen: Optional[datetime.datetime] = None, 

609 status_codes: Optional[set[int]] = None, 

610 user_full_jid: Optional[JID] = None, 

611 **presence_kwargs, 

612 ): 

613 p = super()._make_presence(last_seen=last_seen, **presence_kwargs) 

614 caps = self.xmpp.plugin["xep_0115"] 

615 if p.get_from().resource and self._caps_ver: 

616 p["caps"]["node"] = caps.caps_node 

617 p["caps"]["hash"] = caps.hash 

618 p["caps"]["ver"] = self._caps_ver 

619 return p 

620 

621 @classmethod 

622 def from_store(cls, session, stored: Contact, *args, **kwargs) -> Self: 

623 contact = cls( 

624 session, 

625 cls.xmpp.LEGACY_CONTACT_ID_TYPE(stored.legacy_id), 

626 stored.jid.username, # type: ignore 

627 *args, # type: ignore 

628 **kwargs, # type: ignore 

629 ) 

630 contact.contact_pk = stored.id 

631 contact._name = stored.nick 

632 contact._is_friend = stored.is_friend 

633 contact._added_to_roster = stored.added_to_roster 

634 if (data := stored.extra_attributes) is not None: 

635 contact.deserialize_extra_attributes(data) 

636 contact._caps_ver = stored.caps_ver 

637 contact._set_logger_name() 

638 contact._AvatarMixin__avatar_unique_id = ( # type:ignore 

639 None 

640 if stored.avatar_legacy_id is None 

641 else session.xmpp.AVATAR_ID_TYPE(stored.avatar_legacy_id) 

642 ) 

643 contact._avatar_pk = stored.avatar_id 

644 contact._vcard = stored.vcard 

645 contact._vcard_fetched = stored.vcard_fetched 

646 contact._client_type = stored.client_type 

647 return contact 

648 

649 

650def is_markable(stanza: Union[Message, Presence]): 

651 if isinstance(stanza, Presence): 

652 return False 

653 return bool(stanza["body"]) 

654 

655 

656log = logging.getLogger(__name__)