Coverage for slidge / contact / contact.py: 86%

282 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-02-15 09:02 +0000

1import datetime 

2import logging 

3import warnings 

4from collections.abc import Iterable, Iterator 

5from datetime import date 

6from typing import TYPE_CHECKING, Generic 

7from xml.etree import ElementTree as ET 

8 

9from slixmpp import JID, Message, Presence 

10from slixmpp.exceptions import IqError, IqTimeout 

11from slixmpp.plugins.xep_0292.stanza import VCard4 

12from slixmpp.types import MessageTypes 

13 

14from ..core.mixins import AvatarMixin, FullCarbonMixin 

15from ..core.mixins.disco import ContactAccountDiscoMixin 

16from ..core.mixins.recipient import ReactionRecipientMixin, ThreadRecipientMixin 

17from ..db.models import Contact, ContactSent 

18from ..util import SubclassableOnce 

19from ..util.types import ClientType, LegacyUserIdType, MessageOrPresenceTypeVar 

20 

21if TYPE_CHECKING: 

22 from ..core.session import BaseSession 

23 from ..group.participant import LegacyParticipant 

24 

25 

class LegacyContact(
    Generic[LegacyUserIdType],
    AvatarMixin,
    ContactAccountDiscoMixin,
    FullCarbonMixin,
    ReactionRecipientMixin,
    ThreadRecipientMixin,
    metaclass=SubclassableOnce,
):
    """
    This class centralizes actions in relation to a specific legacy contact.

    You shouldn't create instances of contacts manually, but rather rely on
    :meth:`.LegacyRoster.by_legacy_id` to ensure that contact instances are
    singletons. The :class:`.LegacyRoster` instance of a session is accessible
    through the :attr:`.BaseSession.contacts` attribute.

    Typically, your plugin should have methods hooked to the legacy events and
    call appropriate methods here to transmit the "legacy action" to the xmpp
    user. This should look like this:

    .. code-block:: python

        class Session(BaseSession):
            ...

            async def on_cool_chat_network_new_text_message(self, legacy_msg_event):
                contact = self.contacts.by_legacy_id(legacy_msg_event.sender)
                contact.send_text(legacy_msg_event.text)

            async def on_cool_chat_network_new_typing_event(self, legacy_typing_event):
                contact = self.contacts.by_legacy_id(legacy_typing_event.sender)
                contact.composing()
            ...

    Use ``carbon=True`` as a keyword arg for methods to represent an action FROM
    the user TO the contact, typically when the user uses an official client to
    do an action such as sending a message or marking a message as read.
    This will use :xep:`0356` to impersonate the XMPP user.
    """

    session: "BaseSession"

    RESOURCE: str = "slidge"
    """
    A full JID, including a resource part is required for chat states (and maybe other stuff)
    to work properly. This is the name of the resource the contacts will use.
    """
    # When False, presences of this contact are not forwarded to the MUC
    # participants that represent it (see __propagate_to_participants).
    PROPAGATE_PRESENCE_TO_GROUPS = True

    mtype: MessageTypes = "chat"
    _can_send_carbon = True
    is_participant = False
    is_group = False

    _ONLY_SEND_PRESENCE_CHANGES = True

    STRIP_SHORT_DELAY = True
    # Presence types that are still delivered to the user when the contact is
    # not a friend; every other presence is dropped in _send().
    _NON_FRIEND_PRESENCES_FILTER = {"subscribe", "unsubscribed"}

    INVITATION_RECIPIENT = True

    stored: Contact
    model: Contact

    def __init__(self, session: "BaseSession", stored: Contact) -> None:
        """
        :param session: The session of the user this contact belongs to
        :param stored: The database row backing this contact
        """
        self.session = session
        self.xmpp = session.xmpp
        self.stored = stored
        self._set_logger()
        super().__init__()

    @property
    def jid(self):  # type:ignore[override]
        """
        Full JID of this contact: the stored JID with :attr:`.RESOURCE` as
        resource part.
        """
        jid = JID(self.stored.jid)
        jid.resource = self.RESOURCE
        return jid

    @property
    def legacy_id(self):
        """
        The stored legacy ID, converted with ``LEGACY_CONTACT_ID_TYPE``.
        """
        return self.xmpp.LEGACY_CONTACT_ID_TYPE(self.stored.legacy_id)

    async def get_vcard(self, fetch: bool = True) -> VCard4 | None:
        """
        Return the stored vcard of this contact, if any.

        :param fetch: If True and the vcard has not been fetched yet, await
            :meth:`.fetch_vcard` first.
        :return: The parsed vcard, or None if nothing is stored.
        """
        if fetch and not self.stored.vcard_fetched:
            await self.fetch_vcard()
        if self.stored.vcard is None:
            return None

        return VCard4(xml=ET.fromstring(self.stored.vcard))

    @property
    def is_friend(self) -> bool:
        """
        Whether this contact is a "friend" of the user (stored attribute).
        """
        return self.stored.is_friend

    @is_friend.setter
    def is_friend(self, value: bool) -> None:
        # avoid a useless store update when nothing changes
        if value == self.is_friend:
            return
        self.update_stored_attribute(is_friend=value)

    @property
    def added_to_roster(self) -> bool:
        """
        Whether this contact is marked as added to the user's roster (stored
        attribute).
        """
        return self.stored.added_to_roster

    @added_to_roster.setter
    def added_to_roster(self, value: bool) -> None:
        # avoid a useless store update when nothing changes
        if value == self.added_to_roster:
            return
        self.update_stored_attribute(added_to_roster=value)

    @property
    def participants(self) -> Iterator["LegacyParticipant"]:
        """
        Iterate over the MUC participants that represent this contact.
        """
        with self.xmpp.store.session() as orm:
            self.stored = orm.merge(self.stored)
            participants = self.stored.participants
        for p in participants:
            with self.xmpp.store.session() as orm:
                # re-attach the participant to a live session before reading
                # its ``room`` relationship
                p = orm.merge(p)
                muc = self.session.bookmarks.from_store(p.room)
                yield muc.participant_from_store(p, contact=self)

    @property
    def user_jid(self):
        """
        JID of the XMPP user this contact belongs to.
        """
        return self.session.user_jid

    @property  # type:ignore
    def DISCO_TYPE(self) -> ClientType:
        """
        Alias of :attr:`.client_type`.
        """
        return self.client_type

    @DISCO_TYPE.setter
    def DISCO_TYPE(self, value: ClientType) -> None:
        self.client_type = value

    @property
    def client_type(self) -> ClientType:
        """
        The client type of this contact, cf https://xmpp.org/registrar/disco-categories.html#client

        Default is "pc".
        """
        return self.stored.client_type

    @client_type.setter
    def client_type(self, value: ClientType) -> None:
        # avoid a useless store update when nothing changes
        if self.stored.client_type == value:
            return
        self.update_stored_attribute(client_type=value)

    def _set_logger(self) -> None:
        """
        (Re)create ``self.log``, named after the user's bare JID and this
        contact's string representation.
        """
        self.log = logging.getLogger(f"{self.user_jid.bare}:contact:{self}")

    def __repr__(self) -> str:
        return f"<Contact #{self.stored.id} '{self.name}' ({self.legacy_id} - {self.jid.user})'>"

    def __get_subscription_string(self) -> str:
        """
        Roster subscription state: "both" for friends, "none" otherwise.
        """
        return "both" if self.is_friend else "none"

    def __propagate_to_participants(self, stanza: Presence) -> None:
        """
        Mirror a presence of this contact on all its MUC participants.

        Does nothing if :attr:`.PROPAGATE_PRESENCE_TO_GROUPS` is False or if
        the presence type is not one of the handled availability states.

        :param stanza: The presence this contact is about to send
        """
        if not self.PROPAGATE_PRESENCE_TO_GROUPS:
            return

        ptype = stanza["type"]
        if ptype in ("available", "chat"):
            func_name = "online"
        elif ptype in ("xa", "unavailable"):
            # we map unavailable to extended_away, because offline is
            # "participant leaves the MUC"
            # TODO: improve this with a clear distinction between participant
            #       and member list
            func_name = "extended_away"
        elif ptype in ("dnd", "busy"):
            # "dnd" is the XMPP <show/> value for "busy" (RFC 6121), like the
            # "chat", "xa" and "away" values matched in the other branches;
            # "busy" is kept for backward compatibility.
            func_name = "busy"
        elif ptype == "away":
            func_name = "away"
        else:
            return

        last_seen: datetime.datetime | None = (
            stanza["idle"]["since"] if stanza.get_plugin("idle", check=True) else None
        )

        kw = {"status": stanza["status"], "last_seen": last_seen}

        for part in self.participants:
            func = getattr(part, func_name)
            func(**kw)

    def _send(
        self,
        stanza: MessageOrPresenceTypeVar,
        carbon: bool = False,
        nick: bool = False,
        **send_kwargs,
    ) -> MessageOrPresenceTypeVar:
        """
        Send a stanza on behalf of this contact.

        :param stanza: The message or presence to send
        :param carbon: If True and ``stanza`` is a message, send it as the
            user, to this contact, via ``_privileged_send``.
        :param nick: If True, attach the contact name as a :xep:`0172` nick
            (always attached for non-friends that have a name).
        :param send_kwargs: Accepted and ignored by this implementation.
        :return: The stanza, whether it was actually sent or not.
        """
        if carbon and isinstance(stanza, Message):
            stanza["to"] = self.jid.bare
            stanza["from"] = self.user_jid
            self._privileged_send(stanza)
            return stanza  # type:ignore

        if isinstance(stanza, Presence):
            if not self._updating_info:
                self.__propagate_to_participants(stanza)
            if (
                not self.is_friend
                and stanza["type"] not in self._NON_FRIEND_PRESENCES_FILTER
            ):
                # non-friends only get subscription-related presences
                return stanza  # type:ignore
        if self.name and (nick or not self.is_friend):
            n = self.xmpp.plugin["xep_0172"].stanza.UserNick()
            n["nick"] = self.name
            stanza.append(n)
        if (
            not self._updating_info
            and self.xmpp.MARK_ALL_MESSAGES
            and is_markable(stanza)
        ):
            # remember the ID of this message, so that it can later be
            # returned by pop_unread_xmpp_ids_up_to()
            with self.xmpp.store.session(expire_on_commit=False) as orm:
                self.stored = orm.merge(self.stored)
                exists = (
                    orm.query(ContactSent)
                    .filter_by(contact_id=self.stored.id, msg_id=stanza["id"])
                    .first()
                )
                if exists:
                    self.log.warning(
                        "Contact has already sent message %s", stanza["id"]
                    )
                else:
                    new = ContactSent(contact=self.stored, msg_id=stanza["id"])
                    orm.add(new)
                    self.stored.sent_order.append(new)
                    orm.commit()
        stanza["to"] = self.user_jid
        stanza.send()
        return stanza

    def pop_unread_xmpp_ids_up_to(self, horizon_xmpp_id: str) -> list[str]:
        """
        Return XMPP msg ids sent by this contact up to a given XMPP msg id.

        Legacy modules have no reason to use this, but it is used by slidge core
        for legacy networks that need to mark all messages as read (most XMPP
        clients only send a read marker for the latest message).

        This has side effects, if the horizon XMPP id is found, messages up to
        this horizon are cleared, to avoid sending the same read mark twice.

        :param horizon_xmpp_id: The latest message
        :return: A list of XMPP ids up to horizon_xmpp_id, included
        """
        with self.xmpp.store.session() as orm:
            assert self.stored.id is not None
            ids = self.xmpp.store.contacts.pop_sent_up_to(
                orm, self.stored.id, horizon_xmpp_id
            )
            orm.commit()
        return ids

    @property
    def name(self) -> str:
        """
        Friendly name of the contact, as it should appear in the user's roster

        Never None: an empty string is returned when no nick is stored.
        """
        return self.stored.nick or ""

    @name.setter
    def name(self, n: str | None) -> None:
        if self.stored.nick == n:
            return
        self.update_stored_attribute(nick=n)
        # the logger name includes the contact name
        self._set_logger()
        if self.is_friend and self.added_to_roster:
            self.xmpp.pubsub.broadcast_nick(
                user_jid=self.user_jid, jid=self.jid.bare, nick=n
            )
        for p in self.participants:
            p.nickname = n or str(self.legacy_id)

    def _post_avatar_update(self, cached_avatar) -> None:
        """
        Broadcast a new avatar: over pubsub if the contact is a friend that was
        added to the roster, and through the presence of all its participants.

        :param cached_avatar: The avatar passed on to ``broadcast_avatar``
        """
        if self.is_friend and self.added_to_roster:
            self.session.create_task(
                self.session.xmpp.pubsub.broadcast_avatar(
                    self.jid.bare, self.session.user_jid, cached_avatar
                )
            )
        for p in self.participants:
            self.log.debug("Propagating new avatar to %s", p.muc)
            p.send_last_presence(force=True, no_cache_online=True)

    def set_vcard(
        self,
        /,
        full_name: str | None = None,
        given: str | None = None,
        surname: str | None = None,
        birthday: date | None = None,
        phone: str | None = None,
        phones: Iterable[str] = (),
        note: str | None = None,
        url: str | None = None,
        email: str | None = None,
        country: str | None = None,
        locality: str | None = None,
        pronouns: str | None = None,
    ) -> None:
        """
        Update :xep:`0292` data for this contact.

        Use this for additional metadata about this contact to be available to XMPP
        clients. The "note" argument is a text of arbitrary size and can be useful when
        no other field is a good fit.

        The contact name, if any, is used as nickname, and as full name when
        ``full_name`` is not given. ``locality`` is only used together with
        ``country``.
        """
        vcard = VCard4()
        vcard.add_impp(f"xmpp:{self.jid.bare}")

        if n := self.name:
            vcard.add_nickname(n)
        if full_name:
            vcard["full_name"] = full_name
        elif n:
            vcard["full_name"] = n

        if given:
            vcard["given"] = given
        if surname:
            vcard["surname"] = surname
        if birthday:
            vcard["birthday"] = birthday

        if note:
            vcard.add_note(note)
        if url:
            vcard.add_url(url)
        if email:
            vcard.add_email(email)
        if phone:
            vcard.add_tel(phone)
        for p in phones:
            vcard.add_tel(p)
        if country:
            # locality may be None or empty here; it is passed through as-is
            vcard.add_address(country, locality)
        if pronouns:
            vcard["pronouns"]["text"] = pronouns

        self.update_stored_attribute(vcard=str(vcard), vcard_fetched=True)
        self.session.create_task(
            self.xmpp.pubsub.broadcast_vcard_event(self.jid, self.user_jid, vcard)
        )

    def get_roster_item(self):
        """
        Build the roster item of this contact.

        :return: A dict mapping the bare JID of this contact to its
            subscription, groups and name.
        """
        item = {
            "subscription": self.__get_subscription_string(),
            "groups": [self.xmpp.ROSTER_GROUP],
            # self.name is never None (it falls back to ""), so the name is
            # always part of the item
            "name": self.name,
        }
        return {self.jid.bare: item}

    async def add_to_roster(self, force: bool = False) -> None:
        """
        Add this contact to the user roster using :xep:`0356`

        If slidge lacks the privilege to manage the roster, a friend request
        is sent to the user instead.

        :param force: add even if the contact was already added successfully
        """
        if self.added_to_roster and not force:
            return
        if not self.session.user.preferences.get("roster_push", True):
            self.log.debug("Roster push request by plugin ignored (--no-roster-push)")
            return
        try:
            await self.xmpp["xep_0356"].set_roster(
                jid=self.user_jid, roster_items=self.get_roster_item()
            )
        except PermissionError:
            warnings.warn(
                "Slidge does not have the privilege (XEP-0356) to manage rosters. "
                "Consider configuring your XMPP server for that."
            )
            self.send_friend_request(
                f"I'm already your friend on {self.xmpp.COMPONENT_TYPE}, but "
                "slidge is not allowed to manage your roster."
            )
            return
        except (IqError, IqTimeout) as e:
            self.log.warning("Could not add to roster", exc_info=e)
        else:
            # we only broadcast pubsub events for contacts added to the roster
            # so if something was set before, we need to push it now
            self.added_to_roster = True
            self.send_last_presence(force=True)

    async def __broadcast_pubsub_items(self) -> None:
        """
        Broadcast the cached avatar (if any) and the nick of this contact over
        pubsub, if it is a friend that was added to the roster.
        """
        if not self.is_friend:
            return
        if not self.added_to_roster:
            return
        cached_avatar = self.get_cached_avatar()
        if cached_avatar is not None:
            await self.xmpp.pubsub.broadcast_avatar(
                self.jid.bare, self.session.user_jid, cached_avatar
            )
        # self.name is never None (it falls back to ""), so the nick is always
        # broadcast
        self.xmpp.pubsub.broadcast_nick(
            self.session.user_jid,
            self.jid.bare,
            self.name,
        )

    def send_friend_request(self, text: str | None = None) -> None:
        """
        Send a "subscribe" presence from this contact to the user.

        :param text: Optional message from the contact to the user
        """
        presence = self._make_presence(ptype="subscribe", pstatus=text, bare=True)
        self._send(presence, nick=True)

    async def accept_friend_request(self, text: str | None = None) -> None:
        """
        Call this to signify that this Contact has accepted to be a friend
        of the user.

        :param text: Optional message from the friend to the user
        """
        self.is_friend = True
        self.added_to_roster = True
        self.log.debug("Accepting friend request")
        presence = self._make_presence(ptype="subscribed", pstatus=text, bare=True)
        self._send(presence, nick=True)
        self.send_last_presence()
        await self.__broadcast_pubsub_items()
        self.log.debug("Accepted friend request")

    def reject_friend_request(self, text: str | None = None) -> None:
        """
        Call this to signify that this Contact has refused to be a contact
        of the user (or that they don't want to be friends anymore)

        :param text: Optional message from the non-friend to the user
        """
        presence = self._make_presence(ptype="unsubscribed", pstatus=text, bare=True)
        # go offline while still a friend, then send the "unsubscribed"
        self.offline()
        self._send(presence, nick=True)
        self.is_friend = False

    async def on_friend_request(self, text: str = "") -> None:
        """
        Called when receiving a "subscribe" presence, ie, "I would like to add
        you to my contacts/friends", from the user to this contact.

        In XMPP terms: "I would like to receive your presence updates"

        This is only called if self.is_friend = False. If self.is_friend = True,
        slidge will automatically "accept the friend request", ie, reply with
        a "subscribed" presence.

        When called, a 'friend request event' should be sent to the legacy
        service, and when the contact responds, you should either call
        :meth:`.accept_friend_request` or :meth:`.reject_friend_request`.

        :param text: Optional message from the user to the contact
        """

    async def on_friend_delete(self, text: str = "") -> None:
        """
        Called when receiving an "unsubscribed" presence, ie, "I would like to
        remove you from my contacts/friends" or "I refuse your friend request"
        from the user to this contact.

        In XMPP terms: "You won't receive my presence updates anymore (or you
        never have)".

        :param text: Optional message from the user to the contact
        """

    async def on_friend_accept(self) -> None:
        """
        Called when receiving a "subscribed" presence, ie, "I accept to be
        your/confirm that you are my friend" from the user to this contact.

        In XMPP terms: "You will receive my presence updates".
        """

    def unsubscribe(self) -> None:
        """
        (internal use by slidge)

        Send an "unsubscribe", "unsubscribed", "unavailable" presence sequence
        from this contact to the user, ie, "this contact has removed you from
        their 'friends'".
        """
        for ptype in "unsubscribe", "unsubscribed", "unavailable":
            self.xmpp.send_presence(pfrom=self.jid, pto=self.user_jid.bare, ptype=ptype)

    async def update_info(self) -> None:
        """
        Fetch information about this contact from the legacy network

        This is awaited on Contact instantiation, and should be overridden to
        update the nickname, avatar, vcard [...] of this contact, by making
        "legacy API calls".

        To take advantage of the slidge avatar cache, you can check the .avatar
        property to retrieve the "legacy file ID" of the cached avatar. If there
        is no change, you should not call
        :py:meth:`slidge.core.mixins.avatar.AvatarMixin.set_avatar` or attempt
        to modify the ``.avatar`` property.
        """

    async def fetch_vcard(self) -> None:
        """
        If the legacy network doesn't like that you fetch too many profiles on startup,
        it's also possible to fetch it here, which will be called when XMPP clients
        of the user request the vcard, if it hasn't been fetched before
        """

    def _make_presence(
        self,
        *,
        last_seen: datetime.datetime | None = None,
        status_codes: set[int] | None = None,
        user_full_jid: JID | None = None,
        **presence_kwargs,
    ):
        """
        Build a presence for this contact, with :xep:`0115` caps when the
        presence comes from a full JID and a caps "ver" is stored.

        :param last_seen: Passed on to the parent implementation
        :param status_codes: Accepted and ignored by this implementation
        :param user_full_jid: Accepted and ignored by this implementation
        :param presence_kwargs: Passed on to the parent implementation
        :return: The presence stanza
        """
        p = super()._make_presence(last_seen=last_seen, **presence_kwargs)
        caps = self.xmpp.plugin["xep_0115"]
        if p.get_from().resource and self.stored.caps_ver:
            p["caps"]["node"] = caps.caps_node
            p["caps"]["hash"] = caps.hash
            p["caps"]["ver"] = self.stored.caps_ver
        return p

561 

562 

def is_markable(stanza: Message | Presence):
    """
    Tell whether a stanza should be tracked for read markers.

    Presences are never markable; a message is markable if and only if its
    body is non-empty.

    :param stanza: The stanza about to be sent
    :return: True if the stanza is a message with a body
    """
    return not isinstance(stanza, Presence) and bool(stanza["body"])

567 

568 

# Module-level logger, named after this module, for messages that are not
# emitted through a contact's own ``self.log``.
log = logging.getLogger(__name__)