Coverage for slidge / contact / contact.py: 86%
288 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
1import datetime
2import logging
3import warnings
4from collections.abc import Iterable, Iterator, Sequence
5from datetime import date
6from typing import TYPE_CHECKING, Any, ClassVar, Generic
7from xml.etree import ElementTree as ET
9from slixmpp import JID, Message, Presence
10from slixmpp.exceptions import IqError, IqTimeout
11from slixmpp.plugins.xep_0292.stanza import VCard4
12from slixmpp.types import MessageTypes
14from slidge.db.avatar import CachedAvatar
16from ..core.mixins import AvatarMixin, FullCarbonMixin
17from ..core.mixins.disco import ContactAccountDiscoMixin
18from ..core.mixins.recipient import ReactionRecipientMixin, ThreadRecipientMixin
19from ..db.models import Contact, ContactSent
20from ..util import SubclassableOnce
21from ..util.types import (
22 AnyContact,
23 AnySession,
24 ClientType,
25 LegacyUserIdType,
26 MessageOrPresenceTypeVar,
27)
29if TYPE_CHECKING:
30 from ..command.base import ContactCommand
31 from ..group.participant import LegacyParticipant
class LegacyContact(
    Generic[LegacyUserIdType],
    AvatarMixin,
    ContactAccountDiscoMixin,
    FullCarbonMixin,
    ReactionRecipientMixin,
    ThreadRecipientMixin,
    SubclassableOnce,
):
    """
    This class centralizes actions in relation to a specific legacy contact.

    You shouldn't create instances of contacts manually, but rather rely on
    :meth:`.LegacyRoster.by_legacy_id` to ensure that contact instances are
    singletons. The :class:`.LegacyRoster` instance of a session is accessible
    through the :attr:`.BaseSession.contacts` attribute.

    Typically, your plugin should have methods hooked to the legacy events
    that call the appropriate methods here to transmit the "legacy action" to
    the XMPP user. This should look like this:

    .. code-block:: python

        class Session(BaseSession):
            ...

            async def on_cool_chat_network_new_text_message(self, legacy_msg_event):
                contact = self.contacts.by_legacy_id(legacy_msg_event.sender)
                contact.send_text(legacy_msg_event.text)

            async def on_cool_chat_network_new_typing_event(self, legacy_typing_event):
                contact = self.contacts.by_legacy_id(legacy_typing_event.sender)
                contact.composing()
            ...

    Use ``carbon=True`` as a keyword arg for methods to represent an action FROM
    the user TO the contact, typically when the user uses an official client to
    do an action such as sending a message or marking a message as read.
    This will use :xep:`0356` to impersonate the XMPP user.
    """

    session: AnySession

    RESOURCE: str = "slidge"
    """
    A full JID, including a resource part is required for chat states (and maybe other stuff)
    to work properly. This is the name of the resource the contacts will use.
    """
    # When false, presences of this contact are not relayed to its
    # participants in group chats.
    PROPAGATE_PRESENCE_TO_GROUPS = True

    mtype: MessageTypes = "chat"
    _can_send_carbon = True
    is_participant = False
    is_group = False

    _ONLY_SEND_PRESENCE_CHANGES = True

    STRIP_SHORT_DELAY = True
    # Presence types that are still sent to the user while the contact is not
    # a friend; other presences from non-friends are dropped in ``_send``.
    _NON_FRIEND_PRESENCES_FILTER: ClassVar[set[str]] = {"subscribe", "unsubscribed"}

    INVITATION_RECIPIENT = True

    commands: ClassVar[dict[str, "type[ContactCommand[AnyContact]]"]] = {}
    commands_chat: ClassVar[dict[str, "type[ContactCommand[AnyContact]]"]] = {}

    # Database row backing this contact; assigned in ``__init__``.
    stored: Contact
    model: Contact
    def __init__(self, session: AnySession, stored: Contact) -> None:
        """
        Bind this contact to its session and to its stored model.

        :param session: The session this contact belongs to; its ``xmpp``
            attribute is also kept as ``self.xmpp``.
        :param stored: The stored :class:`Contact` backing this instance.
        """
        self.session = session
        self.xmpp = session.xmpp
        self.stored = stored
        # The logger name embeds repr(self), which reads self.stored,
        # self.xmpp and self.session: they must be assigned before this call.
        self._set_logger()
        super().__init__()
