Coverage for slidge / contact / contact.py: 86%

286 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-13 04:38 +0000

1import datetime 

2import logging 

3import warnings 

4from collections.abc import Iterable, Iterator, Sequence 

5from datetime import date 

6from typing import TYPE_CHECKING, Any, ClassVar, Literal, Self 

7from xml.etree import ElementTree as ET 

8 

9from slixmpp import JID, Message, Presence 

10from slixmpp.exceptions import IqError, IqTimeout 

11from slixmpp.plugins.xep_0292.stanza import VCard4 

12from slixmpp.types import MessageTypes 

13 

14from slidge.db.avatar import CachedAvatar 

15 

16from ..core.mixins import AvatarMixin, FullCarbonMixin 

17from ..core.mixins.disco import ContactAccountDiscoMixin 

18from ..core.mixins.recipient import RecipientMixin 

19from ..db.models import Contact, ContactSent 

20from ..util import SubclassableOnce 

21from ..util.types import ( 

22 AnySession, 

23 ClientType, 

24 MessageOrPresenceTypeVar, 

25) 

26 

27if TYPE_CHECKING: 

28 from ..command.base import ContactCommand 

29 from ..group.participant import LegacyParticipant 

30 

31 

32class LegacyContact( 

33 AvatarMixin, 

34 ContactAccountDiscoMixin, 

35 FullCarbonMixin, 

36 RecipientMixin, 

37 SubclassableOnce, 

38): 

39 """ 

40 This class centralizes actions in relation to a specific legacy contact. 

41 

42 You shouldn't create instances of contacts manually, but rather rely on 

43 :meth:`.LegacyRoster.by_legacy_id` to ensure that contact instances are 

44 singletons. The :class:`.LegacyRoster` instance of a session is accessible 

45 through the :attr:`.BaseSession.contacts` attribute. 

46 

47 Typically, your plugin should have methods hook to the legacy events and 

48 call appropriate methods here to transmit the "legacy action" to the xmpp 

49 user. This should look like this: 

50 

51 .. code-block:python 

52 

53 class Session(BaseSession): 

54 ... 

55 

56 async def on_cool_chat_network_new_text_message(self, legacy_msg_event): 

57 contact = self.contacts.by_legacy_id(legacy_msg_event.from) 

58 contact.send_text(legacy_msg_event.text) 

59 

60 async def on_cool_chat_network_new_typing_event(self, legacy_typing_event): 

61 contact = self.contacts.by_legacy_id(legacy_msg_event.from) 

62 contact.composing() 

63 ... 

64 

65 Use ``carbon=True`` as a keyword arg for methods to represent an action FROM 

66 the user TO the contact, typically when the user uses an official client to 

67 do an action such as sending a message or marking as message as read. 

68 This will use :xep:`0363` to impersonate the XMPP user in order. 

69 """ 

70 

71 session: AnySession 

72 

73 RESOURCE: str = "slidge" 

74 """ 

75 A full JID, including a resource part is required for chat states (and maybe other stuff) 

76 to work properly. This is the name of the resource the contacts will use. 

77 """ 

78 PROPAGATE_PRESENCE_TO_GROUPS = True 

79 

80 mtype: MessageTypes = "chat" 

81 _can_send_carbon = True 

82 is_participant: Literal[False] = False 

83 is_group: Literal[False] = False 

84 

85 _ONLY_SEND_PRESENCE_CHANGES = True 

86 

87 STRIP_SHORT_DELAY = True 

88 _NON_FRIEND_PRESENCES_FILTER: ClassVar[set[str]] = {"subscribe", "unsubscribed"} 

89 

90 INVITATION_RECIPIENT = True 

91 

92 commands: ClassVar[dict[str, "type[ContactCommand[LegacyContact]]"]] = {} 

93 commands_chat: ClassVar[dict[str, "type[ContactCommand[LegacyContact]]"]] = {} 

94 

95 stored: Contact 

96 model: Contact 

97 

98 def __init__(self, session: AnySession, stored: Contact) -> None: 

99 self.session = session 

100 self.xmpp = session.xmpp 

101 self.stored = stored 

102 self._set_logger() 

103 super().__init__() 

104 

105 @property 

106 def jid(self) -> JID: 

107 jid = JID(self.stored.jid) 

108 jid.resource = self.RESOURCE 

