Coverage for slidge / contact / contact.py: 86%

281 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-06 15:18 +0000

1import datetime 

2import logging 

3import warnings 

4from datetime import date 

5from typing import TYPE_CHECKING, Generic, Iterable, Iterator, Optional, Union 

6from xml.etree import ElementTree as ET 

7 

8from slixmpp import JID, Message, Presence 

9from slixmpp.exceptions import IqError, IqTimeout 

10from slixmpp.plugins.xep_0292.stanza import VCard4 

11from slixmpp.types import MessageTypes 

12 

13from ..core.mixins import AvatarMixin, FullCarbonMixin 

14from ..core.mixins.disco import ContactAccountDiscoMixin 

15from ..core.mixins.recipient import ReactionRecipientMixin, ThreadRecipientMixin 

16from ..db.models import Contact, ContactSent 

17from ..util import SubclassableOnce 

18from ..util.types import ClientType, LegacyUserIdType, MessageOrPresenceTypeVar 

19 

20if TYPE_CHECKING: 

21 from ..core.session import BaseSession 

22 from ..group.participant import LegacyParticipant 

23 

24 

25class LegacyContact( 

26 Generic[LegacyUserIdType], 

27 AvatarMixin, 

28 ContactAccountDiscoMixin, 

29 FullCarbonMixin, 

30 ReactionRecipientMixin, 

31 ThreadRecipientMixin, 

32 metaclass=SubclassableOnce, 

33): 

34 """ 

35 This class centralizes actions in relation to a specific legacy contact. 

36 

37 You shouldn't create instances of contacts manually, but rather rely on 

38 :meth:`.LegacyRoster.by_legacy_id` to ensure that contact instances are 

39 singletons. The :class:`.LegacyRoster` instance of a session is accessible 

40 through the :attr:`.BaseSession.contacts` attribute. 

41 

42 Typically, your plugin should have methods hook to the legacy events and 

43 call appropriate methods here to transmit the "legacy action" to the xmpp 

44 user. This should look like this: 

45 

46 .. code-block:python 

47 

48 class Session(BaseSession): 

49 ... 

50 

51 async def on_cool_chat_network_new_text_message(self, legacy_msg_event): 

52 contact = self.contacts.by_legacy_id(legacy_msg_event.from) 

53 contact.send_text(legacy_msg_event.text) 

54 

55 async def on_cool_chat_network_new_typing_event(self, legacy_typing_event): 

56 contact = self.contacts.by_legacy_id(legacy_msg_event.from) 

57 contact.composing() 

58 ... 

59 

60 Use ``carbon=True`` as a keyword arg for methods to represent an action FROM 

61 the user TO the contact, typically when the user uses an official client to 

62 do an action such as sending a message or marking as message as read. 

63 This will use :xep:`0363` to impersonate the XMPP user in order. 

64 """ 

65 

66 session: "BaseSession" 

67 

68 RESOURCE: str = "slidge" 

69 """ 

70 A full JID, including a resource part is required for chat states (and maybe other stuff) 

71 to work properly. This is the name of the resource the contacts will use. 

72 """ 

73 PROPAGATE_PRESENCE_TO_GROUPS = True 

74 

75 mtype: MessageTypes = "chat" 

76 _can_send_carbon = True 

77 is_participant = False 

78 is_group = False 

79 

80 _ONLY_SEND_PRESENCE_CHANGES = True 

81 

82 STRIP_SHORT_DELAY = True 

83 _NON_FRIEND_PRESENCES_FILTER = {"subscribe", "unsubscribed"} 

84 

85 INVITATION_RECIPIENT = True 

86 

87 stored: Contact 

88 model: Contact 

89 

90 def __init__(self, session: "BaseSession", stored: Contact) -> None: 

91 self.session = session 

92 self.xmpp = session.xmpp 

93 self.stored = stored 

94 self._set_logger() 

95 super().__init__() 

96 

97 @property 

98 def jid(self): # type:ignore[override] 

99 jid = JID(self.stored.jid) 

100 jid.resource = self.RESOURCE 

101 return jid 

102 

103 @property 

104 def legacy_id(self): 

105 return self.xmpp.LEGACY_CONTACT_ID_TYPE(self.stored.legacy_id) 

106 

