Coverage for slidge / contact / contact.py: 86%
286 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
1import datetime
2import logging
3import warnings
4from collections.abc import Iterable, Iterator, Sequence
5from datetime import date
6from typing import TYPE_CHECKING, Any, ClassVar, Literal, Self
7from xml.etree import ElementTree as ET
9from slixmpp import JID, Message, Presence
10from slixmpp.exceptions import IqError, IqTimeout
11from slixmpp.plugins.xep_0292.stanza import VCard4
12from slixmpp.types import MessageTypes
14from slidge.db.avatar import CachedAvatar
16from ..core.mixins import AvatarMixin, FullCarbonMixin
17from ..core.mixins.disco import ContactAccountDiscoMixin
18from ..core.mixins.recipient import RecipientMixin
19from ..db.models import Contact, ContactSent
20from ..util import SubclassableOnce
21from ..util.types import (
22 AnySession,
23 ClientType,
24 MessageOrPresenceTypeVar,
25)
27if TYPE_CHECKING:
28 from ..command.base import ContactCommand
29 from ..group.participant import LegacyParticipant
class LegacyContact(
    AvatarMixin,
    ContactAccountDiscoMixin,
    FullCarbonMixin,
    RecipientMixin,
    SubclassableOnce,
):
    """
    This class centralizes actions in relation to a specific legacy contact.

    You shouldn't create instances of contacts manually, but rather rely on
    :meth:`.LegacyRoster.by_legacy_id` to ensure that contact instances are
    singletons. The :class:`.LegacyRoster` instance of a session is accessible
    through the :attr:`.BaseSession.contacts` attribute.

    Typically, your plugin should have methods hook to the legacy events and
    call appropriate methods here to transmit the "legacy action" to the xmpp
    user. This should look like this:

    .. code-block:: python

        class Session(BaseSession):
            ...

            async def on_cool_chat_network_new_text_message(self, legacy_msg_event):
                contact = self.contacts.by_legacy_id(legacy_msg_event.from)
                contact.send_text(legacy_msg_event.text)

            async def on_cool_chat_network_new_typing_event(self, legacy_typing_event):
                contact = self.contacts.by_legacy_id(legacy_typing_event.from)
                contact.composing()
            ...

    Use ``carbon=True`` as a keyword arg for methods to represent an action FROM
    the user TO the contact, typically when the user uses an official client to
    do an action such as sending a message or marking a message as read.
    This will use :xep:`0356` to impersonate the XMPP user.
    """

    session: AnySession

    RESOURCE: str = "slidge"
    """
    A full JID, including a resource part is required for chat states (and maybe other stuff)
    to work properly. This is the name of the resource the contacts will use.
    """
    PROPAGATE_PRESENCE_TO_GROUPS = True

    mtype: MessageTypes = "chat"
    _can_send_carbon = True
    is_participant: Literal[False] = False
    is_group: Literal[False] = False

    _ONLY_SEND_PRESENCE_CHANGES = True

    STRIP_SHORT_DELAY = True
    # Presence types that a non-friend contact is still allowed to send,
    # see the filtering done in _send().
    _NON_FRIEND_PRESENCES_FILTER: ClassVar[set[str]] = {"subscribe", "unsubscribed"}

    INVITATION_RECIPIENT = True

    commands: ClassVar[dict[str, "type[ContactCommand[LegacyContact]]"]] = {}
    commands_chat: ClassVar[dict[str, "type[ContactCommand[LegacyContact]]"]] = {}

    stored: Contact
    model: Contact

    def __init__(self, session: AnySession, stored: Contact) -> None:
        """
        :param session: The session this contact belongs to
        :param stored: The database row backing this contact
        """
        self.session = session
        self.xmpp = session.xmpp
        self.stored = stored
        # The logger name is built from the repr, which reads self.stored,
        # so this must come after the assignment above.
        self._set_logger()
        super().__init__()

    @property
    def jid(self) -> JID:
        """
        Full JID of this contact: the stored JID with :attr:`.RESOURCE` as
        resource part.
        """
        jid = JID(self.stored.jid)
        jid.resource = self.RESOURCE
        return jid

    @jid.setter
    def jid(self, _jid: JID) -> None:
        # The JID is derived from the stored contact; direct assignment is
        # always rejected.
        raise RuntimeError("The JID of a LegacyContact cannot be set")

    @property
    def legacy_id(self) -> str:
        """Identifier of this contact, as found in the store."""
        return self.stored.legacy_id

    async def get_vcard(self, fetch: bool = True) -> VCard4 | None:
        """
        Return the stored vcard of this contact, if any.

        :param fetch: If True and the vcard has not been fetched yet, await
            :meth:`.fetch_vcard` before reading the stored value.
        :return: The parsed vcard, or None if nothing is stored
        """
        if fetch and not self.stored.vcard_fetched:
            await self.fetch_vcard()
        if self.stored.vcard is None:
            return None

        return VCard4(xml=ET.fromstring(self.stored.vcard))

    @property
    def is_friend(self) -> bool:
        """Whether this contact is marked as a friend in the store."""
        return self.stored.is_friend

    @is_friend.setter
    def is_friend(self, value: bool) -> None:
        # Skip the store update when nothing changes.
        if value == self.is_friend:
            return
        self.update_stored_attribute(is_friend=value)

    @property
    def added_to_roster(self) -> bool:
        """Whether this contact is marked as added to the roster in the store."""
        return self.stored.added_to_roster

    @added_to_roster.setter
    def added_to_roster(self, value: bool) -> None:
        # Skip the store update when nothing changes.
        if value == self.added_to_roster:
            return
        self.update_stored_attribute(added_to_roster=value)

    @property
    def participants(self) -> Iterator["LegacyParticipant[Self]"]:
        """
        Iterate over the MUC participants attached to this contact.

        Stored participants are merged into an ORM session one by one, and
        turned into participant instances through the bookmarks of the session.
        """
        with self.xmpp.store.session() as orm:
            self.stored = orm.merge(self.stored)
            participants = self.stored.participants
        for p in participants:
            with self.xmpp.store.session() as orm:
                p = orm.merge(p)
                muc = self.session.bookmarks.from_store(p.room)
                yield muc.participant_from_store(p, contact=self)

    @property
    def user_jid(self) -> JID:
        """JID of the user, taken from the session."""
        return self.session.user_jid

    @property  # type:ignore
    def DISCO_TYPE(self) -> ClientType:
        """Alias of :attr:`.client_type`."""
        return self.client_type

    @DISCO_TYPE.setter
    def DISCO_TYPE(self, value: ClientType) -> None:
        self.client_type = value

    @property
    def client_type(self) -> ClientType:
        """
        The client type of this contact, cf https://xmpp.org/registrar/disco-categories.html#client

        Default is "pc".
        """
        return self.stored.client_type

    @client_type.setter
    def client_type(self, value: ClientType) -> None:
        # Skip the store update when nothing changes.
        if self.stored.client_type == value:
            return
        self.update_stored_attribute(client_type=value)

    def _set_logger(self) -> None:
        """(Re)create the logger of this contact, named after the user and the contact repr."""
        self.log = logging.getLogger(f"{self.user_jid.bare}:contact:{self}")

    def __repr__(self) -> str:
        return f"<Contact #{self.stored.id} '{self.name}' ({self.legacy_id} - {self.jid.user})'>"

    def __get_subscription_string(self) -> str:
        """Return the roster subscription value: "both" for friends, "none" otherwise."""
        if self.is_friend:
            return "both"
        return "none"

    def __propagate_to_participants(self, stanza: Presence) -> None:
        """
        Mirror a presence of this contact on all its MUC participants.

        Does nothing if :attr:`.PROPAGATE_PRESENCE_TO_GROUPS` is False or if the
        presence type does not map to a participant presence method.

        :param stanza: The presence being sent by this contact
        """
        if not self.PROPAGATE_PRESENCE_TO_GROUPS:
            return

        ptype = stanza["type"]
        if ptype in ("available", "chat"):
            func_name = "online"
        elif ptype in ("xa", "unavailable"):
            # we map unavailable to extended_away, because offline is
            # "participant leaves the MUC"
            # TODO: improve this with a clear distinction between participant
            #       and member list
            func_name = "extended_away"
        elif ptype in ("dnd", "busy"):
            # "dnd" is the XMPP <show/> value for a busy contact; "busy" is
            # kept for backward compatibility.
            func_name = "busy"
        elif ptype == "away":
            func_name = "away"
        else:
            return

        last_seen: datetime.datetime | None = (
            stanza["idle"]["since"] if "idle" in stanza else None
        )

        kw = {"status": stanza["status"], "last_seen": last_seen}

        for part in self.participants:
            func = getattr(part, func_name)
            func(**kw)

    def _send(
        self,
        stanza: MessageOrPresenceTypeVar,
        carbon: bool = False,
        nick: bool = False,
        **send_kwargs: Any,  # noqa:ANN401
    ) -> MessageOrPresenceTypeVar:
        """
        Send a stanza from this contact to the user, or on behalf of the user.

        :param stanza: The message or presence to send
        :param carbon: If True and the stanza is a message, send it from the
            user to this contact through a privileged send instead.
        :param nick: If True, attach the name of this contact to the stanza
            (always done for non-friends that have a name).
        :param send_kwargs: Accepted for signature compatibility, unused here.
        :return: The stanza that was passed, possibly not sent (filtered
            presences of non-friends).
        """
        if carbon and isinstance(stanza, Message):
            stanza["to"] = self.jid.bare
            stanza["from"] = self.user_jid
            self._privileged_send(stanza)
            return stanza

        if isinstance(stanza, Presence):
            if not self._updating_info:
                self.__propagate_to_participants(stanza)
            # Non-friends may only send the presence types listed in the filter.
            if (
                not self.is_friend
                and stanza["type"] not in self._NON_FRIEND_PRESENCES_FILTER
            ):
                return stanza
        if self.name and (nick or not self.is_friend):
            n = self.xmpp.plugin["xep_0172"].stanza.UserNick()
            n["nick"] = self.name
            stanza.append(n)
        if (
            not self._updating_info
            and self.xmpp.MARK_ALL_MESSAGES
            and is_markable(stanza)
        ):
            # Remember the id of this message so that it can later be
            # retrieved by pop_unread_xmpp_ids_up_to().
            with self.xmpp.store.session(expire_on_commit=False) as orm:
                self.stored = orm.merge(self.stored)
                exists = (
                    orm.query(ContactSent)
                    .filter_by(contact_id=self.stored.id, msg_id=stanza["id"])
                    .first()
                )
                if exists:
                    self.log.warning(
                        "Contact has already sent message %s", stanza["id"]
                    )
                else:
                    new = ContactSent(contact=self.stored, msg_id=stanza["id"])
                    orm.add(new)
                    self.stored.sent_order.append(new)
                    orm.commit()
        stanza["to"] = self.user_jid
        stanza.send()
        return stanza

    def pop_unread_xmpp_ids_up_to(self, horizon_xmpp_id: str) -> list[str]:
        """
        Return XMPP msg ids sent by this contact up to a given XMPP msg id.

        Legacy modules have no reason to use this, but it is used by slidge core
        for legacy networks that need to mark all messages as read (most XMPP
        clients only send a read marker for the latest message).

        This has side effects, if the horizon XMPP id is found, messages up to
        this horizon are cleared, to avoid sending the same read mark twice.

        :param horizon_xmpp_id: The latest message
        :return: A list of XMPP ids up to horizon_xmpp_id, included
        """
        with self.xmpp.store.session() as orm:
            assert self.stored.id is not None
            ids = self.xmpp.store.contacts.pop_sent_up_to(
                orm, self.stored.id, horizon_xmpp_id
            )
            orm.commit()
        return ids

    @property
    def name(self) -> str:
        """
        Friendly name of the contact, as it should appear in the user's roster
        """
        return self.stored.nick or ""

    @name.setter
    def name(self, n: str | None) -> None:
        if self.stored.nick == n:
            return
        self.update_stored_attribute(nick=n)
        # The logger name embeds the contact name, so refresh it.
        self._set_logger()
        if self.is_friend and self.added_to_roster:
            self.xmpp.pubsub.broadcast_nick(
                user_jid=self.user_jid, jid=self.jid.bare, nick=n
            )
        for p in self.participants:
            p.nickname = n or str(self.legacy_id)

    def _post_avatar_update(self, cached_avatar: CachedAvatar | None) -> None:
        """
        Broadcast a new avatar to the user (friends added to the roster only)
        and re-send the last presence of all participants of this contact.

        :param cached_avatar: The new avatar, or None
        """
        if self.is_friend and self.added_to_roster:
            self.session.create_task(
                self.session.xmpp.pubsub.broadcast_avatar(
                    self.jid.bare, self.session.user_jid, cached_avatar
                )
            )
        for p in self.participants:
            self.log.debug("Propagating new avatar to %s", p.muc)
            p.send_last_presence(force=True, no_cache_online=True)

    def set_vcard(
        self,
        /,
        full_name: str | None = None,
        given: str | None = None,
        surname: str | None = None,
        birthday: date | None = None,
        phone: str | None = None,
        phones: Iterable[str] = (),
        note: str | None = None,
        url: str | None = None,
        email: str | None = None,
        country: str | None = None,
        locality: str | None = None,
        pronouns: str | None = None,
    ) -> None:
        """
        Update :xep:`0292` data for this contact.

        Use this for additional metadata about this contact to be available to XMPP
        clients. The "note" argument is a text of arbitrary size and can be useful when
        no other field is a good fit.

        The name of the contact, if set, is used as nickname, and as full name
        when ``full_name`` is not given. The address is only added when
        ``country`` is given.
        """
        vcard = VCard4()
        vcard.add_impp(f"xmpp:{self.jid.bare}")

        if n := self.name:
            vcard.add_nickname(n)
        if full_name:
            vcard["full_name"] = full_name
        elif n:
            vcard["full_name"] = n

        if given:
            vcard["given"] = given
        if surname:
            vcard["surname"] = surname
        if birthday:
            vcard["birthday"] = birthday

        if note:
            vcard.add_note(note)
        if url:
            vcard.add_url(url)
        if email:
            vcard.add_email(email)
        if phone:
            vcard.add_tel(phone)
        for p in phones:
            vcard.add_tel(p)
        if country:
            vcard.add_address(country, locality)
        if pronouns:
            vcard["pronouns"]["text"] = pronouns

        self.update_stored_attribute(vcard=str(vcard), vcard_fetched=True)
        self.session.create_task(
            self.xmpp.pubsub.broadcast_vcard_event(self.jid, self.user_jid, vcard)
        )

    def get_roster_item(self) -> dict[str, dict[str, str | Sequence[str]]]:
        """
        Build the roster item of this contact.

        :return: A dict mapping the bare JID of this contact to its
            subscription, groups and, when the contact has one, name.
        """
        item: dict[str, str | Sequence[str]] = {
            "subscription": self.__get_subscription_string(),
            "groups": [self.xmpp.ROSTER_GROUP],
        }
        # self.name is never None (it falls back to ""), so test for
        # emptiness to avoid pushing an empty roster name.
        if n := self.name:
            item["name"] = n
        return {self.jid.bare: item}

    async def add_to_roster(self, force: bool = False) -> None:
        """
        Add this contact to the user roster using :xep:`0356`

        :param force: add even if the contact was already added successfully
        """
        if self.added_to_roster and not force:
            return
        if not self.session.user.preferences.get("roster_push", True):
            self.log.debug("Roster push request by plugin ignored (--no-roster-push)")
            return
        try:
            await self.xmpp["xep_0356"].set_roster(
                jid=self.user_jid, roster_items=self.get_roster_item()
            )
        except PermissionError:
            warnings.warn(
                "Slidge does not have the privilege (XEP-0356) to manage rosters. "
                "Consider configuring your XMPP server for that."
            )
            # Fall back to a regular subscription request.
            self.send_friend_request(
                f"I'm already your friend on {self.xmpp.COMPONENT_TYPE}, but "
                "slidge is not allowed to manage your roster."
            )
            return
        except (IqError, IqTimeout) as e:
            self.log.warning("Could not add to roster", exc_info=e)
        else:
            # we only broadcast pubsub events for contacts added to the roster
            # so if something was set before, we need to push it now
            self.added_to_roster = True
            self.send_last_presence(force=True)

    async def __broadcast_pubsub_items(self) -> None:
        """
        Broadcast the cached avatar and the name of this contact to the user.

        Only done for friends that have been added to the roster.
        """
        if not self.is_friend:
            return
        if not self.added_to_roster:
            return
        cached_avatar = self.get_cached_avatar()
        if cached_avatar is not None:
            await self.xmpp.pubsub.broadcast_avatar(
                self.jid.bare, self.session.user_jid, cached_avatar
            )
        nick = self.name

        # self.name is never None (it falls back to ""), so test for
        # emptiness to avoid broadcasting an empty nickname.
        if nick:
            self.xmpp.pubsub.broadcast_nick(
                self.session.user_jid,
                self.jid.bare,
                nick,
            )

    def send_friend_request(self, text: str | None = None) -> None:
        """
        Send a "subscribe" presence from this contact to the user.

        :param text: Optional status text attached to the request
        """
        presence = self._make_presence(ptype="subscribe", pstatus=text, bare=True)
        self._send(presence, nick=True)

    async def accept_friend_request(self, text: str | None = None) -> None:
        """
        Call this to signify that this Contact has accepted to be a friend
        of the user.

        :param text: Optional message from the friend to the user
        """
        self.is_friend = True
        self.added_to_roster = True
        self.log.debug("Accepting friend request")
        presence = self._make_presence(ptype="subscribed", pstatus=text, bare=True)
        self._send(presence, nick=True)
        self.send_last_presence()
        await self.__broadcast_pubsub_items()
        self.log.debug("Accepted friend request")

    def reject_friend_request(self, text: str | None = None) -> None:
        """
        Call this to signify that this Contact has refused to be a contact
        of the user (or that they don't want to be friends anymore)

        :param text: Optional message from the non-friend to the user
        """
        presence = self._make_presence(ptype="unsubscribed", pstatus=text, bare=True)
        self.offline()
        self._send(presence, nick=True)
        self.is_friend = False

    async def on_friend_request(self, text: str = "") -> None:
        """
        Called when receiving a "subscribe" presence, ie, "I would like to add
        you to my contacts/friends", from the user to this contact.

        In XMPP terms: "I would like to receive your presence updates"

        This is only called if self.is_friend = False. If self.is_friend = True,
        slidge will automatically "accept the friend request", ie, reply with
        a "subscribed" presence.

        When called, a 'friend request event' should be sent to the legacy
        service, and when the contact responds, you should either call
        self.accept_friend_request() or self.reject_friend_request()
        """
        pass

    async def on_friend_delete(self, text: str = "") -> None:
        """
        Called when receiving an "unsubscribed" presence, ie, "I would like to
        remove you to my contacts/friends" or "I refuse your friend request"
        from the user to this contact.

        In XMPP terms: "You won't receive my presence updates anymore (or you
        never have)".
        """
        pass

    async def on_friend_accept(self) -> None:
        """
        Called when receiving a "subscribed" presence, ie, "I accept to be
        your/confirm that you are my friend" from the user to this contact.

        In XMPP terms: "You will receive my presence updates".
        """
        pass

    def unsubscribe(self) -> None:
        """
        (internal use by slidge)

        Send an "unsubscribe", "unsubscribed", "unavailable" presence sequence
        from this contact to the user, ie, "this contact has removed you from
        their 'friends'".
        """
        for ptype in "unsubscribe", "unsubscribed", "unavailable":
            self.xmpp.send_presence(pfrom=self.jid, pto=self.user_jid.bare, ptype=ptype)

    async def update_info(self) -> None:
        """
        Fetch information about this contact from the legacy network

        This is awaited on Contact instantiation, and should be overridden to
        update the nickname, avatar, vcard [...] of this contact, by making
        "legacy API calls".

        To take advantage of the slidge avatar cache, you can check the .avatar
        property to retrieve the "legacy file ID" of the cached avatar. If there
        is no change, you should not call
        :py:meth:`slidge.core.mixins.avatar.AvatarMixin.set_avatar` or attempt
        to modify the ``.avatar`` property.
        """
        pass

    async def fetch_vcard(self) -> None:
        """
        If the legacy network doesn't like that you fetch too many profiles on startup,
        it's also possible to fetch it here, which will be called when XMPP clients
        of the user request the vcard, if it hasn't been fetched before
        :return:
        """
        pass

    def _make_presence(
        self,
        *,
        last_seen: datetime.datetime | None = None,
        status_codes: set[int] | None = None,
        user_full_jid: JID | None = None,
        **presence_kwargs: Any,  # noqa:ANN401
    ) -> Presence:
        """
        Build a presence from this contact, with entity capabilities attached.

        :param last_seen: Passed through to the parent implementation
        :param status_codes: Accepted but not used by this implementation
        :param user_full_jid: Accepted but not used by this implementation
        :param presence_kwargs: Passed through to the parent implementation
        :return: The presence stanza
        """
        p = super()._make_presence(last_seen=last_seen, **presence_kwargs)
        caps = self.xmpp.plugin["xep_0115"]
        # Caps are only attached to presences sent from a full JID, and only
        # when a caps verification string is stored for this contact.
        if p.get_from().resource and self.stored.caps_ver:
            p["caps"]["node"] = caps.caps_node
            p["caps"]["hash"] = caps.hash
            p["caps"]["ver"] = self.stored.caps_ver
        return p
def is_markable(stanza: Message | Presence) -> bool:
    """
    Tell whether a stanza can be the target of a read marker.

    Presences never are; messages are if and only if their body is non-empty.

    :param stanza: The stanza about to be sent
    :return: True for a message with a non-empty body, False otherwise
    """
    return not isinstance(stanza, Presence) and bool(stanza["body"])
# Module-level logger, named after this module.
log = logging.getLogger(__name__)