109 return jid 

110 

111 @jid.setter 

112 def jid(self, _jid: JID) -> None: 

113 raise RuntimeError 

114 

115 @property 

116 def legacy_id(self) -> str: 

117 return self.stored.legacy_id 

118 

119 async def get_vcard(self, fetch: bool = True) -> VCard4 | None: 

120 if fetch and not self.stored.vcard_fetched: 

121 await self.fetch_vcard() 

122 if self.stored.vcard is None: 

123 return None 

124 

125 return VCard4(xml=ET.fromstring(self.stored.vcard)) 

126 

127 @property 

128 def is_friend(self) -> bool: 

129 return self.stored.is_friend 

130 

131 @is_friend.setter 

132 def is_friend(self, value: bool) -> None: 

133 if value == self.is_friend: 

134 return 

135 self.update_stored_attribute(is_friend=value) 

136 

137 @property 

138 def added_to_roster(self) -> bool: 

139 return self.stored.added_to_roster 

140 

141 @added_to_roster.setter 

142 def added_to_roster(self, value: bool) -> None: 

143 if value == self.added_to_roster: 

144 return 

145 self.update_stored_attribute(added_to_roster=value) 

146 

147 @property 

148 def participants(self) -> Iterator["LegacyParticipant[Self]"]: 

149 with self.xmpp.store.session() as orm: 

150 self.stored = orm.merge(self.stored) 

151 participants = self.stored.participants 

152 for p in participants: 

153 with self.xmpp.store.session() as orm: 

154 p = orm.merge(p) 

155 muc = self.session.bookmarks.from_store(p.room) 

156 yield muc.participant_from_store(p, contact=self) 

157 

158 @property 

159 def user_jid(self) -> JID: 

160 return self.session.user_jid 

161 

162 @property # type:ignore 

163 def DISCO_TYPE(self) -> ClientType: 

164 return self.client_type 

165 

166 @DISCO_TYPE.setter 

167 def DISCO_TYPE(self, value: ClientType) -> None: 

168 self.client_type = value 

169 

170 @property 

171 def client_type(self) -> ClientType: 

172 """ 

173 The client type of this contact, cf https://xmpp.org/registrar/disco-categories.html#client 

174 

175 Default is "pc". 

176 """ 

177 return self.stored.client_type 

178 

179 @client_type.setter 

180 def client_type(self, value: ClientType) -> None: 

181 if self.stored.client_type == value: 

182 return 

183 self.update_stored_attribute(client_type=value) 

184 

185 def _set_logger(self) -> None: 

186 self.log = logging.getLogger(f"{self.user_jid.bare}:contact:{self}") 

187 

188 def __repr__(self) -> str: 

189 return f"<Contact #{self.stored.id} '{self.name}' ({self.legacy_id} - {self.jid.user})'>" 

190 

191 def __get_subscription_string(self) -> str: 

192 if self.is_friend: 

193 return "both" 

194 return "none" 

195 

196 def __propagate_to_participants(self, stanza: Presence) -> None: 

197 if not self.PROPAGATE_PRESENCE_TO_GROUPS: 

198 return 

199 

200 ptype = stanza["type"] 

201 if ptype in ("available", "chat"): 

202 func_name = "online" 

203 elif ptype in ("xa", "unavailable"): 

204 # we map unavailable to extended_away, because offline is 

205 # "participant leaves the MUC" 

206 # TODO: improve this with a clear distinction between participant 

207 # and member list 

208 func_name = "extended_away" 

209 elif ptype == "busy": 

210 func_name = "busy" 

211 elif ptype == "away": 

212 func_name = "away" 

213 else: 

214 return 

215 

216 last_seen: datetime.datetime | None = ( 

217 stanza["idle"]["since"] if "idle" in stanza else None 

218 ) 

219 

220 kw = dict(status=stanza["status"], last_seen=last_seen) 

221 

222 for part in self.participants: 

223 func = getattr(part, func_name) 

224 func(**kw) 

225 

226 def _send( 

227 self, 

228 stanza: MessageOrPresenceTypeVar, 

229 carbon: bool = False, 

230 nick: bool = False, 

231 **send_kwargs: Any, # noqa:ANN401 

232 ) -> MessageOrPresenceTypeVar: 

233 if carbon and isinstance(stanza, Message): 

234 stanza["to"] = self.jid.bare 

