Coverage for slidge / contact / contact.py: 86%

286 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-20 19:56 +0000

1import datetime 

2import logging 

3import warnings 

4from collections.abc import Iterable, Iterator, Sequence 

5from datetime import date 

6from typing import TYPE_CHECKING, Any, ClassVar, Generic, Literal 

7from xml.etree import ElementTree as ET 

8 

9from slixmpp import JID, Message, Presence 

10from slixmpp.exceptions import IqError, IqTimeout 

11from slixmpp.plugins.xep_0292.stanza import VCard4 

12from slixmpp.types import MessageTypes 

13 

14from slidge.db.avatar import CachedAvatar 

15 

16from ..core.mixins import AvatarMixin, FullCarbonMixin 

17from ..core.mixins.disco import ContactAccountDiscoMixin 

18from ..core.mixins.recipient import ReactionRecipientMixin, ThreadRecipientMixin 

19from ..db.models import Contact, ContactSent 

20from ..util import SubclassableOnce 

21from ..util.types import ( 

22 AnyContact, 

23 AnySession, 

24 ClientType, 

25 LegacyUserIdType, 

26 MessageOrPresenceTypeVar, 

27) 

28 

29if TYPE_CHECKING: 

30 from ..command.base import ContactCommand 

31 from ..group.participant import LegacyParticipant 

32 

33 

34class LegacyContact( 

35 Generic[LegacyUserIdType], 

36 AvatarMixin, 

37 ContactAccountDiscoMixin, 

38 FullCarbonMixin, 

39 ReactionRecipientMixin, 

40 ThreadRecipientMixin, 

41 SubclassableOnce, 

42): 

43 """ 

44 This class centralizes actions in relation to a specific legacy contact. 

45 

46 You shouldn't create instances of contacts manually, but rather rely on 

47 :meth:`.LegacyRoster.by_legacy_id` to ensure that contact instances are 

48 singletons. The :class:`.LegacyRoster` instance of a session is accessible 

49 through the :attr:`.BaseSession.contacts` attribute. 

50 

51 Typically, your plugin should have methods hook to the legacy events and 

52 call appropriate methods here to transmit the "legacy action" to the xmpp 

53 user. This should look like this: 

54 

55 .. code-block:python 

56 

57 class Session(BaseSession): 

58 ... 

59 

60 async def on_cool_chat_network_new_text_message(self, legacy_msg_event): 

61 contact = self.contacts.by_legacy_id(legacy_msg_event.from) 

62 contact.send_text(legacy_msg_event.text) 

63 

64 async def on_cool_chat_network_new_typing_event(self, legacy_typing_event): 

65 contact = self.contacts.by_legacy_id(legacy_msg_event.from) 

66 contact.composing() 

67 ... 

68 

69 Use ``carbon=True`` as a keyword arg for methods to represent an action FROM 

70 the user TO the contact, typically when the user uses an official client to 

71 do an action such as sending a message or marking as message as read. 

72 This will use :xep:`0363` to impersonate the XMPP user in order. 

73 """ 

74 

75 session: AnySession 

76 

77 RESOURCE: str = "slidge" 

78 """ 

79 A full JID, including a resource part is required for chat states (and maybe other stuff) 

80 to work properly. This is the name of the resource the contacts will use. 

81 """ 

82 PROPAGATE_PRESENCE_TO_GROUPS = True 

83 

84 mtype: MessageTypes = "chat" 

85 _can_send_carbon = True 

86 is_participant: Literal[False] = False 

87 is_group: Literal[False] = False 

88 

89 _ONLY_SEND_PRESENCE_CHANGES = True 

90 

91 STRIP_SHORT_DELAY = True 

92 _NON_FRIEND_PRESENCES_FILTER: ClassVar[set[str]] = {"subscribe", "unsubscribed"} 

93 

94 INVITATION_RECIPIENT = True 

95 

96 commands: ClassVar[dict[str, "type[ContactCommand[AnyContact]]"]] = {} 

97 commands_chat: ClassVar[dict[str, "type[ContactCommand[AnyContact]]"]] = {} 

98 

99 stored: Contact 

100 model: Contact 

101 

102 def __init__(self, session: AnySession, stored: Contact) -> None: 

103 self.session = session 

104 self.xmpp = session.xmpp 

105 self.stored = stored 

106 self._set_logger() 

107 super().__init__() 

108 

109 @property 