107 async def get_vcard(self, fetch: bool = True) -> VCard4 | None: 

108 if fetch and not self.stored.vcard_fetched: 

109 await self.fetch_vcard() 

110 if self.stored.vcard is None: 

111 return None 

112 

113 return VCard4(xml=ET.fromstring(self.stored.vcard)) 

114 

115 @property 

116 def is_friend(self) -> bool: 

117 return self.stored.is_friend 

118 

119 @is_friend.setter 

120 def is_friend(self, value: bool) -> None: 

121 if value == self.is_friend: 

122 return 

123 self.update_stored_attribute(is_friend=value) 

124 

125 @property 

126 def added_to_roster(self) -> bool: 

127 return self.stored.added_to_roster 

128 

129 @added_to_roster.setter 

130 def added_to_roster(self, value: bool) -> None: 

131 if value == self.added_to_roster: 

132 return 

133 self.update_stored_attribute(added_to_roster=value) 

134 

135 @property 

136 def participants(self) -> Iterator["LegacyParticipant"]: 

137 with self.xmpp.store.session() as orm: 

138 self.stored = orm.merge(self.stored) 

139 participants = self.stored.participants 

140 for p in participants: 

141 with self.xmpp.store.session() as orm: 

142 p = orm.merge(p) 

143 muc = self.session.bookmarks.from_store(p.room) 

144 yield muc.participant_from_store(p, contact=self) 

145 

146 @property 

147 def user_jid(self): 

148 return self.session.user_jid 

149 

150 @property # type:ignore 

151 def DISCO_TYPE(self) -> ClientType: 

152 return self.client_type 

153 

154 @DISCO_TYPE.setter 

155 def DISCO_TYPE(self, value: ClientType) -> None: 

156 self.client_type = value 

157 

158 @property 

159 def client_type(self) -> ClientType: 

160 """ 

161 The client type of this contact, cf https://xmpp.org/registrar/disco-categories.html#client 

162 

163 Default is "pc". 

164 """ 

165 return self.stored.client_type 

166 

167 @client_type.setter 

168 def client_type(self, value: ClientType) -> None: 

169 if self.stored.client_type == value: 

170 return 

171 self.update_stored_attribute(client_type=value) 

172 

173 def _set_logger(self) -> None: 

174 self.log = logging.getLogger(f"{self.user_jid.bare}:contact:{self}") 

175 

176 def __repr__(self) -> str: 

177 return f"<Contact #{self.stored.id} '{self.name}' ({self.legacy_id} - {self.jid.user})'>" 

178 

179 def __get_subscription_string(self) -> str: 

180 if self.is_friend: 

181 return "both" 

182 return "none" 

183 

184 def __propagate_to_participants(self, stanza: Presence) -> None: 

185 if not self.PROPAGATE_PRESENCE_TO_GROUPS: 

186 return 

187 

188 ptype = stanza["type"] 

189 if ptype in ("available", "chat"): 

190 func_name = "online" 

191 elif ptype in ("xa", "unavailable"): 

192 # we map unavailable to extended_away, because offline is 

193 # "participant leaves the MUC" 

194 # TODO: improve this with a clear distinction between participant 

195 # and member list 

196 func_name = "extended_away" 

197 elif ptype == "busy": 

198 func_name = "busy" 

199 elif ptype == "away": 

200 func_name = "away" 

201 else: 

202 return 

203 

204 last_seen: Optional[datetime.datetime] = ( 

205 stanza["idle"]["since"] if stanza.get_plugin("idle", check=True) else None 

206 ) 

207 

208 kw = dict(status=stanza["status"], last_seen=last_seen) 

209 

210 for part in self.participants: 

211 func = getattr(part, func_name) 

212 func(**kw) 

213 

214 def _send( 

215 self, 

216 stanza: MessageOrPresenceTypeVar, 

217 carbon: bool = False, 

218 nick: bool = False, 

219 **send_kwargs, 

220 ) -> MessageOrPresenceTypeVar: 

221 if carbon and isinstance(stanza, Message): 

222 stanza["to"] = self.jid.bare 

223 stanza["from"] = self.user_jid 

224 self._privileged_send(stanza) 

225 return stanza # type:ignore 

226 

227 if isinstance(stanza, Presence): 

228 if not self._updating_info: 