235 stanza["from"] = self.user_jid 

236 self._privileged_send(stanza) 

237 return stanza 

238 

239 if isinstance(stanza, Presence): 

240 if not self._updating_info: 

241 self.__propagate_to_participants(stanza) 

242 if ( 

243 not self.is_friend 

244 and stanza["type"] not in self._NON_FRIEND_PRESENCES_FILTER 

245 ): 

246 return stanza 

247 if self.name and (nick or not self.is_friend): 

248 n = self.xmpp.plugin["xep_0172"].stanza.UserNick() 

249 n["nick"] = self.name 

250 stanza.append(n) 

251 if ( 

252 not self._updating_info 

253 and self.xmpp.MARK_ALL_MESSAGES 

254 and is_markable(stanza) 

255 ): 

256 with self.xmpp.store.session(expire_on_commit=False) as orm: 

257 self.stored = orm.merge(self.stored) 

258 exists = ( 

259 orm.query(ContactSent) 

260 .filter_by(contact_id=self.stored.id, msg_id=stanza["id"]) 

261 .first() 

262 ) 

263 if exists: 

264 self.log.warning( 

265 "Contact has already sent message %s", stanza["id"] 

266 ) 

267 else: 

268 new = ContactSent(contact=self.stored, msg_id=stanza["id"]) 

269 orm.add(new) 

270 self.stored.sent_order.append(new) 

271 orm.commit() 

272 stanza["to"] = self.user_jid 

273 stanza.send() 

274 return stanza 

275 

276 def pop_unread_xmpp_ids_up_to(self, horizon_xmpp_id: str) -> list[str]: 

277 """ 

278 Return XMPP msg ids sent by this contact up to a given XMPP msg id. 

279 

280 Legacy modules have no reason to use this, but it is used by slidge core 

281 for legacy networks that need to mark all messages as read (most XMPP 

282 clients only send a read marker for the latest message). 

283 

284 This has side effects, if the horizon XMPP id is found, messages up to 

285 this horizon are cleared, to avoid sending the same read mark twice. 

286 

287 :param horizon_xmpp_id: The latest message 

288 :return: A list of XMPP ids up to horizon_xmpp_id, included 

289 """ 

290 with self.xmpp.store.session() as orm: 

291 assert self.stored.id is not None 

292 ids = self.xmpp.store.contacts.pop_sent_up_to( 

293 orm, self.stored.id, horizon_xmpp_id 

294 ) 

295 orm.commit() 

296 return ids 

297 

298 @property 

299 def name(self) -> str: 

300 """ 

301 Friendly name of the contact, as it should appear in the user's roster 

302 """ 

303 return self.stored.nick or "" 

304 

305 @name.setter 

306 def name(self, n: str | None) -> None: 

307 if self.stored.nick == n: 

308 return 

309 self.update_stored_attribute(nick=n) 

310 self._set_logger() 

311 if self.is_friend and self.added_to_roster: 

312 self.xmpp.pubsub.broadcast_nick( 

313 user_jid=self.user_jid, jid=self.jid.bare, nick=n 

314 ) 

315 for p in self.participants: 

316 p.nickname = n or str(self.legacy_id) 

317 

318 def _post_avatar_update(self, cached_avatar: CachedAvatar | None) -> None: 

319 if self.is_friend and self.added_to_roster: 

320 self.session.create_task( 

321 self.session.xmpp.pubsub.broadcast_avatar( 

322 self.jid.bare, self.session.user_jid, cached_avatar 

323 ) 

324 ) 

325 for p in self.participants: 

326 self.log.debug("Propagating new avatar to %s", p.muc) 

327 p.send_last_presence(force=True, no_cache_online=True) 

328 

329 def set_vcard( 

330 self, 

331 /, 

332 full_name: str | None = None, 

333 given: str | None = None, 

334 surname: str | None = None, 

335 birthday: date | None = None, 

336 phone: str | None = None, 

337 phones: Iterable[str] = (), 

338 note: str | None = None, 

339 url: str | None = None, 

340 email: str | None = None, 

341 country: str | None = None, 

342 locality: str | None = None, 

343 pronouns: str | None = None, 

344 ) -> None: 

345 """ 

346 Update xep:`0292` data for this contact. 

347 

348 Use this for additional metadata about this contact to be available to XMPP 

349 clients. The "note" argument is a text of arbitrary size and can be useful when 

350 no other field is a good fit. 

351 """ 