110 def jid(self) -> JID: 

111 jid = JID(self.stored.jid) 

112 jid.resource = self.RESOURCE 

113 return jid 

114 

115 @jid.setter 

116 def jid(self, _jid: JID) -> None: 

117 raise RuntimeError 

118 

119 @property 

120 def legacy_id(self) -> LegacyUserIdType: 

121 return self.xmpp.LEGACY_CONTACT_ID_TYPE(self.stored.legacy_id) # type:ignore[no-any-return] 

122 

123 async def get_vcard(self, fetch: bool = True) -> VCard4 | None: 

124 if fetch and not self.stored.vcard_fetched: 

125 await self.fetch_vcard() 

126 if self.stored.vcard is None: 

127 return None 

128 

129 return VCard4(xml=ET.fromstring(self.stored.vcard)) 

130 

131 @property 

132 def is_friend(self) -> bool: 

133 return self.stored.is_friend 

134 

135 @is_friend.setter 

136 def is_friend(self, value: bool) -> None: 

137 if value == self.is_friend: 

138 return 

139 self.update_stored_attribute(is_friend=value) 

140 

141 @property 

142 def added_to_roster(self) -> bool: 

143 return self.stored.added_to_roster 

144 

145 @added_to_roster.setter 

146 def added_to_roster(self, value: bool) -> None: 

147 if value == self.added_to_roster: 

148 return 

149 self.update_stored_attribute(added_to_roster=value) 

150 

151 @property 

152 def participants(self) -> Iterator["LegacyParticipant"]: 

153 with self.xmpp.store.session() as orm: 

154 self.stored = orm.merge(self.stored) 

155 participants = self.stored.participants 

156 for p in participants: 

157 with self.xmpp.store.session() as orm: 

158 p = orm.merge(p) 

159 muc = self.session.bookmarks.from_store(p.room) 

160 yield muc.participant_from_store(p, contact=self) 

161 

162 @property 

163 def user_jid(self) -> JID: 

164 return self.session.user_jid 

165 

166 @property # type:ignore 

167 def DISCO_TYPE(self) -> ClientType: 

168 return self.client_type 

169 

170 @DISCO_TYPE.setter 

171 def DISCO_TYPE(self, value: ClientType) -> None: 

172 self.client_type = value 

173 

174 @property 

175 def client_type(self) -> ClientType: 

176 """ 

177 The client type of this contact, cf https://xmpp.org/registrar/disco-categories.html#client 

178 

179 Default is "pc". 

180 """ 

181 return self.stored.client_type 

182 

183 @client_type.setter 

184 def client_type(self, value: ClientType) -> None: 

185 if self.stored.client_type == value: 

186 return 

187 self.update_stored_attribute(client_type=value) 

188 

189 def _set_logger(self) -> None: 

190 self.log = logging.getLogger(f"{self.user_jid.bare}:contact:{self}") 

191 

192 def __repr__(self) -> str: 

193 return f"<Contact #{self.stored.id} '{self.name}' ({self.legacy_id} - {self.jid.user})'>" 

194 

195 def __get_subscription_string(self) -> str: 

196 if self.is_friend: 

197 return "both" 

198 return "none" 

199 

200 def __propagate_to_participants(self, stanza: Presence) -> None: 

201 if not self.PROPAGATE_PRESENCE_TO_GROUPS: 

202 return 

203 

204 ptype = stanza["type"] 

205 if ptype in ("available", "chat"): 

206 func_name = "online" 

207 elif ptype in ("xa", "unavailable"): 

208 # we map unavailable to extended_away, because offline is 

209 # "participant leaves the MUC" 

210 # TODO: improve this with a clear distinction between participant 

211 # and member list 

212 func_name = "extended_away" 

213 elif ptype == "busy": 

214 func_name = "busy" 

215 elif ptype == "away": 

216 func_name = "away" 

217 else: 

218 return 

219 

220 last_seen: datetime.datetime | None = ( 

221 stanza["idle"]["since"] if "idle" in stanza else None 

222 ) 

223 

224 kw = dict(status=stanza["status"], last_seen=last_seen) 

225 

226 for part in self.participants: 

227 func = getattr(part, func_name) 

228 func(**kw) 

229 

230 def _send( 

231 self, 

232 stanza: MessageOrPresenceTypeVar, 

233 carbon: bool = False, 

234 nick: bool = False, 

235 **send_kwargs: Any, # noqa:ANN401 

236 ) -> MessageOrPresenceTypeVar: 