109 @property
110 def jid(self) -> JID:
111 jid = JID(self.stored.jid)
112 jid.resource = self.RESOURCE
113 return jid
115 @jid.setter
116 def jid(self, _jid: JID) -> None:
117 raise RuntimeError
119 @property
120 def legacy_id(self) -> LegacyUserIdType:
121 return self.xmpp.LEGACY_CONTACT_ID_TYPE(self.stored.legacy_id) # type:ignore[no-any-return]
123 async def get_vcard(self, fetch: bool = True) -> VCard4 | None:
124 if fetch and not self.stored.vcard_fetched:
125 await self.fetch_vcard()
126 if self.stored.vcard is None:
127 return None
129 return VCard4(xml=ET.fromstring(self.stored.vcard))
131 @property
132 def is_friend(self) -> bool:
133 return self.stored.is_friend
135 @is_friend.setter
136 def is_friend(self, value: bool) -> None:
137 if value == self.is_friend:
138 return
139 self.update_stored_attribute(is_friend=value)
141 @property
142 def added_to_roster(self) -> bool:
143 return self.stored.added_to_roster
145 @added_to_roster.setter
146 def added_to_roster(self, value: bool) -> None:
147 if value == self.added_to_roster:
148 return
149 self.update_stored_attribute(added_to_roster=value)
    @property
    def participants(self) -> Iterator["LegacyParticipant"]:
        """
        Iterate over the participants representing this contact in group chats.

        Each stored participant is converted by the room it belongs to, the
        room itself being obtained from the session's bookmarks.
        """
        # Merge the stored contact into a fresh ORM session before reading its
        # ``participants`` relationship (presumably loaded lazily - TODO confirm).
        with self.xmpp.store.session() as orm:
            self.stored = orm.merge(self.stored)
            participants = self.stored.participants
        for p in participants:
            # Same for each stored participant, before reading ``p.room``.
            with self.xmpp.store.session() as orm:
                p = orm.merge(p)
                muc = self.session.bookmarks.from_store(p.room)
            yield muc.participant_from_store(p, contact=self)
162 @property
163 def user_jid(self) -> JID:
164 return self.session.user_jid
166 @property # type:ignore
167 def DISCO_TYPE(self) -> ClientType:
168 return self.client_type
170 @DISCO_TYPE.setter
171 def DISCO_TYPE(self, value: ClientType) -> None:
172 self.client_type = value
174 @property
175 def client_type(self) -> ClientType:
176 """
177 The client type of this contact, cf https://xmpp.org/registrar/disco-categories.html#client
179 Default is "pc".
180 """
181 return self.stored.client_type
183 @client_type.setter
184 def client_type(self, value: ClientType) -> None:
185 if self.stored.client_type == value:
186 return
187 self.update_stored_attribute(client_type=value)
189 def _set_logger(self) -> None:
190 self.log = logging.getLogger(f"{self.user_jid.bare}:contact:{self}")
192 def __repr__(self) -> str:
193 return f"<Contact #{self.stored.id} '{self.name}' ({self.legacy_id} - {self.jid.user})'>"
195 def __get_subscription_string(self) -> str:
196 if self.is_friend:
197 return "both"
198 return "none"
200 def __propagate_to_participants(self, stanza: Presence) -> None:
201 if not self.PROPAGATE_PRESENCE_TO_GROUPS:
202 return
204 ptype = stanza["type"]
205 if ptype in ("available", "chat"):
206 func_name = "online"
207 elif ptype in ("xa", "unavailable"):
208 # we map unavailable to extended_away, because offline is
209 # "participant leaves the MUC"
210 # TODO: improve this with a clear distinction between participant
211 # and member list
212 func_name = "extended_away"
213 elif ptype == "busy":
214 func_name = "busy"
215 elif ptype == "away":
216 func_name = "away"
217 else:
218 return
220 last_seen: datetime.datetime | None = (
221 stanza["idle"]["since"] if stanza.get_plugin("idle", check=True) else None
222 )
224 kw = dict(status=stanza["status"], last_seen=last_seen)
226 for part in self.participants:
227 func = getattr(part, func_name)
228 func(**kw)
    def _send(
        self,
        stanza: MessageOrPresenceTypeVar,
        carbon: bool = False,
        nick: bool = False,
        **send_kwargs: Any,  # noqa:ANN401
    ) -> MessageOrPresenceTypeVar:
        """
        Send a message or a presence on behalf of this contact.

        :param stanza: The stanza to send; it is modified in place.
        :param carbon: For messages only: send the stanza from the user to
            this contact through ``_privileged_send`` instead.
        :param nick: Attach the name of the contact as a :xep:`0172` nickname
            even if the contact is a friend (it is always attached for
            non-friends that have a name).
        :param send_kwargs: Accepted but not used by this method.
        :return: The stanza that was passed, whether it was sent or dropped.
        """
        if carbon and isinstance(stanza, Message):
            # Carbon: the direction is reversed, from the user to the contact.
            stanza["to"] = self.jid.bare
            stanza["from"] = self.user_jid
            self._privileged_send(stanza)
            return stanza

        if isinstance(stanza, Presence):
            if not self._updating_info:
                self.__propagate_to_participants(stanza)
            if (
                not self.is_friend
                and stanza["type"] not in self._NON_FRIEND_PRESENCES_FILTER
            ):
                # Non-friends only emit the presence types listed in the
                # filter; any other presence is dropped without being sent.
                return stanza
        if self.name and (nick or not self.is_friend):
            n = self.xmpp.plugin["xep_0172"].stanza.UserNick()
            n["nick"] = self.name
            stanza.append(n)
        if (
            not self._updating_info
            and self.xmpp.MARK_ALL_MESSAGES
            and is_markable(stanza)
        ):
            # Record the ID of this message, so that
            # pop_unread_xmpp_ids_up_to() can later return every message sent
            # up to a given one.
            with self.xmpp.store.session(expire_on_commit=False) as orm:
                self.stored = orm.merge(self.stored)
                exists = (
                    orm.query(ContactSent)
                    .filter_by(contact_id=self.stored.id, msg_id=stanza["id"])
                    .first()
                )
                if exists:
                    self.log.warning(
                        "Contact has already sent message %s", stanza["id"]
                    )
                else:
                    new = ContactSent(contact=self.stored, msg_id=stanza["id"])
                    orm.add(new)
                    self.stored.sent_order.append(new)
                    orm.commit()
        stanza["to"] = self.user_jid
        stanza.send()
        return stanza