352 vcard = VCard4() 

353 vcard.add_impp(f"xmpp:{self.jid.bare}") 

354 

355 if n := self.name: 

356 vcard.add_nickname(n) 

357 if full_name: 

358 vcard["full_name"] = full_name 

359 elif n: 

360 vcard["full_name"] = n 

361 

362 if given: 

363 vcard["given"] = given 

364 if surname: 

365 vcard["surname"] = surname 

366 if birthday: 

367 vcard["birthday"] = birthday 

368 

369 if note: 

370 vcard.add_note(note) 

371 if url: 

372 vcard.add_url(url) 

373 if email: 

374 vcard.add_email(email) 

375 if phone: 

376 vcard.add_tel(phone) 

377 for p in phones: 

378 vcard.add_tel(p) 

379 if (country and locality) or country: 

380 vcard.add_address(country, locality) 

381 if pronouns: 

382 vcard["pronouns"]["text"] = pronouns 

383 

384 self.update_stored_attribute(vcard=str(vcard), vcard_fetched=True) 

385 self.session.create_task( 

386 self.xmpp.pubsub.broadcast_vcard_event(self.jid, self.user_jid, vcard) 

387 ) 

388 

389 def get_roster_item(self) -> dict[str, dict[str, str | Sequence[str]]]: 

390 item = { 

391 "subscription": self.__get_subscription_string(), 

392 "groups": [self.xmpp.ROSTER_GROUP], 

393 } 

394 if (n := self.name) is not None: 

395 item["name"] = n 

396 return {self.jid.bare: item} 

397 

398 async def add_to_roster(self, force: bool = False) -> None: 

399 """ 

400 Add this contact to the user roster using :xep:`0356` 

401 

402 :param force: add even if the contact was already added successfully 

403 """ 

404 if self.added_to_roster and not force: 

405 return 

406 if not self.session.user.preferences.get("roster_push", True): 

407 log.debug("Roster push request by plugin ignored (--no-roster-push)") 

408 return 

409 try: 

410 await self.xmpp["xep_0356"].set_roster( 

411 jid=self.user_jid, roster_items=self.get_roster_item() 

412 ) 

413 except PermissionError: 

414 warnings.warn( 

415 "Slidge does not have the privilege (XEP-0356) to manage rosters. " 

416 "Consider configuring your XMPP server for that." 

417 ) 

418 self.send_friend_request( 

419 f"I'm already your friend on {self.xmpp.COMPONENT_TYPE}, but " 

420 "slidge is not allowed to manage your roster." 

421 ) 

422 return 

423 except (IqError, IqTimeout) as e: 

424 self.log.warning("Could not add to roster", exc_info=e) 

425 else: 

426 # we only broadcast pubsub events for contacts added to the roster 

427 # so if something was set before, we need to push it now 

428 self.added_to_roster = True 

429 self.send_last_presence(force=True) 

430 

431 async def __broadcast_pubsub_items(self) -> None: 

432 if not self.is_friend: 

433 return 

434 if not self.added_to_roster: 

435 return 

436 cached_avatar = self.get_cached_avatar() 

437 if cached_avatar is not None: 

438 await self.xmpp.pubsub.broadcast_avatar( 

439 self.jid.bare, self.session.user_jid, cached_avatar 

440 ) 

441 nick = self.name 

442 

443 if nick is not None: 

444 self.xmpp.pubsub.broadcast_nick( 

445 self.session.user_jid, 

446 self.jid.bare, 

447 nick, 

448 ) 

449 

450 def send_friend_request(self, text: str | None = None) -> None: 

451 presence = self._make_presence(ptype="subscribe", pstatus=text, bare=True) 

452 self._send(presence, nick=True) 

453 

454 async def accept_friend_request(self, text: str | None = None) -> None: 

455 """ 

456 Call this to signify that this Contact has accepted to be a friend 

457 of the user. 

458 

459 :param text: Optional message from the friend to the user 

460 """ 

461 self.is_friend = True 

462 self.added_to_roster = True 

463 self.log.debug("Accepting friend request") 

464 presence = self._make_presence(ptype="subscribed", pstatus=text, bare=True) 

465 self._send(presence, nick=True) 

466 self.send_last_presence() 