237 if carbon and isinstance(stanza, Message): 

238 stanza["to"] = self.jid.bare 

239 stanza["from"] = self.user_jid 

240 self._privileged_send(stanza) 

241 return stanza 

242 

243 if isinstance(stanza, Presence): 

244 if not self._updating_info: 

245 self.__propagate_to_participants(stanza) 

246 if ( 

247 not self.is_friend 

248 and stanza["type"] not in self._NON_FRIEND_PRESENCES_FILTER 

249 ): 

250 return stanza 

251 if self.name and (nick or not self.is_friend): 

252 n = self.xmpp.plugin["xep_0172"].stanza.UserNick() 

253 n["nick"] = self.name 

254 stanza.append(n) 

255 if ( 

256 not self._updating_info 

257 and self.xmpp.MARK_ALL_MESSAGES 

258 and is_markable(stanza) 

259 ): 

260 with self.xmpp.store.session(expire_on_commit=False) as orm: 

261 self.stored = orm.merge(self.stored) 

262 exists = ( 

263 orm.query(ContactSent) 

264 .filter_by(contact_id=self.stored.id, msg_id=stanza["id"]) 

265 .first() 

266 ) 

267 if exists: 

268 self.log.warning( 

269 "Contact has already sent message %s", stanza["id"] 

270 ) 

271 else: 

272 new = ContactSent(contact=self.stored, msg_id=stanza["id"]) 

273 orm.add(new) 

274 self.stored.sent_order.append(new) 

275 orm.commit() 

276 stanza["to"] = self.user_jid 

277 stanza.send() 

278 return stanza 

279 

280 def pop_unread_xmpp_ids_up_to(self, horizon_xmpp_id: str) -> list[str]: 

281 """ 

282 Return XMPP msg ids sent by this contact up to a given XMPP msg id. 

283 

284 Legacy modules have no reason to use this, but it is used by slidge core 

285 for legacy networks that need to mark all messages as read (most XMPP 

286 clients only send a read marker for the latest message). 

287 

288 This has side effects, if the horizon XMPP id is found, messages up to 

289 this horizon are cleared, to avoid sending the same read mark twice. 

290 

291 :param horizon_xmpp_id: The latest message 

292 :return: A list of XMPP ids up to horizon_xmpp_id, included 

293 """ 

294 with self.xmpp.store.session() as orm: 

295 assert self.stored.id is not None 

296 ids = self.xmpp.store.contacts.pop_sent_up_to( 

297 orm, self.stored.id, horizon_xmpp_id 

298 ) 

299 orm.commit() 

300 return ids 

301 

302 @property 

303 def name(self) -> str: 

304 """ 

305 Friendly name of the contact, as it should appear in the user's roster 

306 """ 

307 return self.stored.nick or "" 

308 

309 @name.setter 

310 def name(self, n: str | None) -> None: 

311 if self.stored.nick == n: 

312 return 

313 self.update_stored_attribute(nick=n) 

314 self._set_logger() 

315 if self.is_friend and self.added_to_roster: 

316 self.xmpp.pubsub.broadcast_nick( 

317 user_jid=self.user_jid, jid=self.jid.bare, nick=n 

318 ) 

319 for p in self.participants: 

320 p.nickname = n or str(self.legacy_id) 

321 

322 def _post_avatar_update(self, cached_avatar: CachedAvatar | None) -> None: 

323 if self.is_friend and self.added_to_roster: 

324 self.session.create_task( 

325 self.session.xmpp.pubsub.broadcast_avatar( 

326 self.jid.bare, self.session.user_jid, cached_avatar 

327 ) 

328 ) 

329 for p in self.participants: 

330 self.log.debug("Propagating new avatar to %s", p.muc) 

331 p.send_last_presence(force=True, no_cache_online=True) 

332 

333 def set_vcard( 

334 self, 

335 /, 

336 full_name: str | None = None, 

337 given: str | None = None, 

338 surname: str | None = None, 

339 birthday: date | None = None, 

340 phone: str | None = None, 

341 phones: Iterable[str] = (), 

342 note: str | None = None, 

343 url: str | None = None, 

344 email: str | None = None, 

345 country: str | None = None, 

346 locality: str | None = None, 

347 pronouns: str | None = None, 

348 ) -> None: 