229 self.__propagate_to_participants(stanza) 

230 if ( 

231 not self.is_friend 

232 and stanza["type"] not in self._NON_FRIEND_PRESENCES_FILTER 

233 ): 

234 return stanza # type:ignore 

235 if self.name and (nick or not self.is_friend): 

236 n = self.xmpp.plugin["xep_0172"].stanza.UserNick() 

237 n["nick"] = self.name 

238 stanza.append(n) 

239 if ( 

240 not self._updating_info 

241 and self.xmpp.MARK_ALL_MESSAGES 

242 and is_markable(stanza) 

243 ): 

244 with self.xmpp.store.session(expire_on_commit=False) as orm: 

245 self.stored = orm.merge(self.stored) 

246 exists = ( 

247 orm.query(ContactSent) 

248 .filter_by(contact_id=self.stored.id, msg_id=stanza["id"]) 

249 .first() 

250 ) 

251 if exists: 

252 self.log.warning( 

253 "Contact has already sent message %s", stanza["id"] 

254 ) 

255 else: 

256 new = ContactSent(contact=self.stored, msg_id=stanza["id"]) 

257 orm.add(new) 

258 self.stored.sent_order.append(new) 

259 orm.commit() 

260 stanza["to"] = self.user_jid 

261 stanza.send() 

262 return stanza 

263 

264 def pop_unread_xmpp_ids_up_to(self, horizon_xmpp_id: str) -> list[str]: 

265 """ 

266 Return XMPP msg ids sent by this contact up to a given XMPP msg id. 

267 

268 Legacy modules have no reason to use this, but it is used by slidge core 

269 for legacy networks that need to mark all messages as read (most XMPP 

270 clients only send a read marker for the latest message). 

271 

272 This has side effects, if the horizon XMPP id is found, messages up to 

273 this horizon are cleared, to avoid sending the same read mark twice. 

274 

275 :param horizon_xmpp_id: The latest message 

276 :return: A list of XMPP ids up to horizon_xmpp_id, included 

277 """ 

278 with self.xmpp.store.session() as orm: 

279 assert self.stored.id is not None 

280 ids = self.xmpp.store.contacts.pop_sent_up_to( 

281 orm, self.stored.id, horizon_xmpp_id 

282 ) 

283 orm.commit() 

284 return ids 

285 

286 @property 

287 def name(self) -> str: 

288 """ 

289 Friendly name of the contact, as it should appear in the user's roster 

290 """ 

291 return self.stored.nick or "" 

292 

293 @name.setter 

294 def name(self, n: Optional[str]) -> None: 

295 if self.stored.nick == n: 

296 return 

297 self.update_stored_attribute(nick=n) 

298 self._set_logger() 

299 if self.is_friend and self.added_to_roster: 

300 self.xmpp.pubsub.broadcast_nick( 

301 user_jid=self.user_jid, jid=self.jid.bare, nick=n 

302 ) 

303 for p in self.participants: 

304 p.nickname = n or str(self.legacy_id) 

305 

306 def _post_avatar_update(self, cached_avatar) -> None: 

307 if self.is_friend and self.added_to_roster: 

308 self.session.create_task( 

309 self.session.xmpp.pubsub.broadcast_avatar( 

310 self.jid.bare, self.session.user_jid, cached_avatar 

311 ) 

312 ) 

313 for p in self.participants: 

314 self.log.debug("Propagating new avatar to %s", p.muc) 

315 p.send_last_presence(force=True, no_cache_online=True) 

316 

317 def set_vcard( 

318 self, 

319 /, 

320 full_name: Optional[str] = None, 

321 given: Optional[str] = None, 

322 surname: Optional[str] = None, 

323 birthday: Optional[date] = None, 

324 phone: Optional[str] = None, 

325 phones: Iterable[str] = (), 

326 note: Optional[str] = None, 

327 url: Optional[str] = None, 

328 email: Optional[str] = None, 

329 country: Optional[str] = None, 

330 locality: Optional[str] = None, 

331 pronouns: Optional[str] = None, 

332 ) -> None: 

333 """ 

334 Update xep:`0292` data for this contact. 

335 

336 Use this for additional metadata about this contact to be available to XMPP 

337 clients. The "note" argument is a text of arbitrary size and can be useful when 

338 no other field is a good fit. 

339 """ 