467 await self.__broadcast_pubsub_items() 

468 self.log.debug("Accepted friend request") 

469 

470 def reject_friend_request(self, text: str | None = None) -> None: 

471 """ 

472 Call this to signify that this Contact has refused to be a contact 

473 of the user (or that they don't want to be friends anymore) 

474 

475 :param text: Optional message from the non-friend to the user 

476 """ 

477 presence = self._make_presence(ptype="unsubscribed", pstatus=text, bare=True) 

478 self.offline() 

479 self._send(presence, nick=True) 

480 self.is_friend = False 

481 

482 async def on_friend_request(self, text: str = "") -> None: 

483 """ 

484 Called when receiving a "subscribe" presence, ie, "I would like to add 

485 you to my contacts/friends", from the user to this contact. 

486 

487 In XMPP terms: "I would like to receive your presence updates" 

488 

489 This is only called if self.is_friend = False. If self.is_friend = True, 

490 slidge will automatically "accept the friend request", ie, reply with 

491 a "subscribed" presence. 

492 

493 When called, a 'friend request event' should be sent to the legacy 

494 service, and when the contact responds, you should either call 

495 self.accept_subscription() or self.reject_subscription() 

496 """ 

497 pass 

498 

499 async def on_friend_delete(self, text: str = "") -> None: 

500 """ 

501 Called when receiving an "unsubscribed" presence, ie, "I would like to 

502 remove you to my contacts/friends" or "I refuse your friend request" 

503 from the user to this contact. 

504 

505 In XMPP terms: "You won't receive my presence updates anymore (or you 

506 never have)". 

507 """ 

508 pass 

509 

510 async def on_friend_accept(self) -> None: 

511 """ 

512 Called when receiving a "subscribed" presence, ie, "I accept to be 

513 your/confirm that you are my friend" from the user to this contact. 

514 

515 In XMPP terms: "You will receive my presence updates". 

516 """ 

517 pass 

518 

519 def unsubscribe(self) -> None: 

520 """ 

521 (internal use by slidge) 

522 

523 Send an "unsubscribe", "unsubscribed", "unavailable" presence sequence 

524 from this contact to the user, ie, "this contact has removed you from 

525 their 'friends'". 

526 """ 

527 for ptype in "unsubscribe", "unsubscribed", "unavailable": 

528 self.xmpp.send_presence(pfrom=self.jid, pto=self.user_jid.bare, ptype=ptype) 

529 

530 async def update_info(self) -> None: 

531 """ 

532 Fetch information about this contact from the legacy network 

533 

534 This is awaited on Contact instantiation, and should be overridden to 

535 update the nickname, avatar, vcard [...] of this contact, by making 

536 "legacy API calls". 

537 

538 To take advantage of the slidge avatar cache, you can check the .avatar 

539 property to retrieve the "legacy file ID" of the cached avatar. If there 

540 is no change, you should not call 

541 :py:meth:`slidge.core.mixins.avatar.AvatarMixin.set_avatar` or attempt 

542 to modify the ``.avatar`` property. 

543 """ 

544 pass 

545 

546 async def fetch_vcard(self) -> None: 

547 """ 

548 It the legacy network doesn't like that you fetch too many profiles on startup, 

549 it's also possible to fetch it here, which will be called when XMPP clients 

550 of the user request the vcard, if it hasn't been fetched before 

551 :return: 

552 """ 

553 pass 

554 

555 def _make_presence( 

556 self, 

557 *, 

558 last_seen: datetime.datetime | None = None, 

559 status_codes: set[int] | None = None, 

560 user_full_jid: JID | None = None, 

561 **presence_kwargs: Any, # noqa:ANN401 

562 ) -> Presence: 

563 p = super()._make_presence(last_seen=last_seen, **presence_kwargs) 

564 caps = self.xmpp.plugin["xep_0115"] 

565 if p.get_from().resource and self.stored.caps_ver: 

566 p["caps"]["node"] = caps.caps_node 

567 p["caps"]["hash"] = caps.hash 

568 p["caps"]["ver"] = self.stored.caps_ver 

569 return p 

570 

571 

572def is_markable(stanza: Message | Presence) -> bool: 

573 if isinstance(stanza, Presence): 

574 return False 

575 return bool(stanza["body"]) 

576 

577 

578log = logging.getLogger(__name__)