280 def pop_unread_xmpp_ids_up_to(self, horizon_xmpp_id: str) -> list[str]:
281 """
282 Return XMPP msg ids sent by this contact up to a given XMPP msg id.
284 Legacy modules have no reason to use this, but it is used by slidge core
285 for legacy networks that need to mark all messages as read (most XMPP
286 clients only send a read marker for the latest message).
288 This has side effects, if the horizon XMPP id is found, messages up to
289 this horizon are cleared, to avoid sending the same read mark twice.
291 :param horizon_xmpp_id: The latest message
292 :return: A list of XMPP ids up to horizon_xmpp_id, included
293 """
294 with self.xmpp.store.session() as orm:
295 assert self.stored.id is not None
296 ids = self.xmpp.store.contacts.pop_sent_up_to(
297 orm, self.stored.id, horizon_xmpp_id
298 )
299 orm.commit()
300 return ids
302 @property
303 def name(self) -> str:
304 """
305 Friendly name of the contact, as it should appear in the user's roster
306 """
307 return self.stored.nick or ""
309 @name.setter
310 def name(self, n: str | None) -> None:
311 if self.stored.nick == n:
312 return
313 self.update_stored_attribute(nick=n)
314 self._set_logger()
315 if self.is_friend and self.added_to_roster:
316 self.xmpp.pubsub.broadcast_nick(
317 user_jid=self.user_jid, jid=self.jid.bare, nick=n
318 )
319 for p in self.participants:
320 p.nickname = n or str(self.legacy_id)
322 def _post_avatar_update(self, cached_avatar: CachedAvatar | None) -> None:
323 if self.is_friend and self.added_to_roster:
324 self.session.create_task(
325 self.session.xmpp.pubsub.broadcast_avatar(
326 self.jid.bare, self.session.user_jid, cached_avatar
327 )
328 )
329 for p in self.participants:
330 self.log.debug("Propagating new avatar to %s", p.muc)
331 p.send_last_presence(force=True, no_cache_online=True)
333 def set_vcard(
334 self,
335 /,
336 full_name: str | None = None,
337 given: str | None = None,
338 surname: str | None = None,
339 birthday: date | None = None,
340 phone: str | None = None,
341 phones: Iterable[str] = (),
342 note: str | None = None,
343 url: str | None = None,
344 email: str | None = None,
345 country: str | None = None,
346 locality: str | None = None,
347 pronouns: str | None = None,
348 ) -> None:
349 """
350 Update xep:`0292` data for this contact.
352 Use this for additional metadata about this contact to be available to XMPP
353 clients. The "note" argument is a text of arbitrary size and can be useful when
354 no other field is a good fit.
355 """
356 vcard = VCard4()
357 vcard.add_impp(f"xmpp:{self.jid.bare}")
359 if n := self.name:
360 vcard.add_nickname(n)
361 if full_name:
362 vcard["full_name"] = full_name
363 elif n:
364 vcard["full_name"] = n
366 if given:
367 vcard["given"] = given
368 if surname:
369 vcard["surname"] = surname
370 if birthday:
371 vcard["birthday"] = birthday
373 if note:
374 vcard.add_note(note)
375 if url:
376 vcard.add_url(url)
377 if email:
378 vcard.add_email(email)
379 if phone:
380 vcard.add_tel(phone)
381 for p in phones:
382 vcard.add_tel(p)
383 if country and locality:
384 vcard.add_address(country, locality)
385 elif country:
386 vcard.add_address(country, locality)
387 if pronouns:
388 vcard["pronouns"]["text"] = pronouns
390 self.update_stored_attribute(vcard=str(vcard), vcard_fetched=True)
391 self.session.create_task(
392 self.xmpp.pubsub.broadcast_vcard_event(self.jid, self.user_jid, vcard)
393 )
395 def get_roster_item(self) -> dict[str, dict[str, str | Sequence[str]]]:
396 item = {
397 "subscription": self.__get_subscription_string(),
398 "groups": [self.xmpp.ROSTER_GROUP],
399 }
400 if (n := self.name) is not None:
401 item["name"] = n
402 return {self.jid.bare: item}
404 async def add_to_roster(self, force: bool = False) -> None:
405 """
406 Add this contact to the user roster using :xep:`0356`
408 :param force: add even if the contact was already added successfully
409 """
410 if self.added_to_roster and not force:
411 return
412 if not self.session.user.preferences.get("roster_push", True):
413 log.debug("Roster push request by plugin ignored (--no-roster-push)")
414 return
415 try:
416 await self.xmpp["xep_0356"].set_roster(
417 jid=self.user_jid, roster_items=self.get_roster_item()
418 )
419 except PermissionError:
420 warnings.warn(
421 "Slidge does not have the privilege (XEP-0356) to manage rosters. "
422 "Consider configuring your XMPP server for that."
423 )
424 self.send_friend_request(
425 f"I'm already your friend on {self.xmpp.COMPONENT_TYPE}, but "
426 "slidge is not allowed to manage your roster."
427 )
428 return
429 except (IqError, IqTimeout) as e:
430 self.log.warning("Could not add to roster", exc_info=e)
431 else:
432 # we only broadcast pubsub events for contacts added to the roster
433 # so if something was set before, we need to push it now
434 self.added_to_roster = True
435 self.send_last_presence(force=True)
437 async def __broadcast_pubsub_items(self) -> None:
438 if not self.is_friend:
439 return
440 if not self.added_to_roster:
441 return
442 cached_avatar = self.get_cached_avatar()
443 if cached_avatar is not None:
444 await self.xmpp.pubsub.broadcast_avatar(
445 self.jid.bare, self.session.user_jid, cached_avatar
446 )
447 nick = self.name
449 if nick is not None:
450 self.xmpp.pubsub.broadcast_nick(
451 self.session.user_jid,
452 self.jid.bare,
453 nick,
454 )
456 def send_friend_request(self, text: str | None = None) -> None:
457 presence = self._make_presence(ptype="subscribe", pstatus=text, bare=True)
458 self._send(presence, nick=True)
460 async def accept_friend_request(self, text: str | None = None) -> None:
461 """
462 Call this to signify that this Contact has accepted to be a friend
463 of the user.
465 :param text: Optional message from the friend to the user
466 """
467 self.is_friend = True
468 self.added_to_roster = True
469 self.log.debug("Accepting friend request")
470 presence = self._make_presence(ptype="subscribed", pstatus=text, bare=True)
471 self._send(presence, nick=True)
472 self.send_last_presence()
473 await self.__broadcast_pubsub_items()
474 self.log.debug("Accepted friend request")
476 def reject_friend_request(self, text: str | None = None) -> None:
477 """
478 Call this to signify that this Contact has refused to be a contact
479 of the user (or that they don't want to be friends anymore)
481 :param text: Optional message from the non-friend to the user
482 """
483 presence = self._make_presence(ptype="unsubscribed", pstatus=text, bare=True)
484 self.offline()
485 self._send(presence, nick=True)
486 self.is_friend = False
    async def on_friend_request(self, text: str = "") -> None:
        """
        Called when receiving a "subscribe" presence, ie, "I would like to add
        you to my contacts/friends", from the user to this contact.

        In XMPP terms: "I would like to receive your presence updates"

        This is only called if self.is_friend = False. If self.is_friend = True,
        slidge will automatically "accept the friend request", ie, reply with
        a "subscribed" presence.

        When called, a 'friend request event' should be sent to the legacy
        service, and when the contact responds, you should either call
        :meth:`accept_friend_request` or :meth:`reject_friend_request`.

        The default implementation does nothing.

        :param text: Optional message attached to the request
        """
        pass
    async def on_friend_delete(self, text: str = "") -> None:
        """
        Called when receiving an "unsubscribed" presence, ie, "I would like to
        remove you from my contacts/friends" or "I refuse your friend request"
        from the user to this contact.

        In XMPP terms: "You won't receive my presence updates anymore (or you
        never have)".

        The default implementation does nothing.

        :param text: Optional message attached to the presence
        """
        pass
    async def on_friend_accept(self) -> None:
        """
        Called when receiving a "subscribed" presence, ie, "I accept to be
        your/confirm that you are my friend" from the user to this contact.

        In XMPP terms: "You will receive my presence updates".

        The default implementation does nothing.
        """
        pass