340 vcard = VCard4() 

341 vcard.add_impp(f"xmpp:{self.jid.bare}") 

342 

343 if n := self.name: 

344 vcard.add_nickname(n) 

345 if full_name: 

346 vcard["full_name"] = full_name 

347 elif n: 

348 vcard["full_name"] = n 

349 

350 if given: 

351 vcard["given"] = given 

352 if surname: 

353 vcard["surname"] = surname 

354 if birthday: 

355 vcard["birthday"] = birthday 

356 

357 if note: 

358 vcard.add_note(note) 

359 if url: 

360 vcard.add_url(url) 

361 if email: 

362 vcard.add_email(email) 

363 if phone: 

364 vcard.add_tel(phone) 

365 for p in phones: 

366 vcard.add_tel(p) 

367 if country and locality: 

368 vcard.add_address(country, locality) 

369 elif country: 

370 vcard.add_address(country, locality) 

371 if pronouns: 

372 vcard["pronouns"]["text"] = pronouns 

373 

374 self.update_stored_attribute(vcard=str(vcard), vcard_fetched=True) 

375 self.session.create_task( 

376 self.xmpp.pubsub.broadcast_vcard_event(self.jid, self.user_jid, vcard) 

377 ) 

378 

379 def get_roster_item(self): 

380 item = { 

381 "subscription": self.__get_subscription_string(), 

382 "groups": [self.xmpp.ROSTER_GROUP], 

383 } 

384 if (n := self.name) is not None: 

385 item["name"] = n 

386 return {self.jid.bare: item} 

387 

388 async def add_to_roster(self, force: bool = False) -> None: 

389 """ 

390 Add this contact to the user roster using :xep:`0356` 

391 

392 :param force: add even if the contact was already added successfully 

393 """ 

394 if self.added_to_roster and not force: 

395 return 

396 if not self.session.user.preferences.get("roster_push", True): 

397 log.debug("Roster push request by plugin ignored (--no-roster-push)") 

398 return 

399 try: 

400 await self.xmpp["xep_0356"].set_roster( 

401 jid=self.user_jid, roster_items=self.get_roster_item() 

402 ) 

403 except PermissionError: 

404 warnings.warn( 

405 "Slidge does not have the privilege (XEP-0356) to manage rosters. " 

406 "Consider configuring your XMPP server for that." 

407 ) 

408 self.send_friend_request( 

409 f"I'm already your friend on {self.xmpp.COMPONENT_TYPE}, but " 

410 "slidge is not allowed to manage your roster." 

411 ) 

412 return 

413 except (IqError, IqTimeout) as e: 

414 self.log.warning("Could not add to roster", exc_info=e) 

415 else: 

416 # we only broadcast pubsub events for contacts added to the roster 

417 # so if something was set before, we need to push it now 

418 self.added_to_roster = True 

419 self.send_last_presence(force=True) 

420 

421 async def __broadcast_pubsub_items(self) -> None: 

422 if not self.is_friend: 

423 return 

424 if not self.added_to_roster: 

425 return 

426 cached_avatar = self.get_cached_avatar() 

427 if cached_avatar is not None: 

428 await self.xmpp.pubsub.broadcast_avatar( 

429 self.jid.bare, self.session.user_jid, cached_avatar 

430 ) 

431 nick = self.name 

432 

433 if nick is not None: 

434 self.xmpp.pubsub.broadcast_nick( 

435 self.session.user_jid, 

436 self.jid.bare, 

437 nick, 

438 ) 

439 

440 def send_friend_request(self, text: Optional[str] = None) -> None: 

441 presence = self._make_presence(ptype="subscribe", pstatus=text, bare=True) 

442 self._send(presence, nick=True) 

443 

444 async def accept_friend_request(self, text: Optional[str] = None) -> None: 

445 """ 

446 Call this to signify that this Contact has accepted to be a friend 

447 of the user. 

448 

449 :param text: Optional message from the friend to the user 

450 """ 

451 self.is_friend = True 

452 self.added_to_roster = True 

453 self.log.debug("Accepting friend request") 

454 presence = self._make_presence(ptype="subscribed", pstatus=text, bare=True) 

455 self._send(presence, nick=True) 

456 self.send_last_presence() 

457 await self.__broadcast_pubsub_items() 

