Coverage for slidge/contact/contact.py: 85%

282 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-26 19:34 +0000

1import datetime 

2import logging 

3import warnings 

4from datetime import date 

5from typing import TYPE_CHECKING, Generic, Iterable, Iterator, Optional, Union 

6from xml.etree import ElementTree as ET 

7 

8from slixmpp import JID, Message, Presence 

9from slixmpp.exceptions import IqError, IqTimeout 

10from slixmpp.plugins.xep_0292.stanza import VCard4 

11from slixmpp.types import MessageTypes 

12from sqlalchemy.exc import IntegrityError 

13 

14from ..core.mixins import AvatarMixin, FullCarbonMixin 

15from ..core.mixins.disco import ContactAccountDiscoMixin 

16from ..core.mixins.recipient import ReactionRecipientMixin, ThreadRecipientMixin 

17from ..db.models import Contact, ContactSent, Participant 

18from ..util import SubclassableOnce 

19from ..util.types import ClientType, LegacyUserIdType, MessageOrPresenceTypeVar 

20 

21if TYPE_CHECKING: 

22 from ..core.session import BaseSession 

23 from ..group.participant import LegacyParticipant 

24 

25 

class LegacyContact(
    Generic[LegacyUserIdType],
    AvatarMixin,
    ContactAccountDiscoMixin,
    FullCarbonMixin,
    ReactionRecipientMixin,
    ThreadRecipientMixin,
    metaclass=SubclassableOnce,
):
    """
    This class centralizes actions in relation to a specific legacy contact.

    You shouldn't create instances of contacts manually, but rather rely on
    :meth:`.LegacyRoster.by_legacy_id` to ensure that contact instances are
    singletons. The :class:`.LegacyRoster` instance of a session is accessible
    through the :attr:`.BaseSession.contacts` attribute.

    Typically, your plugin should have methods hook to the legacy events and
    call appropriate methods here to transmit the "legacy action" to the xmpp
    user. This should look like this:

    .. code-block:: python

        class Session(BaseSession):
            ...

            async def on_cool_chat_network_new_text_message(self, legacy_msg_event):
                contact = self.contacts.by_legacy_id(legacy_msg_event.from)
                contact.send_text(legacy_msg_event.text)

            async def on_cool_chat_network_new_typing_event(self, legacy_typing_event):
                contact = self.contacts.by_legacy_id(legacy_typing_event.from)
                contact.composing()
            ...

    Use ``carbon=True`` as a keyword arg for methods to represent an action FROM
    the user TO the contact, typically when the user uses an official client to
    do an action such as sending a message or marking a message as read.
    This will use :xep:`0356` to impersonate the XMPP user.
    """

    session: "BaseSession"

    RESOURCE: str = "slidge"
    """
    A full JID, including a resource part is required for chat states (and maybe other stuff)
    to work properly. This is the name of the resource the contacts will use.
    """
    # When falsy, presences sent by this contact are not mirrored to its
    # participants (see ``__propagate_to_participants``).
    PROPAGATE_PRESENCE_TO_GROUPS = True

    # NOTE(review): the flags below are not read in this class; presumably
    # they are consumed by the mixins -- confirm before changing them.
    mtype: MessageTypes = "chat"
    _can_send_carbon = True
    is_participant = False
    is_group = False

    _ONLY_SEND_PRESENCE_CHANGES = True

    STRIP_SHORT_DELAY = True
    # Presence types that are still sent to the user when the contact is not
    # a friend; any other presence from a non-friend is dropped in ``_send``.
    _NON_FRIEND_PRESENCES_FILTER = {"subscribe", "unsubscribed"}

    INVITATION_RECIPIENT = True

    stored: Contact
    model: Contact

    def __init__(self, session: "BaseSession", stored: Contact) -> None:
        """
        :param session: The session this contact belongs to
        :param stored: The DB row backing this contact
        """
        self.session = session
        self.xmpp = session.xmpp
        self.stored = stored
        # The logger name embeds repr(self), which reads ``self.stored``, so
        # it can only be built once the attributes above are set.
        self._set_logger()
        super().__init__()

    @property
    def jid(self):  # type:ignore[override]
        """
        Full JID of the contact: the stored JID with :attr:`.RESOURCE` as
        resource part.
        """
        jid = JID(self.stored.jid)
        jid.resource = self.RESOURCE
        return jid

    @property
    def legacy_id(self):
        """
        The stored legacy ID, converted with ``xmpp.LEGACY_CONTACT_ID_TYPE``.
        """
        return self.xmpp.LEGACY_CONTACT_ID_TYPE(self.stored.legacy_id)

    async def get_vcard(self, fetch: bool = True) -> VCard4 | None:
        """
        Return the stored vcard of this contact, if any.

        :param fetch: If true and the vcard was never fetched, await
            :meth:`.fetch_vcard` first.
        :return: The parsed vcard, or None if no vcard is stored.
        """
        if fetch and not self.stored.vcard_fetched:
            await self.fetch_vcard()
        if self.stored.vcard is None:
            return None

        return VCard4(xml=ET.fromstring(self.stored.vcard))

    @property
    def is_friend(self) -> bool:
        """
        Whether this contact is a friend of the user (persisted in the store).
        """
        return self.stored.is_friend

    @is_friend.setter
    def is_friend(self, value: bool) -> None:
        # Avoid a store update when nothing changes.
        if value == self.is_friend:
            return
        self.update_stored_attribute(is_friend=value)

    @property
    def added_to_roster(self) -> bool:
        """
        Whether this contact was added to the user's roster (persisted in the
        store).
        """
        return self.stored.added_to_roster

    @added_to_roster.setter
    def added_to_roster(self, value: bool) -> None:
        # Avoid a store update when nothing changes.
        if value == self.added_to_roster:
            return
        self.update_stored_attribute(added_to_roster=value)

    @property
    def participants(self) -> Iterator["LegacyParticipant"]:
        """
        Iterate over the MUC participants linked to this contact.
        """
        with self.xmpp.store.session() as orm:
            self.stored = orm.merge(self.stored)
            participants = self.stored.participants
        for p in participants:
            # ``p.room`` is read while the stored participant is attached to
            # an ORM session.
            with self.xmpp.store.session() as orm:
                p = orm.merge(p)
                muc = self.session.bookmarks.from_store(p.room)
            yield muc.participant_from_store(p, contact=self)

    @property
    def user_jid(self):
        """
        JID of the XMPP user this contact belongs to.
        """
        return self.session.user_jid

    @property  # type:ignore
    def DISCO_TYPE(self) -> ClientType:
        """
        Alias of :attr:`.client_type`.
        """
        return self.client_type

    @DISCO_TYPE.setter
    def DISCO_TYPE(self, value: ClientType) -> None:
        self.client_type = value

    @property
    def client_type(self) -> ClientType:
        """
        The client type of this contact, cf https://xmpp.org/registrar/disco-categories.html#client

        Default is "pc".
        """
        return self.stored.client_type

    @client_type.setter
    def client_type(self, value: ClientType) -> None:
        # Avoid a store update when nothing changes.
        if self.stored.client_type == value:
            return
        self.update_stored_attribute(client_type=value)

    def _set_logger(self) -> None:
        """
        (Re)create the per-contact logger; its name embeds ``repr(self)``.
        """
        self.log = logging.getLogger(f"{self.user_jid.bare}:contact:{self}")

    def __repr__(self) -> str:
        return f"<Contact #{self.stored.id} '{self.name}' ({self.legacy_id} - {self.jid.user})'>"

    def __get_subscription_string(self) -> str:
        """
        Roster subscription state: "both" for friends, "none" otherwise.
        """
        if self.is_friend:
            return "both"
        return "none"

    def __propagate_to_participants(self, stanza: Presence) -> None:
        """
        Mirror a presence of this contact to all its MUC participants.

        Does nothing if :attr:`.PROPAGATE_PRESENCE_TO_GROUPS` is falsy or if
        the presence type is not one of the handled availability states.

        :param stanza: The presence sent by this contact
        """
        if not self.PROPAGATE_PRESENCE_TO_GROUPS:
            return

        ptype = stanza["type"]
        if ptype in ("available", "chat"):
            func_name = "online"
        elif ptype in ("xa", "unavailable"):
            # we map unavailable to extended_away, because offline is
            # "participant leaves the MUC"
            # TODO: improve this with a clear distinction between participant
            #       and member list
            func_name = "extended_away"
        elif ptype in ("dnd", "busy"):
            # "dnd" is the XMPP <show/> value for "do not disturb"; "busy" is
            # kept for backward compatibility with the previous check.
            func_name = "busy"
        elif ptype == "away":
            func_name = "away"
        else:
            return

        last_seen: Optional[datetime.datetime] = (
            stanza["idle"]["since"] if stanza.get_plugin("idle", check=True) else None
        )

        kw = dict(status=stanza["status"], last_seen=last_seen)

        for part in self.participants:
            func = getattr(part, func_name)
            func(**kw)

    def _send(
        self,
        stanza: MessageOrPresenceTypeVar,
        carbon: bool = False,
        nick: bool = False,
        **send_kwargs,
    ) -> MessageOrPresenceTypeVar:
        """
        Send a stanza from this contact to the user.

        :param stanza: The message or presence to send
        :param carbon: If true and ``stanza`` is a message, send it from the
            user to this contact instead, through ``_privileged_send``
        :param nick: Attach the contact name as a user nickname element, even
            for friends
        :param send_kwargs: Accepted for signature compatibility, not used
            here
        :return: The stanza (it is *not* sent if it is a presence from a
            non-friend whose type is not in ``_NON_FRIEND_PRESENCES_FILTER``)
        """
        if carbon and isinstance(stanza, Message):
            stanza["to"] = self.jid.bare
            stanza["from"] = self.user_jid
            self._privileged_send(stanza)
            return stanza  # type:ignore

        if isinstance(stanza, Presence):
            if not self._updating_info:
                self.__propagate_to_participants(stanza)
            if (
                not self.is_friend
                and stanza["type"] not in self._NON_FRIEND_PRESENCES_FILTER
            ):
                return stanza  # type:ignore
        if self.name and (nick or not self.is_friend):
            n = self.xmpp.plugin["xep_0172"].stanza.UserNick()
            n["nick"] = self.name
            stanza.append(n)
        if (
            not self._updating_info
            and self.xmpp.MARK_ALL_MESSAGES
            and is_markable(stanza)
        ):
            # Remember the id of this message, cf get_msg_xmpp_id_up_to()
            try:
                with self.xmpp.store.session(expire_on_commit=False) as orm:
                    self.stored = orm.merge(self.stored)
                    new = ContactSent(contact=self.stored, msg_id=stanza["id"])
                    orm.add(new)
                    self.stored.sent_order.append(new)
                    orm.commit()
            except IntegrityError:
                self.log.warning("Contact has already sent message %s", stanza["id"])
        stanza["to"] = self.user_jid
        stanza.send()
        return stanza

    def get_msg_xmpp_id_up_to(self, horizon_xmpp_id: str) -> list[str]:
        """
        Return XMPP msg ids sent by this contact up to a given XMPP msg id.

        Plugins have no reason to use this, but it is used by slidge core
        for legacy networks that need to mark all messages as read (most XMPP
        clients only send a read marker for the latest message).

        This has side effects: the lookup goes through the store's
        ``pop_sent_up_to`` and the transaction is committed. Presumably the
        returned ids are removed from the store, to avoid sending the same
        read mark twice -- confirm against the store implementation.

        :param horizon_xmpp_id: The latest message
        :return: A list of XMPP ids (NOTE(review): what is returned when
            ``horizon_xmpp_id`` is not found depends on the store
            implementation)
        """
        with self.xmpp.store.session() as orm:
            assert self.stored.id is not None
            ids = self.xmpp.store.contacts.pop_sent_up_to(
                orm, self.stored.id, horizon_xmpp_id
            )
            orm.commit()
        return ids

    @property
    def name(self) -> str:
        """
        Friendly name of the contact, as it should appear in the user's roster
        """
        return self.stored.nick or ""

    @name.setter
    def name(self, n: Optional[str]) -> None:
        if self.stored.nick == n:
            return
        self.update_stored_attribute(nick=n)
        # The logger name embeds the contact name, so refresh it.
        self._set_logger()
        if self.is_friend and self.added_to_roster:
            self.xmpp.pubsub.broadcast_nick(
                user_jid=self.user_jid, jid=self.jid.bare, nick=n
            )
        for p in self.participants:
            p.nickname = n or str(self.legacy_id)

    def _post_avatar_update(self, cached_avatar) -> None:
        """
        Broadcast a new avatar over pubsub (only for friends added to the
        roster) and make all participants of this contact re-send their last
        presence.

        :param cached_avatar: The avatar passed on to ``broadcast_avatar``
        """
        if self.is_friend and self.added_to_roster:
            self.session.create_task(
                self.session.xmpp.pubsub.broadcast_avatar(
                    self.jid.bare, self.session.user_jid, cached_avatar
                )
            )
        for p in self.participants:
            self.log.debug("Propagating new avatar to %s", p.muc)
            p.send_last_presence(force=True, no_cache_online=True)

    def set_vcard(
        self,
        /,
        full_name: Optional[str] = None,
        given: Optional[str] = None,
        surname: Optional[str] = None,
        birthday: Optional[date] = None,
        phone: Optional[str] = None,
        phones: Iterable[str] = (),
        note: Optional[str] = None,
        url: Optional[str] = None,
        email: Optional[str] = None,
        country: Optional[str] = None,
        locality: Optional[str] = None,
        pronouns: Optional[str] = None,
    ) -> None:
        """
        Build the vcard of this contact, store it and broadcast it.

        The vcard always contains the bare JID of the contact as IMPP, and
        its name as nickname if it has one. All parameters are optional;
        falsy values are skipped.

        :param full_name: Full name; defaults to the contact name if it is set
        :param given: Given name
        :param surname: Surname
        :param birthday: Birthday
        :param phone: A phone number
        :param phones: More phone numbers
        :param note: A free-form note
        :param url: A URL
        :param email: An email address
        :param country: Country; required for an address to be added
        :param locality: Locality; ignored if ``country`` is not set
        :param pronouns: Pronouns
        """
        vcard = VCard4()
        vcard.add_impp(f"xmpp:{self.jid.bare}")

        if n := self.name:
            vcard.add_nickname(n)
        if full_name:
            vcard["full_name"] = full_name
        elif n:
            vcard["full_name"] = n

        if given:
            vcard["given"] = given
        if surname:
            vcard["surname"] = surname
        if birthday:
            vcard["birthday"] = birthday

        if note:
            vcard.add_note(note)
        if url:
            vcard.add_url(url)
        if email:
            vcard.add_email(email)
        if phone:
            vcard.add_tel(phone)
        for p in phones:
            vcard.add_tel(p)
        if country:
            # locality is passed as-is, even when it is empty or None
            vcard.add_address(country, locality)
        if pronouns:
            vcard["pronouns"]["text"] = pronouns

        self.update_stored_attribute(vcard=str(vcard), vcard_fetched=True)
        self.session.create_task(
            self.xmpp.pubsub.broadcast_vcard_event(self.jid, self.user_jid, vcard)
        )

    def get_roster_item(self):
        """
        Build the roster item of this contact.

        :return: A dict mapping the bare JID of the contact to its
            subscription state, roster groups and name
        """
        item = {
            "subscription": self.__get_subscription_string(),
            "groups": [self.xmpp.ROSTER_GROUP],
        }
        # NOTE: ``name`` falls back to "" and is never None, so the name is
        # always included (possibly empty).
        if (n := self.name) is not None:
            item["name"] = n
        return {self.jid.bare: item}

    async def add_to_roster(self, force: bool = False) -> None:
        """
        Add this contact to the user roster using :xep:`0356`

        If slidge lacks the privilege to manage the roster, a friend request
        is sent to the user instead.

        :param force: add even if the contact was already added successfully
        """
        if self.added_to_roster and not force:
            return
        if not self.session.user.preferences.get("roster_push", True):
            self.log.debug("Roster push request by plugin ignored (--no-roster-push)")
            return
        try:
            await self.xmpp["xep_0356"].set_roster(
                jid=self.user_jid, roster_items=self.get_roster_item()
            )
        except PermissionError:
            warnings.warn(
                "Slidge does not have the privilege (XEP-0356) to manage rosters. "
                "Consider configuring your XMPP server for that."
            )
            self.send_friend_request(
                f"I'm already your friend on {self.xmpp.COMPONENT_TYPE}, but "
                "slidge is not allowed to manage your roster."
            )
            return
        except (IqError, IqTimeout) as e:
            self.log.warning("Could not add to roster", exc_info=e)
        else:
            # we only broadcast pubsub events for contacts added to the roster
            # so if something was set before, we need to push it now
            self.added_to_roster = True
            self.send_last_presence(force=True)

    async def __broadcast_pubsub_items(self) -> None:
        """
        Broadcast the cached avatar (if any) and the nickname of this contact
        over pubsub. Does nothing unless the contact is a friend that was
        added to the roster.
        """
        if not self.is_friend:
            return
        if not self.added_to_roster:
            return
        cached_avatar = self.get_cached_avatar()
        if cached_avatar is not None:
            await self.xmpp.pubsub.broadcast_avatar(
                self.jid.bare, self.session.user_jid, cached_avatar
            )
        nick = self.name

        # NOTE: ``name`` falls back to "" and is never None, so the nick is
        # always broadcast (possibly empty).
        if nick is not None:
            self.xmpp.pubsub.broadcast_nick(
                self.session.user_jid,
                self.jid.bare,
                nick,
            )

    def send_friend_request(self, text: Optional[str] = None) -> None:
        """
        Send a "subscribe" presence from this contact to the user.

        :param text: Optional message, used as presence status
        """
        presence = self._make_presence(ptype="subscribe", pstatus=text, bare=True)
        self._send(presence, nick=True)

    async def accept_friend_request(self, text: Optional[str] = None) -> None:
        """
        Call this to signify that this Contact has accepted to be a friend
        of the user.

        :param text: Optional message from the friend to the user
        """
        self.is_friend = True
        self.added_to_roster = True
        self.log.debug("Accepting friend request")
        presence = self._make_presence(ptype="subscribed", pstatus=text, bare=True)
        self._send(presence, nick=True)
        self.send_last_presence()
        await self.__broadcast_pubsub_items()
        self.log.debug("Accepted friend request")

    def reject_friend_request(self, text: Optional[str] = None) -> None:
        """
        Call this to signify that this Contact has refused to be a contact
        of the user (or that they don't want to be friends anymore)

        :param text: Optional message from the non-friend to the user
        """
        presence = self._make_presence(ptype="unsubscribed", pstatus=text, bare=True)
        self.offline()
        self._send(presence, nick=True)
        self.is_friend = False

    async def on_friend_request(self, text: str = "") -> None:
        """
        Called when receiving a "subscribe" presence, ie, "I would like to add
        you to my contacts/friends", from the user to this contact.

        In XMPP terms: "I would like to receive your presence updates"

        This is only called if self.is_friend = False. If self.is_friend = True,
        slidge will automatically "accept the friend request", ie, reply with
        a "subscribed" presence.

        When called, a 'friend request event' should be sent to the legacy
        service, and when the contact responds, you should either call
        self.accept_friend_request() or self.reject_friend_request()
        """
        pass

    async def on_friend_delete(self, text: str = "") -> None:
        """
        Called when receiving an "unsubscribed" presence, ie, "I would like to
        remove you from my contacts/friends" or "I refuse your friend request"
        from the user to this contact.

        In XMPP terms: "You won't receive my presence updates anymore (or you
        never have)".
        """
        pass

    async def on_friend_accept(self) -> None:
        """
        Called when receiving a "subscribed" presence, ie, "I accept to be
        your/confirm that you are my friend" from the user to this contact.

        In XMPP terms: "You will receive my presence updates".
        """
        pass

    def unsubscribe(self) -> None:
        """
        (internal use by slidge)

        Send an "unsubscribe", "unsubscribed", "unavailable" presence sequence
        from this contact to the user, ie, "this contact has removed you from
        their 'friends'".
        """
        for ptype in "unsubscribe", "unsubscribed", "unavailable":
            self.xmpp.send_presence(pfrom=self.jid, pto=self.user_jid.bare, ptype=ptype)

    async def update_info(self) -> None:
        """
        Fetch information about this contact from the legacy network

        This is awaited on Contact instantiation, and should be overridden to
        update the nickname, avatar, vcard [...] of this contact, by making
        "legacy API calls".

        To take advantage of the slidge avatar cache, you can check the .avatar
        property to retrieve the "legacy file ID" of the cached avatar. If there
        is no change, you should not call
        :py:meth:`slidge.core.mixins.avatar.AvatarMixin.set_avatar` or attempt
        to modify the ``.avatar`` property.
        """
        pass

    async def fetch_vcard(self) -> None:
        """
        If the legacy network doesn't like that you fetch too many profiles on
        startup, it's also possible to fetch the vcard here. This is called
        when XMPP clients of the user request the vcard, if it hasn't been
        fetched before.
        """
        pass

    def _make_presence(
        self,
        *,
        last_seen: Optional[datetime.datetime] = None,
        status_codes: Optional[set[int]] = None,
        user_full_jid: Optional[JID] = None,
        **presence_kwargs,
    ):
        """
        Build a presence with the parent implementation and, if it comes from
        a full JID and a caps version is stored, attach entity capabilities.

        :param last_seen: Passed on to the parent implementation
        :param status_codes: Not used here
        :param user_full_jid: Not used here
        :param presence_kwargs: Passed on to the parent implementation
        :return: The presence stanza
        """
        p = super()._make_presence(last_seen=last_seen, **presence_kwargs)
        caps = self.xmpp.plugin["xep_0115"]
        if p.get_from().resource and self.stored.caps_ver:
            p["caps"]["node"] = caps.caps_node
            p["caps"]["hash"] = caps.hash
            p["caps"]["ver"] = self.stored.caps_ver
        return p

547 

548 

def is_markable(stanza: Union[Message, Presence]):
    """
    Tell whether a stanza should be tracked for read markers.

    Presences never are; other stanzas are if their body is not empty.
    """
    return not isinstance(stanza, Presence) and bool(stanza["body"])

553 

554 

# Module-level logger, named after this module (distinct from the per-contact
# ``self.log`` created by ``LegacyContact._set_logger``).
log = logging.getLogger(__name__)