Coverage for slidge / contact / contact.py: 86%

288 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 05:07 +0000

1import datetime 

2import logging 

3import warnings 

4from collections.abc import Iterable, Iterator, Sequence 

5from datetime import date 

6from typing import TYPE_CHECKING, Any, ClassVar, Generic 

7from xml.etree import ElementTree as ET 

8 

9from slixmpp import JID, Message, Presence 

10from slixmpp.exceptions import IqError, IqTimeout 

11from slixmpp.plugins.xep_0292.stanza import VCard4 

12from slixmpp.types import MessageTypes 

13 

14from slidge.db.avatar import CachedAvatar 

15 

16from ..core.mixins import AvatarMixin, FullCarbonMixin 

17from ..core.mixins.disco import ContactAccountDiscoMixin 

18from ..core.mixins.recipient import ReactionRecipientMixin, ThreadRecipientMixin 

19from ..db.models import Contact, ContactSent 

20from ..util import SubclassableOnce 

21from ..util.types import ( 

22 AnyContact, 

23 AnySession, 

24 ClientType, 

25 LegacyUserIdType, 

26 MessageOrPresenceTypeVar, 

27) 

28 

29if TYPE_CHECKING: 

30 from ..command.base import ContactCommand 

31 from ..group.participant import LegacyParticipant 

32 

33 