525 def unsubscribe(self) -> None:
526 """
527 (internal use by slidge)
529 Send an "unsubscribe", "unsubscribed", "unavailable" presence sequence
530 from this contact to the user, ie, "this contact has removed you from
531 their 'friends'".
532 """
533 for ptype in "unsubscribe", "unsubscribed", "unavailable":
534 self.xmpp.send_presence(pfrom=self.jid, pto=self.user_jid.bare, ptype=ptype)
    async def update_info(self) -> None:
        """
        Fetch information about this contact from the legacy network

        This is awaited on Contact instantiation, and should be overridden to
        update the nickname, avatar, vcard [...] of this contact, by making
        "legacy API calls".

        To take advantage of the slidge avatar cache, you can check the .avatar
        property to retrieve the "legacy file ID" of the cached avatar. If there
        is no change, you should not call
        :py:meth:`slidge.core.mixins.avatar.AvatarMixin.set_avatar` or attempt
        to modify the ``.avatar`` property.

        The default implementation does nothing.
        """
        pass
    async def fetch_vcard(self) -> None:
        """
        If the legacy network doesn't like that you fetch too many profiles on
        startup, it's also possible to fetch the vcard here: this is awaited
        by :meth:`get_vcard` when the vcard hasn't been fetched before.

        The default implementation does nothing.
        """
        pass
561 def _make_presence(
562 self,
563 *,
564 last_seen: datetime.datetime | None = None,
565 status_codes: set[int] | None = None,
566 user_full_jid: JID | None = None,
567 **presence_kwargs: Any, # noqa:ANN401
568 ) -> Presence:
569 p = super()._make_presence(last_seen=last_seen, **presence_kwargs)
570 caps = self.xmpp.plugin["xep_0115"]
571 if p.get_from().resource and self.stored.caps_ver:
572 p["caps"]["node"] = caps.caps_node
573 p["caps"]["hash"] = caps.hash
574 p["caps"]["ver"] = self.stored.caps_ver
575 return p
def is_markable(stanza: Message | Presence) -> bool:
    """
    Tell whether a stanza is a message that can be marked as read.

    :param stanza: A message or a presence
    :return: ``False`` for presences; for messages, whether the body is
        non-empty.
    """
    return not isinstance(stanza, Presence) and bool(stanza["body"])
# Module-level logger, named after this module.
log = logging.getLogger(__name__)