Coverage for slidge/contact/contact.py: 86%
334 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
1import datetime
2import logging
3import warnings
4from datetime import date
5from typing import TYPE_CHECKING, Generic, Iterable, Optional, Self, Union
6from xml.etree import ElementTree as ET
8from slixmpp import JID, Message, Presence
9from slixmpp.exceptions import IqError, IqTimeout
10from slixmpp.plugins.xep_0292.stanza import VCard4
11from slixmpp.types import MessageTypes
13from ..core import config
14from ..core.mixins import AvatarMixin, FullCarbonMixin, StoredAttributeMixin
15from ..core.mixins.db import UpdateInfoMixin
16from ..core.mixins.disco import ContactAccountDiscoMixin
17from ..core.mixins.recipient import ReactionRecipientMixin, ThreadRecipientMixin
18from ..db.models import Contact
19from ..util import SubclassableOnce
20from ..util.types import ClientType, LegacyUserIdType, MessageOrPresenceTypeVar
22if TYPE_CHECKING:
23 from ..core.session import BaseSession
24 from ..group.participant import LegacyParticipant
27class LegacyContact(
28 Generic[LegacyUserIdType],
29 StoredAttributeMixin,
30 AvatarMixin,
31 ContactAccountDiscoMixin,
32 FullCarbonMixin,
33 ReactionRecipientMixin,
34 ThreadRecipientMixin,
35 UpdateInfoMixin,
36 metaclass=SubclassableOnce,
37):
38 """
39 This class centralizes actions in relation to a specific legacy contact.
41 You shouldn't create instances of contacts manually, but rather rely on
42 :meth:`.LegacyRoster.by_legacy_id` to ensure that contact instances are
43 singletons. The :class:`.LegacyRoster` instance of a session is accessible
44 through the :attr:`.BaseSession.contacts` attribute.
46 Typically, your plugin should have methods hook to the legacy events and
47 call appropriate methods here to transmit the "legacy action" to the xmpp
48 user. This should look like this:
50 .. code-block:python
52 class Session(BaseSession):
53 ...
55 async def on_cool_chat_network_new_text_message(self, legacy_msg_event):
56 contact = self.contacts.by_legacy_id(legacy_msg_event.from)
57 contact.send_text(legacy_msg_event.text)
59 async def on_cool_chat_network_new_typing_event(self, legacy_typing_event):
60 contact = self.contacts.by_legacy_id(legacy_msg_event.from)
61 contact.composing()
62 ...
64 Use ``carbon=True`` as a keyword arg for methods to represent an action FROM
65 the user TO the contact, typically when the user uses an official client to
66 do an action such as sending a message or marking as message as read.
67 This will use :xep:`0363` to impersonate the XMPP user in order.
68 """
70 session: "BaseSession"
72 RESOURCE: str = "slidge"
73 """
74 A full JID, including a resource part is required for chat states (and maybe other stuff)
75 to work properly. This is the name of the resource the contacts will use.
76 """
77 PROPAGATE_PRESENCE_TO_GROUPS = True
79 mtype: MessageTypes = "chat"
80 _can_send_carbon = True
81 is_group = False
83 _ONLY_SEND_PRESENCE_CHANGES = True
85 STRIP_SHORT_DELAY = True
86 _NON_FRIEND_PRESENCES_FILTER = {"subscribe", "unsubscribed"}
88 _avatar_bare_jid = True
90 INVITATION_RECIPIENT = True
92 def __init__(
93 self,
94 session: "BaseSession",
95 legacy_id: LegacyUserIdType,
96 jid_username: str,
97 ):
98 """
99 :param session: The session this contact is part of
100 :param legacy_id: The contact's legacy ID
101 :param jid_username: User part of this contact's 'puppet' JID.
102 NB: case-insensitive, and some special characters are not allowed
103 """
104 super().__init__()
105 self.session = session
106 self.legacy_id: LegacyUserIdType = legacy_id
107 """
108 The legacy identifier of the :term:`Legacy Contact`.
109 By default, this is the :term:`JID Local Part` of this
110 :term:`XMPP Entity`.
112 Controlling what values are valid and how they are translated from a
113 :term:`JID Local Part` is done in :meth:`.jid_username_to_legacy_id`.
114 Reciprocally, in :meth:`legacy_id_to_jid_username` the inverse
115 transformation is defined.
116 """
117 self.jid_username = jid_username
119 self._name: Optional[str] = None
121 self.xmpp = session.xmpp
122 self.jid = JID(self.jid_username + "@" + self.xmpp.boundjid.bare)
123 self.jid.resource = self.RESOURCE
124 self.log = logging.getLogger(self.jid.bare)
125 self._set_logger_name()
126 self._is_friend: bool = False
127 self._added_to_roster = False
128 self._caps_ver: str | None = None
129 self._vcard_fetched = False
130 self._vcard: str | None = None
131 self._client_type: ClientType = "pc"
133 async def get_vcard(self, fetch=True) -> VCard4 | None:
134 if fetch and not self._vcard_fetched:
135 await self.fetch_vcard()
136 if self._vcard is None:
137 return None
139 return VCard4(xml=ET.fromstring(self._vcard))
141 @property
142 def is_friend(self):
143 return self._is_friend
145 @is_friend.setter
146 def is_friend(self, value: bool):
147 if value == self._is_friend:
148 return
149 self._is_friend = value
150 if self._updating_info:
151 return
152 self.__ensure_pk()
153 assert self.contact_pk is not None
154 self.xmpp.store.contacts.set_friend(self.contact_pk, value)
156 @property
157 def added_to_roster(self):
158 return self._added_to_roster
160 @added_to_roster.setter
161 def added_to_roster(self, value: bool):
162 if value == self._added_to_roster:
163 return
164 self._added_to_roster = value
165 if self._updating_info:
166 return
167 if self.contact_pk is None:
168 # during LegacyRoster.fill()
169 return
170 self.xmpp.store.contacts.set_added_to_roster(self.contact_pk, value)
172 @property
173 def participants(self) -> list["LegacyParticipant"]:
174 if self.contact_pk is None:
175 return []
177 self.__ensure_pk()
178 from ..group.participant import LegacyParticipant
180 return [
181 LegacyParticipant.get_self_or_unique_subclass().from_store(
182 self.session, stored, contact=self
183 )
184 for stored in self.xmpp.store.participants.get_for_contact(self.contact_pk)
185 ]
187 @property
188 def user_jid(self):
189 return self.session.user_jid
191 @property # type:ignore
192 def DISCO_TYPE(self) -> ClientType:
193 return self._client_type
195 @DISCO_TYPE.setter
196 def DISCO_TYPE(self, value: ClientType) -> None:
197 self.client_type = value
199 @property
200 def client_type(self) -> ClientType:
201 """
202 The client type of this contact, cf https://xmpp.org/registrar/disco-categories.html#client
204 Default is "pc".
205 """
206 return self._client_type
208 @client_type.setter
209 def client_type(self, value: ClientType) -> None:
210 self._client_type = value
211 if self._updating_info:
212 return
213 self.__ensure_pk()
214 assert self.contact_pk is not None
215 self.xmpp.store.contacts.set_client_type(self.contact_pk, value)
217 def _set_logger_name(self):
218 self.log.name = f"{self.user_jid.bare}:contact:{self}"
220 def __repr__(self):
221 return f"<Contact #{self.contact_pk} '{self.name}' ({self.legacy_id} - {self.jid.local})'>"
223 def __ensure_pk(self):
224 if self.contact_pk is not None:
225 return
226 # This happens for legacy modules that don't follow the Roster.fill /
227 # populate contact attributes in Contact.update_info() method.
228 # This results in (even) less optimised SQL writes and read, but
229 # we allow it because it fits some legacy network libs better.
230 with self.xmpp.store.session() as orm:
231 orm.commit()
232 stored = self.xmpp.store.contacts.get_by_legacy_id(
233 self.user_pk, str(self.legacy_id)
234 )
235 if stored is None:
236 self.contact_pk = self.xmpp.store.contacts.update(self, commit=True)
237 else:
238 self.contact_pk = stored.id
239 assert self.contact_pk is not None
241 def __get_subscription_string(self):
242 if self.is_friend:
243 return "both"
244 return "none"
246 def __propagate_to_participants(self, stanza: Presence):
247 if not self.PROPAGATE_PRESENCE_TO_GROUPS:
248 return
250 ptype = stanza["type"]
251 if ptype in ("available", "chat"):
252 func_name = "online"
253 elif ptype in ("xa", "unavailable"):
254 # we map unavailable to extended_away, because offline is
255 # "participant leaves the MUC"
256 # TODO: improve this with a clear distinction between participant
257 # and member list
258 func_name = "extended_away"
259 elif ptype == "busy":
260 func_name = "busy"
261 elif ptype == "away":
262 func_name = "away"
263 else:
264 return
266 last_seen: Optional[datetime.datetime] = (
267 stanza["idle"]["since"] if stanza.get_plugin("idle", check=True) else None
268 )
270 kw = dict(status=stanza["status"], last_seen=last_seen)
272 for part in self.participants:
273 func = getattr(part, func_name)
274 func(**kw)
276 def _send(
277 self, stanza: MessageOrPresenceTypeVar, carbon=False, nick=False, **send_kwargs
278 ) -> MessageOrPresenceTypeVar:
279 if carbon and isinstance(stanza, Message):
280 stanza["to"] = self.jid.bare
281 stanza["from"] = self.user_jid
282 self._privileged_send(stanza)
283 return stanza # type:ignore
285 if isinstance(stanza, Presence):
286 if not self._updating_info:
287 self.__propagate_to_participants(stanza)
288 if (
289 not self.is_friend
290 and stanza["type"] not in self._NON_FRIEND_PRESENCES_FILTER
291 ):
292 return stanza # type:ignore
293 if self.name and (nick or not self.is_friend):
294 n = self.xmpp.plugin["xep_0172"].stanza.UserNick()
295 n["nick"] = self.name
296 stanza.append(n)
297 if (
298 not self._updating_info
299 and self.xmpp.MARK_ALL_MESSAGES
300 and is_markable(stanza)
301 ):
302 self.__ensure_pk()
303 assert self.contact_pk is not None
304 self.xmpp.store.contacts.add_to_sent(self.contact_pk, stanza["id"])
305 stanza["to"] = self.user_jid
306 stanza.send()
307 return stanza
309 def get_msg_xmpp_id_up_to(self, horizon_xmpp_id: str) -> list[str]:
310 """
311 Return XMPP msg ids sent by this contact up to a given XMPP msg id.
313 Plugins have no reason to use this, but it is used by slidge core
314 for legacy networks that need to mark all messages as read (most XMPP
315 clients only send a read marker for the latest message).
317 This has side effects, if the horizon XMPP id is found, messages up to
318 this horizon are not cleared, to avoid sending the same read mark twice.
320 :param horizon_xmpp_id: The latest message
321 :return: A list of XMPP ids or None if horizon_xmpp_id was not found
322 """
323 self.__ensure_pk()
324 assert self.contact_pk is not None
325 return self.xmpp.store.contacts.pop_sent_up_to(self.contact_pk, horizon_xmpp_id)
327 @property
328 def name(self):
329 """
330 Friendly name of the contact, as it should appear in the user's roster
331 """
332 return self._name
334 @name.setter
335 def name(self, n: Optional[str]):
336 if self._name == n:
337 return
338 self._name = n
339 self._set_logger_name()
340 if self.is_friend and self.added_to_roster:
341 self.xmpp.pubsub.broadcast_nick(
342 user_jid=self.user_jid, jid=self.jid.bare, nick=n
343 )
344 if self._updating_info:
345 # means we're in update_info(), so no participants, and no need
346 # to write to DB now, it will be called in Roster.__finish_init_contact
347 return
348 for p in self.participants:
349 p.nickname = n
350 self.__ensure_pk()
351 assert self.contact_pk is not None
352 self.xmpp.store.contacts.update_nick(self.contact_pk, n)
354 def _get_cached_avatar_id(self) -> Optional[str]:
355 if self.contact_pk is None:
356 return None
357 return self.xmpp.store.contacts.get_avatar_legacy_id(self.contact_pk)
359 def _post_avatar_update(self):
360 self.__ensure_pk()
361 assert self.contact_pk is not None
362 self.xmpp.store.contacts.set_avatar(
363 self.contact_pk,
364 self._avatar_pk,
365 None if self.avatar_id is None else str(self.avatar_id),
366 )
367 for p in self.participants:
368 self.log.debug("Propagating new avatar to %s", p.muc)
369 p.send_last_presence(force=True, no_cache_online=True)
371 def set_vcard(
372 self,
373 /,
374 full_name: Optional[str] = None,
375 given: Optional[str] = None,
376 surname: Optional[str] = None,
377 birthday: Optional[date] = None,
378 phone: Optional[str] = None,
379 phones: Iterable[str] = (),
380 note: Optional[str] = None,
381 url: Optional[str] = None,
382 email: Optional[str] = None,
383 country: Optional[str] = None,
384 locality: Optional[str] = None,
385 ):
386 vcard = VCard4()
387 vcard.add_impp(f"xmpp:{self.jid.bare}")
389 if n := self.name:
390 vcard.add_nickname(n)
391 if full_name:
392 vcard["full_name"] = full_name
393 elif n:
394 vcard["full_name"] = n
396 if given:
397 vcard["given"] = given
398 if surname:
399 vcard["surname"] = surname
400 if birthday:
401 vcard["birthday"] = birthday
403 if note:
404 vcard.add_note(note)
405 if url:
406 vcard.add_url(url)
407 if email:
408 vcard.add_email(email)
409 if phone:
410 vcard.add_tel(phone)
411 for p in phones:
412 vcard.add_tel(p)
413 if country and locality:
414 vcard.add_address(country, locality)
415 elif country:
416 vcard.add_address(country, locality)
418 self._vcard = str(vcard)
419 self._vcard_fetched = True
420 self.session.create_task(
421 self.xmpp.pubsub.broadcast_vcard_event(self.jid, self.user_jid, vcard)
422 )
424 if self._updating_info:
425 return
427 assert self.contact_pk is not None
428 self.xmpp.store.contacts.set_vcard(self.contact_pk, self._vcard)
430 def get_roster_item(self):
431 item = {
432 "subscription": self.__get_subscription_string(),
433 "groups": [self.xmpp.ROSTER_GROUP],
434 }
435 if (n := self.name) is not None:
436 item["name"] = n
437 return {self.jid.bare: item}
439 async def add_to_roster(self, force=False):
440 """
441 Add this contact to the user roster using :xep:`0356`
443 :param force: add even if the contact was already added successfully
444 """
445 if self.added_to_roster and not force:
446 return
447 if config.NO_ROSTER_PUSH:
448 log.debug("Roster push request by plugin ignored (--no-roster-push)")
449 return
450 try:
451 await self._set_roster(
452 jid=self.user_jid, roster_items=self.get_roster_item()
453 )
454 except PermissionError:
455 warnings.warn(
456 "Slidge does not have privileges to add contacts to the roster. Refer"
457 " to https://slidge.im/core/admin/privilege.html for"
458 " more info."
459 )
460 if config.ROSTER_PUSH_PRESENCE_SUBSCRIPTION_REQUEST_FALLBACK:
461 self.send_friend_request(
462 f"I'm already your friend on {self.xmpp.COMPONENT_TYPE}, but "
463 "slidge is not allowed to manage your roster."
464 )
465 return
466 except (IqError, IqTimeout) as e:
467 self.log.warning("Could not add to roster", exc_info=e)
468 else:
469 # we only broadcast pubsub events for contacts added to the roster
470 # so if something was set before, we need to push it now
471 self.added_to_roster = True
472 self.send_last_presence()
474 async def __broadcast_pubsub_items(self):
475 if not self.is_friend:
476 return
477 if not self.added_to_roster:
478 return
479 cached_avatar = self.get_cached_avatar()
480 if cached_avatar is not None:
481 await self.xmpp.pubsub.broadcast_avatar(
482 self.jid.bare, self.session.user_jid, cached_avatar
483 )
484 nick = self.name
486 if nick is not None:
487 self.xmpp.pubsub.broadcast_nick(
488 self.session.user_jid,
489 self.jid.bare,
490 nick,
491 )
493 async def _set_roster(self, **kw):
494 try:
495 await self.xmpp["xep_0356"].set_roster(**kw)
496 except PermissionError:
497 await self.xmpp["xep_0356_old"].set_roster(**kw)
499 def send_friend_request(self, text: Optional[str] = None):
500 presence = self._make_presence(ptype="subscribe", pstatus=text, bare=True)
501 self._send(presence, nick=True)
503 async def accept_friend_request(self, text: Optional[str] = None):
504 """
505 Call this to signify that this Contact has accepted to be a friend
506 of the user.
508 :param text: Optional message from the friend to the user
509 """
510 self.is_friend = True
511 self.added_to_roster = True
512 self.__ensure_pk()
513 self.log.debug("Accepting friend request")
514 presence = self._make_presence(ptype="subscribed", pstatus=text, bare=True)
515 self._send(presence, nick=True)
516 self.send_last_presence()
517 await self.__broadcast_pubsub_items()
518 self.log.debug("Accepted friend request")
520 def reject_friend_request(self, text: Optional[str] = None):
521 """
522 Call this to signify that this Contact has refused to be a contact
523 of the user (or that they don't want to be friends anymore)
525 :param text: Optional message from the non-friend to the user
526 """
527 presence = self._make_presence(ptype="unsubscribed", pstatus=text, bare=True)
528 self.offline()
529 self._send(presence, nick=True)
530 self.is_friend = False
532 async def on_friend_request(self, text=""):
533 """
534 Called when receiving a "subscribe" presence, ie, "I would like to add
535 you to my contacts/friends", from the user to this contact.
537 In XMPP terms: "I would like to receive your presence updates"
539 This is only called if self.is_friend = False. If self.is_friend = True,
540 slidge will automatically "accept the friend request", ie, reply with
541 a "subscribed" presence.
543 When called, a 'friend request event' should be sent to the legacy
544 service, and when the contact responds, you should either call
545 self.accept_subscription() or self.reject_subscription()
546 """
547 pass
549 async def on_friend_delete(self, text=""):
550 """
551 Called when receiving an "unsubscribed" presence, ie, "I would like to
552 remove you to my contacts/friends" or "I refuse your friend request"
553 from the user to this contact.
555 In XMPP terms: "You won't receive my presence updates anymore (or you
556 never have)".
557 """
558 pass
560 async def on_friend_accept(self):
561 """
562 Called when receiving a "subscribed" presence, ie, "I accept to be
563 your/confirm that you are my friend" from the user to this contact.
565 In XMPP terms: "You will receive my presence updates".
566 """
567 pass
569 def unsubscribe(self):
570 """
571 (internal use by slidge)
573 Send an "unsubscribe", "unsubscribed", "unavailable" presence sequence
574 from this contact to the user, ie, "this contact has removed you from
575 their 'friends'".
576 """
577 for ptype in "unsubscribe", "unsubscribed", "unavailable":
578 self.xmpp.send_presence(pfrom=self.jid, pto=self.user_jid.bare, ptype=ptype) # type: ignore
580 async def update_info(self):
581 """
582 Fetch information about this contact from the legacy network
584 This is awaited on Contact instantiation, and should be overridden to
585 update the nickname, avatar, vcard [...] of this contact, by making
586 "legacy API calls".
588 To take advantage of the slidge avatar cache, you can check the .avatar
589 property to retrieve the "legacy file ID" of the cached avatar. If there
590 is no change, you should not call
591 :py:meth:`slidge.core.mixins.avatar.AvatarMixin.set_avatar` or attempt
592 to modify the ``.avatar`` property.
593 """
594 pass
596 async def fetch_vcard(self):
597 """
598 It the legacy network doesn't like that you fetch too many profiles on startup,
599 it's also possible to fetch it here, which will be called when XMPP clients
600 of the user request the vcard, if it hasn't been fetched before
601 :return:
602 """
603 pass
605 def _make_presence(
606 self,
607 *,
608 last_seen: Optional[datetime.datetime] = None,
609 status_codes: Optional[set[int]] = None,
610 user_full_jid: Optional[JID] = None,
611 **presence_kwargs,
612 ):
613 p = super()._make_presence(last_seen=last_seen, **presence_kwargs)
614 caps = self.xmpp.plugin["xep_0115"]
615 if p.get_from().resource and self._caps_ver:
616 p["caps"]["node"] = caps.caps_node
617 p["caps"]["hash"] = caps.hash
618 p["caps"]["ver"] = self._caps_ver
619 return p
621 @classmethod
622 def from_store(cls, session, stored: Contact, *args, **kwargs) -> Self:
623 contact = cls(
624 session,
625 cls.xmpp.LEGACY_CONTACT_ID_TYPE(stored.legacy_id),
626 stored.jid.username, # type: ignore
627 *args, # type: ignore
628 **kwargs, # type: ignore
629 )
630 contact.contact_pk = stored.id
631 contact._name = stored.nick
632 contact._is_friend = stored.is_friend
633 contact._added_to_roster = stored.added_to_roster
634 if (data := stored.extra_attributes) is not None:
635 contact.deserialize_extra_attributes(data)
636 contact._caps_ver = stored.caps_ver
637 contact._set_logger_name()
638 contact._AvatarMixin__avatar_unique_id = ( # type:ignore
639 None
640 if stored.avatar_legacy_id is None
641 else session.xmpp.AVATAR_ID_TYPE(stored.avatar_legacy_id)
642 )
643 contact._avatar_pk = stored.avatar_id
644 contact._vcard = stored.vcard
645 contact._vcard_fetched = stored.vcard_fetched
646 contact._client_type = stored.client_type
647 return contact
650def is_markable(stanza: Union[Message, Presence]):
651 if isinstance(stanza, Presence):
652 return False
653 return bool(stanza["body"])
656log = logging.getLogger(__name__)