458 self.log.debug("Accepted friend request") 

459 

460 def reject_friend_request(self, text: Optional[str] = None) -> None: 

461 """ 

462 Call this to signify that this Contact has refused to be a contact 

463 of the user (or that they don't want to be friends anymore) 

464 

465 :param text: Optional message from the non-friend to the user 

466 """ 

467 presence = self._make_presence(ptype="unsubscribed", pstatus=text, bare=True) 

468 self.offline() 

469 self._send(presence, nick=True) 

470 self.is_friend = False 

471 

472 async def on_friend_request(self, text: str = "") -> None: 

473 """ 

474 Called when receiving a "subscribe" presence, ie, "I would like to add 

475 you to my contacts/friends", from the user to this contact. 

476 

477 In XMPP terms: "I would like to receive your presence updates" 

478 

479 This is only called if self.is_friend = False. If self.is_friend = True, 

480 slidge will automatically "accept the friend request", ie, reply with 

481 a "subscribed" presence. 

482 

483 When called, a 'friend request event' should be sent to the legacy 

484 service, and when the contact responds, you should either call 

485 self.accept_subscription() or self.reject_subscription() 

486 """ 

487 pass 

488 

489 async def on_friend_delete(self, text: str = "") -> None: 

490 """ 

491 Called when receiving an "unsubscribed" presence, ie, "I would like to 

492 remove you to my contacts/friends" or "I refuse your friend request" 

493 from the user to this contact. 

494 

495 In XMPP terms: "You won't receive my presence updates anymore (or you 

496 never have)". 

497 """ 

498 pass 

499 

500 async def on_friend_accept(self) -> None: 

501 """ 

502 Called when receiving a "subscribed" presence, ie, "I accept to be 

503 your/confirm that you are my friend" from the user to this contact. 

504 

505 In XMPP terms: "You will receive my presence updates". 

506 """ 

507 pass 

508 

509 def unsubscribe(self) -> None: 

510 """ 

511 (internal use by slidge) 

512 

513 Send an "unsubscribe", "unsubscribed", "unavailable" presence sequence 

514 from this contact to the user, ie, "this contact has removed you from 

515 their 'friends'". 

516 """ 

517 for ptype in "unsubscribe", "unsubscribed", "unavailable": 

518 self.xmpp.send_presence(pfrom=self.jid, pto=self.user_jid.bare, ptype=ptype) 

519 

520 async def update_info(self) -> None: 

521 """ 

522 Fetch information about this contact from the legacy network 

523 

524 This is awaited on Contact instantiation, and should be overridden to 

525 update the nickname, avatar, vcard [...] of this contact, by making 

526 "legacy API calls". 

527 

528 To take advantage of the slidge avatar cache, you can check the .avatar 

529 property to retrieve the "legacy file ID" of the cached avatar. If there 

530 is no change, you should not call 

531 :py:meth:`slidge.core.mixins.avatar.AvatarMixin.set_avatar` or attempt 

532 to modify the ``.avatar`` property. 

533 """ 

534 pass 

535 

536 async def fetch_vcard(self) -> None: 

537 """ 

538 It the legacy network doesn't like that you fetch too many profiles on startup, 

539 it's also possible to fetch it here, which will be called when XMPP clients 

540 of the user request the vcard, if it hasn't been fetched before 

541 :return: 

542 """ 

543 pass 

544 

545 def _make_presence( 

546 self, 

547 *, 

548 last_seen: Optional[datetime.datetime] = None, 

549 status_codes: Optional[set[int]] = None, 

550 user_full_jid: Optional[JID] = None, 

551 **presence_kwargs, 

552 ): 

553 p = super()._make_presence(last_seen=last_seen, **presence_kwargs) 

554 caps = self.xmpp.plugin["xep_0115"] 

555 if p.get_from().resource and self.stored.caps_ver: 

556 p["caps"]["node"] = caps.caps_node 

557 p["caps"]["hash"] = caps.hash 

558 p["caps"]["ver"] = self.stored.caps_ver 

559 return p 

560 

561 

562def is_markable(stanza: Union[Message, Presence]): 

563 if isinstance(stanza, Presence): 

564 return False 

565 return bool(stanza["body"]) 

566 

567 

568log = logging.getLogger(__name__)