class LegacyContact(
    Generic[LegacyUserIdType],
    AvatarMixin,
    ContactAccountDiscoMixin,
    FullCarbonMixin,
    ReactionRecipientMixin,
    ThreadRecipientMixin,
    SubclassableOnce,
):
    """
    This class centralizes actions in relation to a specific legacy contact.

    You shouldn't create instances of contacts manually, but rather rely on
    :meth:`.LegacyRoster.by_legacy_id` to ensure that contact instances are
    singletons. The :class:`.LegacyRoster` instance of a session is accessible
    through the :attr:`.BaseSession.contacts` attribute.

    Typically, your plugin should have methods hook to the legacy events and
    call appropriate methods here to transmit the "legacy action" to the xmpp
    user. This should look like this:

    .. code-block:: python

        class Session(BaseSession):
            ...

            async def on_cool_chat_network_new_text_message(self, legacy_msg_event):
                contact = self.contacts.by_legacy_id(legacy_msg_event.sender)
                contact.send_text(legacy_msg_event.text)

            async def on_cool_chat_network_new_typing_event(self, legacy_typing_event):
                contact = self.contacts.by_legacy_id(legacy_typing_event.sender)
                contact.composing()
            ...

    Use ``carbon=True`` as a keyword arg for methods to represent an action FROM
    the user TO the contact, typically when the user uses an official client to
    do an action such as sending a message or marking a message as read.
    This will use :xep:`0356` to impersonate the XMPP user.
    """

    session: AnySession

    RESOURCE: str = "slidge"
    """
    A full JID, including a resource part is required for chat states (and maybe other stuff)
    to work properly. This is the name of the resource the contacts will use.
    """
    # When False, presences sent by this contact are not mirrored to the
    # participants representing this contact in group chats.
    PROPAGATE_PRESENCE_TO_GROUPS = True

    mtype: MessageTypes = "chat"
    _can_send_carbon = True
    is_participant = False
    is_group = False

    _ONLY_SEND_PRESENCE_CHANGES = True

    STRIP_SHORT_DELAY = True
    # The only presence types that ``_send`` lets through for a contact that
    # is not (yet) a friend of the user.
    _NON_FRIEND_PRESENCES_FILTER: ClassVar[set[str]] = {"subscribe", "unsubscribed"}

    INVITATION_RECIPIENT = True

    commands: ClassVar[dict[str, "type[ContactCommand[AnyContact]]"]] = {}
    commands_chat: ClassVar[dict[str, "type[ContactCommand[AnyContact]]"]] = {}

    stored: Contact
    model: Contact

    def __init__(self, session: AnySession, stored: Contact) -> None:
        """
        :param session: The session of the user this contact belongs to
        :param stored: The database row backing this contact
        """
        self.session = session
        self.xmpp = session.xmpp
        self.stored = stored
        # The logger name embeds repr(self), which reads the attributes
        # assigned just above, so this must come after them.
        self._set_logger()
        super().__init__()

    @property
    def jid(self) -> JID:
        """
        Full JID of this contact: the stored JID with :attr:`.RESOURCE` as
        its resource part. Read-only.
        """
        jid = JID(self.stored.jid)
        jid.resource = self.RESOURCE
        return jid

    @jid.setter
    def jid(self, _jid: JID) -> None:
        # The JID is derived from the stored contact and cannot be reassigned.
        raise RuntimeError

    @property
    def legacy_id(self) -> LegacyUserIdType:
        """
        The stored legacy identifier, converted with the gateway's
        ``LEGACY_CONTACT_ID_TYPE``.
        """
        return self.xmpp.LEGACY_CONTACT_ID_TYPE(self.stored.legacy_id)  # type:ignore[no-any-return]

    async def get_vcard(self, fetch: bool = True) -> VCard4 | None:
        """
        Return the stored vcard of this contact, if any.

        :param fetch: If True and the vcard was never fetched, await
            :meth:`.fetch_vcard` first.
        :return: The parsed vcard, or None if no vcard is stored.
        """
        if fetch and not self.stored.vcard_fetched:
            await self.fetch_vcard()
        if self.stored.vcard is None:
            return None

        return VCard4(xml=ET.fromstring(self.stored.vcard))

    @property
    def is_friend(self) -> bool:
        """
        Whether this contact is a friend of the user, as stored.

        Assigning the current value is a no-op; any other value is passed to
        ``update_stored_attribute``.
        """
        return self.stored.is_friend

    @is_friend.setter
    def is_friend(self, value: bool) -> None:
        if value == self.is_friend:
            return
        self.update_stored_attribute(is_friend=value)

    @property
    def added_to_roster(self) -> bool:
        """
        Whether this contact has been added to the user's roster, as stored.

        Assigning the current value is a no-op; any other value is passed to
        ``update_stored_attribute``.
        """
        return self.stored.added_to_roster

    @added_to_roster.setter
    def added_to_roster(self, value: bool) -> None:
        if value == self.added_to_roster:
            return
        self.update_stored_attribute(added_to_roster=value)

    @property
    def participants(self) -> Iterator["LegacyParticipant"]:
        """
        Iterate over the group chat participants linked to this contact.
        """
        with self.xmpp.store.session() as orm:
            self.stored = orm.merge(self.stored)
            participants = self.stored.participants
        for p in participants:
            # A fresh ORM session per participant, so that ``p.room`` can be
            # read from an attached instance.
            with self.xmpp.store.session() as orm:
                p = orm.merge(p)
                muc = self.session.bookmarks.from_store(p.room)
            yield muc.participant_from_store(p, contact=self)

    @property
    def user_jid(self) -> JID:
        """
        JID of the user this contact belongs to, taken from the session.
        """
        return self.session.user_jid

    @property  # type:ignore
    def DISCO_TYPE(self) -> ClientType:
        """
        Alias of :attr:`.client_type`.
        """
        return self.client_type

    @DISCO_TYPE.setter
    def DISCO_TYPE(self, value: ClientType) -> None:
        self.client_type = value

    @property
    def client_type(self) -> ClientType:
        """
        The client type of this contact, cf https://xmpp.org/registrar/disco-categories.html#client

        Default is "pc".
        """
        return self.stored.client_type

    @client_type.setter
    def client_type(self, value: ClientType) -> None:
        if self.stored.client_type == value:
            return
        self.update_stored_attribute(client_type=value)

    def _set_logger(self) -> None:
        """
        (Re)create ``self.log``, named after the user's bare JID and the
        representation of this contact.
        """
        self.log = logging.getLogger(f"{self.user_jid.bare}:contact:{self}")

    def __repr__(self) -> str:
        return f"<Contact #{self.stored.id} '{self.name}' ({self.legacy_id} - {self.jid.user})'>"

    def __get_subscription_string(self) -> str:
        """
        Roster subscription value for this contact: "both" for friends,
        "none" otherwise.
        """
        return "both" if self.is_friend else "none"

    def __propagate_to_participants(self, stanza: Presence) -> None:
        """
        Mirror a presence of this contact to its group chat participants.

        Does nothing if :attr:`.PROPAGATE_PRESENCE_TO_GROUPS` is False or if
        the presence type has no participant equivalent.

        :param stanza: The presence this contact is about to send
        """
        if not self.PROPAGATE_PRESENCE_TO_GROUPS:
            return

        ptype = stanza["type"]
        if ptype in ("available", "chat"):
            func_name = "online"
        elif ptype in ("xa", "unavailable"):
            # we map unavailable to extended_away, because offline is
            # "participant leaves the MUC"
            # TODO: improve this with a clear distinction between participant
            #       and member list
            func_name = "extended_away"
        elif ptype in ("dnd", "busy"):
            # slixmpp exposes the <show/> value through the "type" interface,
            # where do-not-disturb is spelled "dnd"; "busy" is kept so that
            # anything relying on the previous spelling still works.
            func_name = "busy"
        elif ptype == "away":
            func_name = "away"
        else:
            return

        last_seen: datetime.datetime | None = (
            stanza["idle"]["since"] if stanza.get_plugin("idle", check=True) else None
        )

        kw = dict(status=stanza["status"], last_seen=last_seen)

        for part in self.participants:
            func = getattr(part, func_name)
            func(**kw)

    def _send(
        self,
        stanza: MessageOrPresenceTypeVar,
        carbon: bool = False,
        nick: bool = False,
        **send_kwargs: Any,  # noqa:ANN401
    ) -> MessageOrPresenceTypeVar:
        """
        Send a stanza on behalf of this contact.

        :param stanza: The message or presence to send
        :param carbon: If True and the stanza is a message, send it from the
            user to this contact through ``_privileged_send`` instead
        :param nick: If True, attach the contact's name as a user nickname
            (this is always done for non-friends that have a name)
        :param send_kwargs: Accepted for signature compatibility; not used
            in this method
        :return: The stanza that was passed in
        """
        if carbon and isinstance(stanza, Message):
            stanza["to"] = self.jid.bare
            stanza["from"] = self.user_jid
            self._privileged_send(stanza)
            return stanza

        if isinstance(stanza, Presence):
            if not self._updating_info:
                self.__propagate_to_participants(stanza)
            if (
                not self.is_friend
                and stanza["type"] not in self._NON_FRIEND_PRESENCES_FILTER
            ):
                # Non-friends only get the subscription-related presences;
                # anything else is silently dropped.
                return stanza
        if self.name and (nick or not self.is_friend):
            n = self.xmpp.plugin["xep_0172"].stanza.UserNick()
            n["nick"] = self.name
            stanza.append(n)
        if (
            not self._updating_info
            and self.xmpp.MARK_ALL_MESSAGES
            and is_markable(stanza)
        ):
            # Remember the id of this message so that
            # pop_unread_xmpp_ids_up_to() can return it later.
            with self.xmpp.store.session(expire_on_commit=False) as orm:
                self.stored = orm.merge(self.stored)
                exists = (
                    orm.query(ContactSent)
                    .filter_by(contact_id=self.stored.id, msg_id=stanza["id"])
                    .first()
                )
                if exists:
                    self.log.warning(
                        "Contact has already sent message %s", stanza["id"]
                    )
                else:
                    new = ContactSent(contact=self.stored, msg_id=stanza["id"])
                    orm.add(new)
                    self.stored.sent_order.append(new)
                    orm.commit()
        stanza["to"] = self.user_jid
        stanza.send()
        return stanza

    def pop_unread_xmpp_ids_up_to(self, horizon_xmpp_id: str) -> list[str]:
        """
        Return XMPP msg ids sent by this contact up to a given XMPP msg id.

        Legacy modules have no reason to use this, but it is used by slidge core
        for legacy networks that need to mark all messages as read (most XMPP
        clients only send a read marker for the latest message).

        This has side effects, if the horizon XMPP id is found, messages up to
        this horizon are cleared, to avoid sending the same read mark twice.

        :param horizon_xmpp_id: The latest message
        :return: A list of XMPP ids up to horizon_xmpp_id, included
        """
        with self.xmpp.store.session() as orm:
            assert self.stored.id is not None
            ids = self.xmpp.store.contacts.pop_sent_up_to(
                orm, self.stored.id, horizon_xmpp_id
            )
            orm.commit()
        return ids

    @property
    def name(self) -> str:
        """
        Friendly name of the contact, as it should appear in the user's roster

        This is the stored nickname, or an empty string if there is none.
        Assigning a new value also refreshes the logger, broadcasts the
        nickname if the contact is a friend added to the roster, and updates
        the nickname of the linked participants.
        """
        return self.stored.nick or ""

    @name.setter
    def name(self, n: str | None) -> None:
        if self.stored.nick == n:
            return
        self.update_stored_attribute(nick=n)
        # The logger name contains the contact's name.
        self._set_logger()
        if self.is_friend and self.added_to_roster:
            self.xmpp.pubsub.broadcast_nick(
                user_jid=self.user_jid, jid=self.jid.bare, nick=n
            )
        for p in self.participants:
            # Participants fall back to the legacy ID when the name is empty.
            p.nickname = n or str(self.legacy_id)

    def _post_avatar_update(self, cached_avatar: CachedAvatar | None) -> None:
        """
        Announce a new avatar: broadcast it if the contact is a friend added
        to the roster, then make every linked participant resend its presence.

        :param cached_avatar: The new cached avatar, or None
        """
        if self.is_friend and self.added_to_roster:
            self.session.create_task(
                self.session.xmpp.pubsub.broadcast_avatar(
                    self.jid.bare, self.session.user_jid, cached_avatar
                )
            )
        for p in self.participants:
            self.log.debug("Propagating new avatar to %s", p.muc)
            p.send_last_presence(force=True, no_cache_online=True)

    def set_vcard(
        self,
        /,
        full_name: str | None = None,
        given: str | None = None,
        surname: str | None = None,
        birthday: date | None = None,
        phone: str | None = None,
        phones: Iterable[str] = (),
        note: str | None = None,
        url: str | None = None,
        email: str | None = None,
        country: str | None = None,
        locality: str | None = None,
        pronouns: str | None = None,
    ) -> None:
        """
        Update :xep:`0292` data for this contact.

        Use this for additional metadata about this contact to be available to XMPP
        clients. The "note" argument is a text of arbitrary size and can be useful when
        no other field is a good fit.

        Empty or None values are skipped. If ``full_name`` is not given, the
        contact's name (when set) is used instead. An address is only added
        when ``country`` is given; ``locality`` alone is ignored.
        """
        vcard = VCard4()
        vcard.add_impp(f"xmpp:{self.jid.bare}")

        if n := self.name:
            vcard.add_nickname(n)
        if full_name:
            vcard["full_name"] = full_name
        elif n:
            vcard["full_name"] = n

        if given:
            vcard["given"] = given
        if surname:
            vcard["surname"] = surname
        if birthday:
            vcard["birthday"] = birthday

        if note:
            vcard.add_note(note)
        if url:
            vcard.add_url(url)
        if email:
            vcard.add_email(email)
        if phone:
            vcard.add_tel(phone)
        for p in phones:
            vcard.add_tel(p)
        if country:
            # locality is passed along as-is and may be None here
            vcard.add_address(country, locality)
        if pronouns:
            vcard["pronouns"]["text"] = pronouns

        self.update_stored_attribute(vcard=str(vcard), vcard_fetched=True)
        self.session.create_task(
            self.xmpp.pubsub.broadcast_vcard_event(self.jid, self.user_jid, vcard)
        )

    def get_roster_item(self) -> dict[str, dict[str, str | Sequence[str]]]:
        """
        Build the roster entry for this contact.

        :return: A dict with the contact's bare JID as its only key, mapping
            to the subscription state, roster groups and name of the contact.
        """
        item = {
            "subscription": self.__get_subscription_string(),
            "groups": [self.xmpp.ROSTER_GROUP],
        }
        # NOTE(review): ``name`` as defined in this class never returns None
        # (it falls back to ""), so this guard only matters if a subclass
        # overrides ``name`` -- confirm whether empty names should be skipped.
        if (n := self.name) is not None:
            item["name"] = n
        return {self.jid.bare: item}

    async def add_to_roster(self, force: bool = False) -> None:
        """
        Add this contact to the user roster using :xep:`0356`

        Nothing happens if the user disabled the ``roster_push`` preference.
        If slidge lacks the privilege to manage rosters, a warning is emitted
        and a friend request is sent to the user instead.

        :param force: add even if the contact was already added successfully
        """
        if self.added_to_roster and not force:
            return
        if not self.session.user.preferences.get("roster_push", True):
            log.debug("Roster push request by plugin ignored (--no-roster-push)")
            return
        try:
            await self.xmpp["xep_0356"].set_roster(
                jid=self.user_jid, roster_items=self.get_roster_item()
            )
        except PermissionError:
            warnings.warn(
                "Slidge does not have the privilege (XEP-0356) to manage rosters. "
                "Consider configuring your XMPP server for that."
            )
            self.send_friend_request(
                f"I'm already your friend on {self.xmpp.COMPONENT_TYPE}, but "
                "slidge is not allowed to manage your roster."
            )
            return
        except (IqError, IqTimeout) as e:
            self.log.warning("Could not add to roster", exc_info=e)
        else:
            # we only broadcast pubsub events for contacts added to the roster
            # so if something was set before, we need to push it now
            self.added_to_roster = True
            self.send_last_presence(force=True)

    async def __broadcast_pubsub_items(self) -> None:
        """
        Broadcast the cached avatar (if any) and the nickname of this contact.

        Does nothing unless the contact is a friend that was added to the
        roster.
        """
        if not self.is_friend:
            return
        if not self.added_to_roster:
            return
        cached_avatar = self.get_cached_avatar()
        if cached_avatar is not None:
            await self.xmpp.pubsub.broadcast_avatar(
                self.jid.bare, self.session.user_jid, cached_avatar
            )
        nick = self.name

        # NOTE(review): ``name`` as defined in this class never returns None,
        # so an empty nickname is broadcast too -- confirm this is intended.
        if nick is not None:
            self.xmpp.pubsub.broadcast_nick(
                self.session.user_jid,
                self.jid.bare,
                nick,
            )

    def send_friend_request(self, text: str | None = None) -> None:
        """
        Send a "subscribe" presence from this contact to the user.

        :param text: Optional status text attached to the presence
        """
        presence = self._make_presence(ptype="subscribe", pstatus=text, bare=True)
        self._send(presence, nick=True)

    async def accept_friend_request(self, text: str | None = None) -> None:
        """
        Call this to signify that this Contact has accepted to be a friend
        of the user.

        :param text: Optional message from the friend to the user
        """
        self.is_friend = True
        self.added_to_roster = True
        self.log.debug("Accepting friend request")
        presence = self._make_presence(ptype="subscribed", pstatus=text, bare=True)
        self._send(presence, nick=True)
        self.send_last_presence()
        await self.__broadcast_pubsub_items()
        self.log.debug("Accepted friend request")

    def reject_friend_request(self, text: str | None = None) -> None:
        """
        Call this to signify that this Contact has refused to be a contact
        of the user (or that they don't want to be friends anymore)

        :param text: Optional message from the non-friend to the user
        """
        presence = self._make_presence(ptype="unsubscribed", pstatus=text, bare=True)
        self.offline()
        self._send(presence, nick=True)
        self.is_friend = False

    async def on_friend_request(self, text: str = "") -> None:
        """
        Called when receiving a "subscribe" presence, ie, "I would like to add
        you to my contacts/friends", from the user to this contact.

        In XMPP terms: "I would like to receive your presence updates"

        This is only called if self.is_friend = False. If self.is_friend = True,
        slidge will automatically "accept the friend request", ie, reply with
        a "subscribed" presence.

        When called, a 'friend request event' should be sent to the legacy
        service, and when the contact responds, you should either call
        :meth:`.accept_friend_request` or :meth:`.reject_friend_request`.
        """
        pass

    async def on_friend_delete(self, text: str = "") -> None:
        """
        Called when receiving an "unsubscribed" presence, ie, "I would like to
        remove you from my contacts/friends" or "I refuse your friend request"
        from the user to this contact.

        In XMPP terms: "You won't receive my presence updates anymore (or you
        never have)".
        """
        pass

    async def on_friend_accept(self) -> None:
        """
        Called when receiving a "subscribed" presence, ie, "I accept to be
        your/confirm that you are my friend" from the user to this contact.

        In XMPP terms: "You will receive my presence updates".
        """
        pass

    def unsubscribe(self) -> None:
        """
        (internal use by slidge)

        Send an "unsubscribe", "unsubscribed", "unavailable" presence sequence
        from this contact to the user, ie, "this contact has removed you from
        their 'friends'".
        """
        for ptype in ("unsubscribe", "unsubscribed", "unavailable"):
            self.xmpp.send_presence(pfrom=self.jid, pto=self.user_jid.bare, ptype=ptype)

    async def update_info(self) -> None:
        """
        Fetch information about this contact from the legacy network

        This is awaited on Contact instantiation, and should be overridden to
        update the nickname, avatar, vcard [...] of this contact, by making
        "legacy API calls".

        To take advantage of the slidge avatar cache, you can check the .avatar
        property to retrieve the "legacy file ID" of the cached avatar. If there
        is no change, you should not call
        :py:meth:`slidge.core.mixins.avatar.AvatarMixin.set_avatar` or attempt
        to modify the ``.avatar`` property.
        """
        pass

    async def fetch_vcard(self) -> None:
        """
        If the legacy network doesn't like that you fetch too many profiles on startup,
        it's also possible to fetch it here, which will be called when XMPP clients
        of the user request the vcard, if it hasn't been fetched before

        The default implementation does nothing.
        """
        pass

    def _make_presence(
        self,
        *,
        last_seen: datetime.datetime | None = None,
        status_codes: set[int] | None = None,
        user_full_jid: JID | None = None,
        **presence_kwargs: Any,  # noqa:ANN401
    ) -> Presence:
        """
        Build a presence from this contact, with entity capabilities attached
        when it is sent from a full JID and a caps version is stored.

        :param last_seen: Forwarded to the parent implementation
        :param status_codes: Accepted for signature compatibility; not used
            in this method
        :param user_full_jid: Accepted for signature compatibility; not used
            in this method
        :param presence_kwargs: Forwarded to the parent implementation
        :return: The presence stanza
        """
        p = super()._make_presence(last_seen=last_seen, **presence_kwargs)
        caps = self.xmpp.plugin["xep_0115"]
        if p.get_from().resource and self.stored.caps_ver:
            p["caps"]["node"] = caps.caps_node
            p["caps"]["hash"] = caps.hash
            p["caps"]["ver"] = self.stored.caps_ver
        return p

576 

577 

def is_markable(stanza: Message | Presence) -> bool:
    """
    Tell whether a stanza is a message with a non-empty body.

    Presences are never markable.
    """
    return not isinstance(stanza, Presence) and bool(stanza["body"])


# Module-level logger, named after this module.
log = logging.getLogger(__name__)