Coverage for slidge / contact / contact.py: 86%
286 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-20 19:56 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-20 19:56 +0000
1import datetime
2import logging
3import warnings
4from collections.abc import Iterable, Iterator, Sequence
5from datetime import date
6from typing import TYPE_CHECKING, Any, ClassVar, Generic, Literal
7from xml.etree import ElementTree as ET
9from slixmpp import JID, Message, Presence
10from slixmpp.exceptions import IqError, IqTimeout
11from slixmpp.plugins.xep_0292.stanza import VCard4
12from slixmpp.types import MessageTypes
14from slidge.db.avatar import CachedAvatar
16from ..core.mixins import AvatarMixin, FullCarbonMixin
17from ..core.mixins.disco import ContactAccountDiscoMixin
18from ..core.mixins.recipient import ReactionRecipientMixin, ThreadRecipientMixin
19from ..db.models import Contact, ContactSent
20from ..util import SubclassableOnce
21from ..util.types import (
22 AnyContact,
23 AnySession,
24 ClientType,
25 LegacyUserIdType,
26 MessageOrPresenceTypeVar,
27)
29if TYPE_CHECKING:
30 from ..command.base import ContactCommand
31 from ..group.participant import LegacyParticipant
class LegacyContact(
    Generic[LegacyUserIdType],
    AvatarMixin,
    ContactAccountDiscoMixin,
    FullCarbonMixin,
    ReactionRecipientMixin,
    ThreadRecipientMixin,
    SubclassableOnce,
):
    """
    This class centralizes actions in relation to a specific legacy contact.

    You shouldn't create instances of contacts manually, but rather rely on
    :meth:`.LegacyRoster.by_legacy_id` to ensure that contact instances are
    singletons. The :class:`.LegacyRoster` instance of a session is accessible
    through the :attr:`.BaseSession.contacts` attribute.

    Typically, your plugin should have methods hooked to the legacy events and
    call appropriate methods here to transmit the "legacy action" to the xmpp
    user. This should look like this:

    .. code-block:: python

        class Session(BaseSession):
            ...

            async def on_cool_chat_network_new_text_message(self, legacy_msg_event):
                contact = self.contacts.by_legacy_id(legacy_msg_event.sender)
                contact.send_text(legacy_msg_event.text)

            async def on_cool_chat_network_new_typing_event(self, legacy_typing_event):
                contact = self.contacts.by_legacy_id(legacy_typing_event.sender)
                contact.composing()
            ...

    Use ``carbon=True`` as a keyword arg for methods to represent an action FROM
    the user TO the contact, typically when the user uses an official client to
    do an action such as sending a message or marking a message as read.
    This will use :xep:`0356` to impersonate the XMPP user.
    """

    session: AnySession

    RESOURCE: str = "slidge"
    """
    A full JID, including a resource part is required for chat states (and maybe other stuff)
    to work properly. This is the name of the resource the contacts will use.
    """
    # When false, presences of this contact are not forwarded to its MUC
    # participants (see __propagate_to_participants).
    PROPAGATE_PRESENCE_TO_GROUPS = True

    mtype: MessageTypes = "chat"
    _can_send_carbon = True
    is_participant: Literal[False] = False
    is_group: Literal[False] = False

    _ONLY_SEND_PRESENCE_CHANGES = True

    STRIP_SHORT_DELAY = True
    # Presence types that are still sent to the user when the contact is not a
    # friend; every other presence type is dropped by _send() in that case.
    _NON_FRIEND_PRESENCES_FILTER: ClassVar[set[str]] = {"subscribe", "unsubscribed"}

    INVITATION_RECIPIENT = True

    commands: ClassVar[dict[str, "type[ContactCommand[AnyContact]]"]] = {}
    commands_chat: ClassVar[dict[str, "type[ContactCommand[AnyContact]]"]] = {}

    stored: Contact
    model: Contact

    def __init__(self, session: AnySession, stored: Contact) -> None:
        """
        :param session: The session this contact belongs to
        :param stored: The database row backing this contact
        """
        # session and stored must be set before _set_logger(), which reads
        # the user JID and the repr of this contact.
        self.session = session
        self.xmpp = session.xmpp
        self.stored = stored
        self._set_logger()
        super().__init__()

    @property
    def jid(self) -> JID:
        """
        Full JID of this contact: the stored JID with :attr:`.RESOURCE` as
        resource part. A new ``JID`` object is built on each access.
        """
        jid = JID(self.stored.jid)
        jid.resource = self.RESOURCE
        return jid

    @jid.setter
    def jid(self, _jid: JID) -> None:
        # The JID is derived from the stored contact and is read-only here.
        raise RuntimeError("The JID of a contact cannot be set")

    @property
    def legacy_id(self) -> LegacyUserIdType:
        """
        The stored legacy ID, converted with ``xmpp.LEGACY_CONTACT_ID_TYPE``.
        """
        return self.xmpp.LEGACY_CONTACT_ID_TYPE(self.stored.legacy_id)  # type:ignore[no-any-return]

    async def get_vcard(self, fetch: bool = True) -> VCard4 | None:
        """
        Return the stored vcard of this contact, parsed as a ``VCard4`` stanza.

        :param fetch: If true and the vcard has not been marked as fetched yet,
            await :meth:`.fetch_vcard` first.
        :return: The vcard, or ``None`` if no vcard is stored.
        """
        if fetch and not self.stored.vcard_fetched:
            await self.fetch_vcard()
        if self.stored.vcard is None:
            return None

        return VCard4(xml=ET.fromstring(self.stored.vcard))

    @property
    def is_friend(self) -> bool:
        """
        Whether this contact is a friend of the user (stored attribute).
        """
        return self.stored.is_friend

    @is_friend.setter
    def is_friend(self, value: bool) -> None:
        # Only persist actual changes.
        if value == self.is_friend:
            return
        self.update_stored_attribute(is_friend=value)

    @property
    def added_to_roster(self) -> bool:
        """
        Whether this contact has been added to the user's roster (stored
        attribute).
        """
        return self.stored.added_to_roster

    @added_to_roster.setter
    def added_to_roster(self, value: bool) -> None:
        # Only persist actual changes.
        if value == self.added_to_roster:
            return
        self.update_stored_attribute(added_to_roster=value)

    @property
    def participants(self) -> Iterator["LegacyParticipant"]:
        """
        Iterate over the MUC participants linked to this contact.

        The stored contact and each stored participant are merged into a
        fresh ORM session before being read.
        """
        with self.xmpp.store.session() as orm:
            self.stored = orm.merge(self.stored)
            participants = self.stored.participants
        for p in participants:
            with self.xmpp.store.session() as orm:
                p = orm.merge(p)
                muc = self.session.bookmarks.from_store(p.room)
            yield muc.participant_from_store(p, contact=self)

    @property
    def user_jid(self) -> JID:
        """
        JID of the user owning the session of this contact.
        """
        return self.session.user_jid

    @property  # type:ignore
    def DISCO_TYPE(self) -> ClientType:
        """
        Alias of :attr:`.client_type`.
        """
        return self.client_type

    @DISCO_TYPE.setter
    def DISCO_TYPE(self, value: ClientType) -> None:
        self.client_type = value

    @property
    def client_type(self) -> ClientType:
        """
        The client type of this contact, cf https://xmpp.org/registrar/disco-categories.html#client

        Default is "pc".
        """
        return self.stored.client_type

    @client_type.setter
    def client_type(self, value: ClientType) -> None:
        # Only persist actual changes.
        if self.stored.client_type == value:
            return
        self.update_stored_attribute(client_type=value)

    def _set_logger(self) -> None:
        """
        (Re)create ``self.log``, named after the user's bare JID and the string
        form of this contact, which includes its name.
        """
        self.log = logging.getLogger(f"{self.user_jid.bare}:contact:{self}")

    def __repr__(self) -> str:
        return f"<Contact #{self.stored.id} '{self.name}' ({self.legacy_id} - {self.jid.user})>"

    def __get_subscription_string(self) -> str:
        """
        Return the roster subscription value: "both" for friends, else "none".
        """
        if self.is_friend:
            return "both"
        return "none"

    def __propagate_to_participants(self, stanza: Presence) -> None:
        """
        Mirror a presence of this contact on all of its MUC participants.

        Does nothing if :attr:`.PROPAGATE_PRESENCE_TO_GROUPS` is false or if
        the presence type has no participant equivalent.

        :param stanza: The presence sent by this contact
        """
        if not self.PROPAGATE_PRESENCE_TO_GROUPS:
            return

        ptype = stanza["type"]
        if ptype in ("available", "chat"):
            func_name = "online"
        elif ptype in ("xa", "unavailable"):
            # we map unavailable to extended_away, because offline is
            # "participant leaves the MUC"
            # TODO: improve this with a clear distinction between participant
            #       and member list
            func_name = "extended_away"
        elif ptype in ("dnd", "busy"):
            # The presence "type" falls back to the <show/> value, and the
            # do-not-disturb show value is "dnd"; "busy" is kept for
            # backward compatibility.
            func_name = "busy"
        elif ptype == "away":
            func_name = "away"
        else:
            return

        last_seen: datetime.datetime | None = (
            stanza["idle"]["since"] if "idle" in stanza else None
        )

        kw = dict(status=stanza["status"], last_seen=last_seen)

        for part in self.participants:
            func = getattr(part, func_name)
            func(**kw)

    def _send(
        self,
        stanza: MessageOrPresenceTypeVar,
        carbon: bool = False,
        nick: bool = False,
        **send_kwargs: Any,  # noqa:ANN401
    ) -> MessageOrPresenceTypeVar:
        """
        Send a stanza from this contact to the user.

        :param stanza: The message or presence to send
        :param carbon: If true and ``stanza`` is a message, send it from the
            user to this contact through ``_privileged_send`` instead.
        :param nick: If true, attach the contact name as a user nick element
            (this is always done for non-friends with a name).
        :param send_kwargs: Accepted but not used in this method.
        :return: The stanza that was passed, whether it was sent or dropped.
        """
        if carbon and isinstance(stanza, Message):
            stanza["to"] = self.jid.bare
            stanza["from"] = self.user_jid
            self._privileged_send(stanza)
            return stanza

        if isinstance(stanza, Presence):
            if not self._updating_info:
                self.__propagate_to_participants(stanza)
            # Non-friends only get the presence types of the filter; anything
            # else is silently dropped.
            if (
                not self.is_friend
                and stanza["type"] not in self._NON_FRIEND_PRESENCES_FILTER
            ):
                return stanza
        if self.name and (nick or not self.is_friend):
            n = self.xmpp.plugin["xep_0172"].stanza.UserNick()
            n["nick"] = self.name
            stanza.append(n)
        if (
            not self._updating_info
            and self.xmpp.MARK_ALL_MESSAGES
            and is_markable(stanza)
        ):
            # Record the id of this message so that it can later be popped by
            # pop_unread_xmpp_ids_up_to().
            with self.xmpp.store.session(expire_on_commit=False) as orm:
                self.stored = orm.merge(self.stored)
                exists = (
                    orm.query(ContactSent)
                    .filter_by(contact_id=self.stored.id, msg_id=stanza["id"])
                    .first()
                )
                if exists:
                    self.log.warning(
                        "Contact has already sent message %s", stanza["id"]
                    )
                else:
                    new = ContactSent(contact=self.stored, msg_id=stanza["id"])
                    orm.add(new)
                    self.stored.sent_order.append(new)
                    orm.commit()
        stanza["to"] = self.user_jid
        stanza.send()
        return stanza

    def pop_unread_xmpp_ids_up_to(self, horizon_xmpp_id: str) -> list[str]:
        """
        Return XMPP msg ids sent by this contact up to a given XMPP msg id.

        Legacy modules have no reason to use this, but it is used by slidge core
        for legacy networks that need to mark all messages as read (most XMPP
        clients only send a read marker for the latest message).

        This has side effects, if the horizon XMPP id is found, messages up to
        this horizon are cleared, to avoid sending the same read mark twice.

        :param horizon_xmpp_id: The latest message
        :return: A list of XMPP ids up to horizon_xmpp_id, included
        """
        with self.xmpp.store.session() as orm:
            assert self.stored.id is not None
            ids = self.xmpp.store.contacts.pop_sent_up_to(
                orm, self.stored.id, horizon_xmpp_id
            )
            orm.commit()
        return ids

    @property
    def name(self) -> str:
        """
        Friendly name of the contact, as it should appear in the user's roster
        """
        return self.stored.nick or ""

    @name.setter
    def name(self, n: str | None) -> None:
        if self.stored.nick == n:
            return
        self.update_stored_attribute(nick=n)
        # The logger name embeds the contact name, so it must be rebuilt.
        self._set_logger()
        if self.is_friend and self.added_to_roster:
            self.xmpp.pubsub.broadcast_nick(
                user_jid=self.user_jid, jid=self.jid.bare, nick=n
            )
        for p in self.participants:
            # Participants fall back to the legacy ID when the name is empty.
            p.nickname = n or str(self.legacy_id)

    def _post_avatar_update(self, cached_avatar: CachedAvatar | None) -> None:
        """
        Broadcast a new avatar to the user (only for friends that were added
        to the roster) and refresh the presence of every linked participant.

        :param cached_avatar: The new cached avatar, or ``None``
        """
        if self.is_friend and self.added_to_roster:
            self.session.create_task(
                self.session.xmpp.pubsub.broadcast_avatar(
                    self.jid.bare, self.session.user_jid, cached_avatar
                )
            )
        for p in self.participants:
            self.log.debug("Propagating new avatar to %s", p.muc)
            p.send_last_presence(force=True, no_cache_online=True)

    def set_vcard(
        self,
        /,
        full_name: str | None = None,
        given: str | None = None,
        surname: str | None = None,
        birthday: date | None = None,
        phone: str | None = None,
        phones: Iterable[str] = (),
        note: str | None = None,
        url: str | None = None,
        email: str | None = None,
        country: str | None = None,
        locality: str | None = None,
        pronouns: str | None = None,
    ) -> None:
        """
        Update :xep:`0292` data for this contact.

        Use this for additional metadata about this contact to be available to XMPP
        clients. The "note" argument is a text of arbitrary size and can be useful when
        no other field is a good fit.

        Empty or ``None`` fields are omitted from the vcard. The contact name,
        if any, is used as nickname and as fallback for ``full_name``. An
        address is only added when ``country`` is set.
        """
        vcard = VCard4()
        vcard.add_impp(f"xmpp:{self.jid.bare}")

        if n := self.name:
            vcard.add_nickname(n)
        if full_name:
            vcard["full_name"] = full_name
        elif n:
            vcard["full_name"] = n

        if given:
            vcard["given"] = given
        if surname:
            vcard["surname"] = surname
        if birthday:
            vcard["birthday"] = birthday

        if note:
            vcard.add_note(note)
        if url:
            vcard.add_url(url)
        if email:
            vcard.add_email(email)
        if phone:
            vcard.add_tel(phone)
        for p in phones:
            vcard.add_tel(p)
        if country:
            vcard.add_address(country, locality)
        if pronouns:
            vcard["pronouns"]["text"] = pronouns

        self.update_stored_attribute(vcard=str(vcard), vcard_fetched=True)
        self.session.create_task(
            self.xmpp.pubsub.broadcast_vcard_event(self.jid, self.user_jid, vcard)
        )

    def get_roster_item(self) -> dict[str, dict[str, str | Sequence[str]]]:
        """
        Build the roster item of this contact, keyed by its bare JID.

        :return: A mapping ``{bare_jid: {"subscription", "groups", "name"}}``
        """
        item = {
            "subscription": self.__get_subscription_string(),
            "groups": [self.xmpp.ROSTER_GROUP],
        }
        # NOTE(review): name always returns a str (possibly empty), so this
        # check always passes and "name" is always set.
        if (n := self.name) is not None:
            item["name"] = n
        return {self.jid.bare: item}

    async def add_to_roster(self, force: bool = False) -> None:
        """
        Add this contact to the user roster using :xep:`0356`

        If slidge lacks the privilege to manage the roster, a warning is
        emitted and a friend request is sent to the user instead.

        :param force: add even if the contact was already added successfully
        """
        if self.added_to_roster and not force:
            return
        if not self.session.user.preferences.get("roster_push", True):
            log.debug("Roster push request by plugin ignored (--no-roster-push)")
            return
        try:
            await self.xmpp["xep_0356"].set_roster(
                jid=self.user_jid, roster_items=self.get_roster_item()
            )
        except PermissionError:
            warnings.warn(
                "Slidge does not have the privilege (XEP-0356) to manage rosters. "
                "Consider configuring your XMPP server for that."
            )
            self.send_friend_request(
                f"I'm already your friend on {self.xmpp.COMPONENT_TYPE}, but "
                "slidge is not allowed to manage your roster."
            )
            return
        except (IqError, IqTimeout) as e:
            self.log.warning("Could not add to roster", exc_info=e)
        else:
            # we only broadcast pubsub events for contacts added to the roster
            # so if something was set before, we need to push it now
            self.added_to_roster = True
            self.send_last_presence(force=True)

    async def __broadcast_pubsub_items(self) -> None:
        """
        Broadcast the cached avatar (if any) and the nickname of this contact
        to the user. Does nothing unless the contact is a friend that was
        added to the roster.
        """
        if not self.is_friend:
            return
        if not self.added_to_roster:
            return
        cached_avatar = self.get_cached_avatar()
        if cached_avatar is not None:
            await self.xmpp.pubsub.broadcast_avatar(
                self.jid.bare, self.session.user_jid, cached_avatar
            )
        nick = self.name

        # NOTE(review): name always returns a str (possibly empty), so this
        # check always passes.
        if nick is not None:
            self.xmpp.pubsub.broadcast_nick(
                self.session.user_jid,
                self.jid.bare,
                nick,
            )

    def send_friend_request(self, text: str | None = None) -> None:
        """
        Send a "subscribe" presence from the bare JID of this contact to the
        user, along with the contact nickname.

        :param text: Optional status text attached to the presence
        """
        presence = self._make_presence(ptype="subscribe", pstatus=text, bare=True)
        self._send(presence, nick=True)

    async def accept_friend_request(self, text: str | None = None) -> None:
        """
        Call this to signify that this Contact has accepted to be a friend
        of the user.

        :param text: Optional message from the friend to the user
        """
        self.is_friend = True
        self.added_to_roster = True
        self.log.debug("Accepting friend request")
        presence = self._make_presence(ptype="subscribed", pstatus=text, bare=True)
        self._send(presence, nick=True)
        self.send_last_presence()
        await self.__broadcast_pubsub_items()
        self.log.debug("Accepted friend request")

    def reject_friend_request(self, text: str | None = None) -> None:
        """
        Call this to signify that this Contact has refused to be a contact
        of the user (or that they don't want to be friends anymore)

        :param text: Optional message from the non-friend to the user
        """
        presence = self._make_presence(ptype="unsubscribed", pstatus=text, bare=True)
        # is_friend is only cleared at the end: _send() drops most presences
        # of non-friends, so offline() is called while the flag is unchanged.
        self.offline()
        self._send(presence, nick=True)
        self.is_friend = False

    async def on_friend_request(self, text: str = "") -> None:
        """
        Called when receiving a "subscribe" presence, ie, "I would like to add
        you to my contacts/friends", from the user to this contact.

        In XMPP terms: "I would like to receive your presence updates"

        This is only called if self.is_friend = False. If self.is_friend = True,
        slidge will automatically "accept the friend request", ie, reply with
        a "subscribed" presence.

        When called, a 'friend request event' should be sent to the legacy
        service, and when the contact responds, you should either call
        self.accept_friend_request() or self.reject_friend_request()
        """
        pass

    async def on_friend_delete(self, text: str = "") -> None:
        """
        Called when receiving an "unsubscribed" presence, ie, "I would like to
        remove you from my contacts/friends" or "I refuse your friend request"
        from the user to this contact.

        In XMPP terms: "You won't receive my presence updates anymore (or you
        never have)".
        """
        pass

    async def on_friend_accept(self) -> None:
        """
        Called when receiving a "subscribed" presence, ie, "I accept to be
        your/confirm that you are my friend" from the user to this contact.

        In XMPP terms: "You will receive my presence updates".
        """
        pass

    def unsubscribe(self) -> None:
        """
        (internal use by slidge)

        Send an "unsubscribe", "unsubscribed", "unavailable" presence sequence
        from this contact to the user, ie, "this contact has removed you from
        their 'friends'".
        """
        for ptype in "unsubscribe", "unsubscribed", "unavailable":
            self.xmpp.send_presence(pfrom=self.jid, pto=self.user_jid.bare, ptype=ptype)

    async def update_info(self) -> None:
        """
        Fetch information about this contact from the legacy network

        This is awaited on Contact instantiation, and should be overridden to
        update the nickname, avatar, vcard [...] of this contact, by making
        "legacy API calls".

        To take advantage of the slidge avatar cache, you can check the .avatar
        property to retrieve the "legacy file ID" of the cached avatar. If there
        is no change, you should not call
        :py:meth:`slidge.core.mixins.avatar.AvatarMixin.set_avatar` or attempt
        to modify the ``.avatar`` property.
        """
        pass

    async def fetch_vcard(self) -> None:
        """
        If the legacy network doesn't like that you fetch too many profiles on startup,
        it's also possible to fetch it here, which will be called when XMPP clients
        of the user request the vcard, if it hasn't been fetched before
        :return:
        """
        pass

    def _make_presence(
        self,
        *,
        last_seen: datetime.datetime | None = None,
        status_codes: set[int] | None = None,
        user_full_jid: JID | None = None,
        **presence_kwargs: Any,  # noqa:ANN401
    ) -> Presence:
        """
        Build a presence from this contact, adding entity capabilities when
        the sender has a resource part and a caps version is stored.

        :param last_seen: Forwarded to the parent implementation
        :param status_codes: Accepted but not used in this method
        :param user_full_jid: Accepted but not used in this method
        :param presence_kwargs: Forwarded to the parent implementation
        :return: The presence stanza
        """
        p = super()._make_presence(last_seen=last_seen, **presence_kwargs)
        caps = self.xmpp.plugin["xep_0115"]
        if p.get_from().resource and self.stored.caps_ver:
            p["caps"]["node"] = caps.caps_node
            p["caps"]["hash"] = caps.hash
            p["caps"]["ver"] = self.stored.caps_ver
        return p
def is_markable(stanza: Message | Presence) -> bool:
    """
    Tell whether a stanza is a message whose id should be recorded.

    :param stanza: The stanza about to be sent
    :return: ``False`` for any presence, otherwise whether the stanza has a
        non-empty body.
    """
    return not isinstance(stanza, Presence) and bool(stanza["body"])
# Module-level logger, used by LegacyContact.add_to_roster for the message
# that is not tied to a per-contact logger.
log = logging.getLogger(__name__)