349 """ 

350 Update xep:`0292` data for this contact. 

351 

352 Use this for additional metadata about this contact to be available to XMPP 

353 clients. The "note" argument is a text of arbitrary size and can be useful when 

354 no other field is a good fit. 

355 """ 

356 vcard = VCard4() 

357 vcard.add_impp(f"xmpp:{self.jid.bare}") 

358 

359 if n := self.name: 

360 vcard.add_nickname(n) 

361 if full_name: 

362 vcard["full_name"] = full_name 

363 elif n: 

364 vcard["full_name"] = n 

365 

366 if given: 

367 vcard["given"] = given 

368 if surname: 

369 vcard["surname"] = surname 

370 if birthday: 

371 vcard["birthday"] = birthday 

372 

373 if note: 

374 vcard.add_note(note) 

375 if url: 

376 vcard.add_url(url) 

377 if email: 

378 vcard.add_email(email) 

379 if phone: 

380 vcard.add_tel(phone) 

381 for p in phones: 

382 vcard.add_tel(p) 

383 if (country and locality) or country: 

384 vcard.add_address(country, locality) 

385 if pronouns: 

386 vcard["pronouns"]["text"] = pronouns 

387 

388 self.update_stored_attribute(vcard=str(vcard), vcard_fetched=True) 

389 self.session.create_task( 

390 self.xmpp.pubsub.broadcast_vcard_event(self.jid, self.user_jid, vcard) 

391 ) 

392 

393 def get_roster_item(self) -> dict[str, dict[str, str | Sequence[str]]]: 

394 item = { 

395 "subscription": self.__get_subscription_string(), 

396 "groups": [self.xmpp.ROSTER_GROUP], 

397 } 

398 if (n := self.name) is not None: 

399 item["name"] = n 

400 return {self.jid.bare: item} 

401 

402 async def add_to_roster(self, force: bool = False) -> None: 

403 """ 

404 Add this contact to the user roster using :xep:`0356` 

405 

406 :param force: add even if the contact was already added successfully 

407 """ 

408 if self.added_to_roster and not force: 

409 return 

410 if not self.session.user.preferences.get("roster_push", True): 

411 log.debug("Roster push request by plugin ignored (--no-roster-push)") 

412 return 

413 try: 

414 await self.xmpp["xep_0356"].set_roster( 

415 jid=self.user_jid, roster_items=self.get_roster_item() 

416 ) 

417 except PermissionError: 

418 warnings.warn( 

419 "Slidge does not have the privilege (XEP-0356) to manage rosters. " 

420 "Consider configuring your XMPP server for that." 

421 ) 

422 self.send_friend_request( 

423 f"I'm already your friend on {self.xmpp.COMPONENT_TYPE}, but " 

424 "slidge is not allowed to manage your roster." 

425 ) 

426 return 

427 except (IqError, IqTimeout) as e: 

428 self.log.warning("Could not add to roster", exc_info=e) 

429 else: 

430 # we only broadcast pubsub events for contacts added to the roster 

431 # so if something was set before, we need to push it now 

432 self.added_to_roster = True 

433 self.send_last_presence(force=True) 

434 

435 async def __broadcast_pubsub_items(self) -> None: 

436 if not self.is_friend: 

437 return 

438 if not self.added_to_roster: 

439 return 

440 cached_avatar = self.get_cached_avatar() 

441 if cached_avatar is not None: 

442 await self.xmpp.pubsub.broadcast_avatar( 

443 self.jid.bare, self.session.user_jid, cached_avatar 

444 ) 

445 nick = self.name 

446 

447 if nick is not None: 

448 self.xmpp.pubsub.broadcast_nick( 

449 self.session.user_jid, 

450 self.jid.bare, 

451 nick, 

452 ) 

453 

454 def send_friend_request(self, text: str | None = None) -> None: 

455 presence = self._make_presence(ptype="subscribe", pstatus=text, bare=True) 

456 self._send(presence, nick=True) 

457 

458 async def accept_friend_request(self, text: str | None = None) -> None: 

459 """ 

460 Call this to signify that this Contact has accepted to be a friend 

461 of the user. 

462 

463 :param text: Optional message from the friend to the user 

464 """ 

465 self.is_friend = True 

466 self.added_to_roster = True 

467 self.log.debug("Accepting friend request") 

468 presence = self._make_presence(ptype="subscribed", pstatus=text, bare=True) 

469 self._send(presence, nick=True) 

470 self.send_last_presence() 

471 await self.__broadcast_pubsub_items() 

472 self.log.debug("Accepted friend request") 

473 

474 def reject_friend_request(self, text: str | None = None) -> None: 

475 """ 

476 Call this to signify that this Contact has refused to be a contact 

477 of the user (or that they don't want to be friends anymore) 

478 

479 :param text: Optional message from the non-friend to the user 

480 """ 

481 presence = self._make_presence(ptype="unsubscribed", pstatus=text, bare=True) 

482 self.offline() 

483 self._send(presence, nick=True) 

484 self.is_friend = False 

485 

486 async def on_friend_request(self, text: str = "") -> None: 

487 """ 

488 Called when receiving a "subscribe" presence, ie, "I would like to add 

489 you to my contacts/friends", from the user to this contact. 

490 

491 In XMPP terms: "I would like to receive your presence updates" 

492 

493 This is only called if self.is_friend = False. If self.is_friend = True, 

494 slidge will automatically "accept the friend request", ie, reply with 

495 a "subscribed" presence. 

496 

497 When called, a 'friend request event' should be sent to the legacy 

498 service, and when the contact responds, you should either call 

499 self.accept_subscription() or self.reject_subscription() 

500 """ 

501 pass 

502 

503 async def on_friend_delete(self, text: str = "") -> None: 

504 """ 

505 Called when receiving an "unsubscribed" presence, ie, "I would like to 

506 remove you to my contacts/friends" or "I refuse your friend request" 

507 from the user to this contact. 

508 

509 In XMPP terms: "You won't receive my presence updates anymore (or you 

510 never have)". 

511 """ 

512 pass 

513 

514 async def on_friend_accept(self) -> None: 

515 """ 

516 Called when receiving a "subscribed" presence, ie, "I accept to be 

517 your/confirm that you are my friend" from the user to this contact. 

518 

519 In XMPP terms: "You will receive my presence updates". 

520 """ 

521 pass 

522 

523 def unsubscribe(self) -> None: 

524 """ 

525 (internal use by slidge) 

526 

527 Send an "unsubscribe", "unsubscribed", "unavailable" presence sequence 

528 from this contact to the user, ie, "this contact has removed you from 

529 their 'friends'". 

530 """ 

531 for ptype in "unsubscribe", "unsubscribed", "unavailable": 

532 self.xmpp.send_presence(pfrom=self.jid, pto=self.user_jid.bare, ptype=ptype) 

533 

534 async def update_info(self) -> None: 

535 """ 

536 Fetch information about this contact from the legacy network 

537 

538 This is awaited on Contact instantiation, and should be overridden to 

539 update the nickname, avatar, vcard [...] of this contact, by making 

540 "legacy API calls". 

541 

542 To take advantage of the slidge avatar cache, you can check the .avatar 

543 property to retrieve the "legacy file ID" of the cached avatar. If there 

544 is no change, you should not call 

545 :py:meth:`slidge.core.mixins.avatar.AvatarMixin.set_avatar` or attempt 

546 to modify the ``.avatar`` property. 

547 """ 

548 pass 

549 

550 async def fetch_vcard(self) -> None: 

551 """ 

552 It the legacy network doesn't like that you fetch too many profiles on startup, 

553 it's also possible to fetch it here, which will be called when XMPP clients 

554 of the user request the vcard, if it hasn't been fetched before 

555 :return: 

556 """ 

557 pass 

558 

559 def _make_presence( 

560 self, 

561 *, 

562 last_seen: datetime.datetime | None = None, 

563 status_codes: set[int] | None = None, 

564 user_full_jid: JID | None = None, 

565 **presence_kwargs: Any, # noqa:ANN401 

566 ) -> Presence: 

567 p = super()._make_presence(last_seen=last_seen, **presence_kwargs) 

568 caps = self.xmpp.plugin["xep_0115"] 

569 if p.get_from().resource and self.stored.caps_ver: 

570 p["caps"]["node"] = caps.caps_node 

571 p["caps"]["hash"] = caps.hash 

572 p["caps"]["ver"] = self.stored.caps_ver 

573 return p 

574 

575 

576def is_markable(stanza: Message | Presence) -> bool: 

577 if isinstance(stanza, Presence): 

578 return False 

579 return bool(stanza["body"]) 

580 

581 

582log = logging.getLogger(__name__)