# Source code for the legacy contact module (defines ``LegacyContact``).

import logging
import warnings
from datetime import date
from typing import TYPE_CHECKING, Generic, Iterable, Optional, Union

from slixmpp import JID, Message, Presence
from slixmpp.exceptions import IqError

from ...util import SubclassableOnce
from ...util.types import AvatarType, LegacyUserIdType
from ...util.xep_0292.stanza import VCard4
from .. import config
from ..mixins import FullCarbonMixin
from ..mixins.recipient import ReactionRecipientMixin, ThreadRecipientMixin

    from ..muc import LegacyParticipant
    from ..session import BaseSession

class LegacyContact(
    Generic[LegacyUserIdType],
    FullCarbonMixin,
    ReactionRecipientMixin,
    ThreadRecipientMixin,
    metaclass=SubclassableOnce,
):
    """
    This class centralizes actions in relation to a specific legacy contact.

    You shouldn't create instances of contacts manually, but rather rely on
    :meth:`.LegacyRoster.by_legacy_id` to ensure that contact instances are
    singletons. The :class:`.LegacyRoster` instance of a session is accessible
    through the :attr:`.BaseSession.contacts` attribute.

    Typically, your plugin should have methods hooked to the legacy events and
    call appropriate methods here to transmit the "legacy action" to the xmpp
    user. This should look like this:

    .. code-block:: python

        class Session(BaseSession):
            ...

            async def on_cool_chat_network_new_text_message(self, legacy_msg_event):
                contact = self.contacts.by_legacy_id(legacy_msg_event.sender)
                contact.send_text(legacy_msg_event.text)

            async def on_cool_chat_network_new_typing_event(self, legacy_typing_event):
                contact = self.contacts.by_legacy_id(legacy_typing_event.sender)
                contact.composing()

            ...

    Use ``carbon=True`` as a keyword arg for methods to represent an action FROM
    the user TO the contact, typically when the user uses an official client to
    do an action such as sending a message or marking a message as read.
    This will use :xep:`0356` to impersonate the XMPP user.
    """

    session: "BaseSession"

    RESOURCE: str = "slidge"
    """
    A full JID, including a resource part is required for chat states (and maybe
    other stuff) to work properly. This is the name of the resource the contacts
    will use.
    """

    mtype = "chat"
    _can_send_carbon = True
    is_group = False

    def __init__(
        self,
        session: "BaseSession",
        legacy_id: LegacyUserIdType,
        jid_username: str,
    ):
        """
        :param session: The session this contact is part of
        :param legacy_id: The contact's legacy ID
        :param jid_username: User part of this contact's 'puppet' JID.
            NB: case-insensitive, and some special characters are not allowed
        """
        super().__init__()
        self.session = session
        self.user = session.user
        # Bind the gateway before anything below reads ``self.xmpp``.
        self.xmpp = session.xmpp

        self.legacy_id = legacy_id
        self.jid_username = jid_username

        self._name: Optional[str] = None
        self._avatar: Optional[Union[AvatarType, bool]] = None

        # XMPP ids of markable messages sent by this contact, oldest first.
        # Always created (it is only filled when ``MARK_ALL_MESSAGES`` is set)
        # so that get_msg_xmpp_id_up_to() never hits a missing attribute.
        self._sent_order: list[str] = []

        self.jid = JID(self.jid_username + "@" + self.xmpp.boundjid.bare)
        self.jid.resource = self.RESOURCE
        self.log = logging.getLogger(f"{self.user.bare_jid}:{self.jid.bare}")
        self.participants = set["LegacyParticipant"]()
        self.is_friend = False
        self.__added_to_roster = False

    def __repr__(self):
        """Short debug representation showing the legacy ID and the bare JID."""
        return f"<Contact '{self.legacy_id}'/'{self.jid.bare}'>"

    def __get_subscription_string(self):
        """
        Roster subscription state for this contact.

        :return: ``"both"`` if the contact is a friend, ``"none"`` otherwise
        """
        return "both" if self.is_friend else "none"

    def _send(
        self, stanza: Union[Message, Presence], carbon=False, nick=False, **send_kwargs
    ):
        """
        Send a stanza on behalf of this contact.

        :param stanza: The message or presence to send
        :param carbon: If True and ``stanza`` is a message, send it FROM the
            user TO this contact through ``_privileged_send`` instead
        :param nick: If True, attach this contact's name as a :xep:`0172` nick
            (always attached when the contact is not a friend)
        :param send_kwargs: Accepted for signature compatibility; not used here
        """
        if carbon and isinstance(stanza, Message):
            stanza["to"] = self.jid.bare
            stanza["from"] = self.user.jid
            self._privileged_send(stanza)
            return

        # Non-friends only get subscription-related presences.
        if (
            isinstance(stanza, Presence)
            and not self.is_friend
            and stanza["type"] not in ("subscribe", "unsubscribed")
        ):
            return

        nickname = self.name
        if nickname and (nick or not self.is_friend):
            nick_element = self.xmpp.plugin["xep_0172"].stanza.UserNick()
            nick_element["nick"] = nickname
            stanza.append(nick_element)

        # Remember the order of sent messages, see get_msg_xmpp_id_up_to()
        if self.xmpp.MARK_ALL_MESSAGES and is_markable(stanza):
            self._sent_order.append(stanza["id"])

        stanza["to"] = self.user.jid
        stanza.send()

    def get_msg_xmpp_id_up_to(self, horizon_xmpp_id: str) -> Optional[list[str]]:
        """
        Return XMPP msg ids sent by this contact up to a given XMPP msg id.

        Plugins have no reason to use this, but it is used by slidge core for
        legacy networks that need to mark all messages as read (most XMPP
        clients only send a read marker for the latest message).

        This has side effects: if the horizon XMPP id is found, messages up to
        and including this horizon are cleared from the internal record, to
        avoid sending the same read mark twice.

        :param horizon_xmpp_id: The latest message
        :return: A list of XMPP ids or None if horizon_xmpp_id was not found
        """
        try:
            # +1 so that the horizon itself is part of the returned slice
            cut = self._sent_order.index(horizon_xmpp_id) + 1
        except ValueError:
            return None
        up_to_horizon = self._sent_order[:cut]
        self._sent_order = self._sent_order[cut:]
        return up_to_horizon

    @property
    def name(self):
        """
        Friendly name of the contact, as it should appear in the user's roster
        """
        return self._name

    @name.setter
    def name(self, n: Optional[str]):
        # Nothing to propagate if the name did not change.
        if self._name == n:
            return
        for p in self.participants:
            p.nickname = n
        self._name = n
        self.xmpp.pubsub.set_nick(
            jid=self.jid.bare, nick=n, restrict_to=self.user.jid.bare
        )

    @property
    def avatar(self):
        """
        An image that represents this contact
        """
        return self._avatar

    @avatar.setter
    def avatar(self, a: Optional[AvatarType]):
        """
        Set the avatar. self.set_avatar() should be preferred because you can
        provide a unique ID for the avatar, to help caching.
        """
        self.xmpp.loop.create_task(self.set_avatar(a))

    async def set_avatar(
        self,
        a: Optional[AvatarType],
        avatar_unique_id: Optional[Union[int, str]] = None,
        blocking=False,
    ):
        """
        Set the avatar for this contact

        :param a: Any avatar format supported by slidge
        :param avatar_unique_id: If possible, provide a unique ID to cache the
            avatar. If it is not provided, the SHA-1 of the avatar will be used,
            unless it is an HTTP url. In this case, the url will be used, along
            with etag or last modified HTTP headers, to avoid fetching
            uselessly. Beware of legacy plugins where URLs are not stable.
        :param blocking: if True, will await setting the avatar, if False,
            launch in a task
        """
        awaitable = self.xmpp.pubsub.set_avatar(
            jid=self.jid.bare,
            avatar=a,
            unique_id=avatar_unique_id,
            restrict_to=self.user.jid.bare,
        )
        if blocking:
            await awaitable
        else:
            self.xmpp.loop.create_task(awaitable)
        # if it's bytes, we don't want to cache it in RAM, so just a bool to
        # know it has been set
        self._avatar = isinstance(a, bytes) or a

    def get_avatar(self):
        """
        Fetch this contact's avatar from the pubsub component.

        :return: ``None`` if no avatar has been set, otherwise whatever
            ``pubsub.get_avatar`` returns for this contact's bare JID
        """
        if not self._avatar:
            return None
        return self.xmpp.pubsub.get_avatar(jid=self.jid.bare)

    def set_vcard(
        self,
        /,
        full_name: Optional[str] = None,
        given: Optional[str] = None,
        surname: Optional[str] = None,
        birthday: Optional[date] = None,
        phone: Optional[str] = None,
        phones: Iterable[str] = (),
        note: Optional[str] = None,
        url: Optional[str] = None,
        email: Optional[str] = None,
        country: Optional[str] = None,
        locality: Optional[str] = None,
    ):
        """
        Build a vCard for this contact and publish it, restricted to the user.

        The contact's XMPP address is always included, and its name (if any)
        is used as nickname and as fallback for ``full_name``.

        :param full_name: Full name; defaults to the contact's name
        :param given: Given name
        :param surname: Family name
        :param birthday: Date of birth
        :param phone: A phone number
        :param phones: Additional phone numbers
        :param note: Free-form note
        :param url: A URL associated with the contact
        :param email: An email address
        :param country: Country of the address; an address is only added when
            this is set
        :param locality: Locality of the address (ignored without ``country``)
        """
        vcard = VCard4()
        vcard.add_impp(f"xmpp:{self.jid.bare}")

        if n := self.name:
            vcard.add_nickname(n)
        if full_name:
            vcard["full_name"] = full_name
        elif n:
            vcard["full_name"] = n

        if given:
            vcard["given"] = given
        if surname:
            vcard["surname"] = surname
        if birthday:
            vcard["birthday"] = birthday

        if note:
            vcard.add_note(note)
        if url:
            vcard.add_url(url)
        if email:
            vcard.add_email(email)
        if phone:
            vcard.add_tel(phone)
        for p in phones:
            vcard.add_tel(p)
        if country:
            vcard.add_address(country, locality)

        self.xmpp.vcard.set_vcard(self.jid.bare, vcard, {self.user.jid.bare})

    async def add_to_roster(self, force=False):
        """
        Add this contact to the user roster using :xep:`0356`

        :param force: add even if the contact was already added successfully
        """
        if self.__added_to_roster and not force:
            return
        if config.NO_ROSTER_PUSH:
            log.debug("Roster push request by plugin ignored (--no-roster-push)")
            return

        item = {
            "subscription": self.__get_subscription_string(),
            "groups": [self.xmpp.ROSTER_GROUP],
        }
        if (n := self.name) is not None:
            item["name"] = n

        try:
            await self._set_roster(
                jid=self.user.jid,
                roster_items={self.jid.bare: item},
            )
        except PermissionError:
            # NOTE(review): the reference target was missing from this message
            # in the reviewed source; point it at the exact documentation page.
            warnings.warn(
                "Slidge does not have privileges to add contacts to the roster. Refer"
                " to the slidge admin documentation for"
                " more info."
            )
            if config.ROSTER_PUSH_PRESENCE_SUBSCRIPTION_REQUEST_FALLBACK:
                self.send_friend_request(
                    f"I'm already your friend on {self.xmpp.COMPONENT_TYPE}, but "
                    "slidge is not allowed to manage your roster."
                )
            return
        except IqError as e:
            self.log.warning("Could not add to roster", exc_info=e)
        else:
            # we only broadcast pubsub events for contacts added to the roster
            # so if something was set before, we need to push it now
            self.__added_to_roster = True
            self.xmpp.loop.create_task(self.__broadcast_pubsub_items())
            self.send_last_presence()

    async def __broadcast_pubsub_items(self):
        """Ask pubsub to broadcast everything about this contact to the user."""
        await self.xmpp.pubsub.broadcast_all(JID(self.jid.bare), self.user.jid)

    async def _set_roster(self, **kw):
        """
        Set roster items through the ``xep_0356`` plugin, retrying with the
        ``xep_0356_old`` plugin if the first attempt raises ``PermissionError``.

        :param kw: Keyword arguments forwarded to the plugin's ``set_roster``
        :return: Whatever the plugin's ``set_roster`` returns
        :raises PermissionError: if the fallback plugin is refused as well
        """
        try:
            return await self.xmpp["xep_0356"].set_roster(**kw)
        except PermissionError:
            return await self.xmpp["xep_0356_old"].set_roster(**kw)

    def send_friend_request(self, text: Optional[str] = None):
        """
        Send a "subscribe" presence from this contact to the user.

        :param text: Optional status text attached to the request
        """
        presence = self._make_presence(ptype="subscribe", pstatus=text)
        self._send(presence, nick=True)

    async def accept_friend_request(self, text: Optional[str] = None):
        """
        Call this to signify that this Contact has accepted to be a friend
        of the user.

        :param text: Optional message from the friend to the user
        """
        self.is_friend = True
        self.log.debug("Accepting friend request")
        presence = self._make_presence(ptype="subscribed", pstatus=text)
        self._send(presence, nick=True)
        self.send_last_presence()
        await self.__broadcast_pubsub_items()
        self.log.debug("Accepted friend request")

    def reject_friend_request(self, text: Optional[str] = None):
        """
        Call this to signify that this Contact has refused to be a contact
        of the user (or that they don't want to be friends anymore)

        :param text: Optional message from the non-friend to the user
        """
        presence = self._make_presence(ptype="unsubscribed", pstatus=text)
        # ``is_friend`` is cleared last; presumably so that the offline
        # presence is not dropped by the non-friend filter in _send().
        self.offline()
        self._send(presence, nick=True)
        self.is_friend = False

    async def on_friend_request(self, text=""):
        """
        Called when receiving a "subscribe" presence, ie, "I would like to add
        you to my contacts/friends", from the user to this contact.

        In XMPP terms: "I would like to receive your presence updates"

        This is only called if self.is_friend = False. If self.is_friend = True,
        slidge will automatically "accept the friend request", ie, reply with
        a "subscribed" presence.

        When called, a 'friend request event' should be sent to the legacy
        service, and when the contact responds, you should either call
        self.accept_friend_request() or self.reject_friend_request()
        """
        pass

    async def on_friend_delete(self, text=""):
        """
        Called when receiving an "unsubscribed" presence, ie, "I would like to
        remove you from my contacts/friends" or "I refuse your friend request"
        from the user to this contact.

        In XMPP terms: "You won't receive my presence updates anymore (or you
        never have)".
        """
        pass

    async def on_friend_accept(self):
        """
        Called when receiving a "subscribed" presence, ie, "I accept to be
        your/confirm that you are my friend" from the user to this contact.

        In XMPP terms: "You will receive my presence updates".
        """
        pass

    def unsubscribe(self):
        """
        (internal use by slidge)

        Send an "unsubscribe", "unsubscribed", "unavailable" presence sequence
        from this contact to the user, ie, "this contact has removed you from
        their 'friends'".
        """
        for ptype in "unsubscribe", "unsubscribed", "unavailable":
            self.xmpp.send_presence(pfrom=self.jid, pto=self.user.jid.bare, ptype=ptype)  # type: ignore

    async def update_info(self):
        """
        Fetch information about this contact from the legacy network

        This is awaited on Contact instantiation, and should be overridden to
        update the nickname, avatar, vcard [..] of this contact, by making
        "legacy API calls".
        """
        pass

    async def fetch_vcard(self):
        """
        If the legacy network doesn't like that you fetch too many profiles on
        startup, it's also possible to fetch it here, which will be called when
        XMPP clients of the user request the vcard, if it hasn't been fetched
        before
        """
        pass
def is_markable(stanza: Union[Message, Presence]):
    """
    Tell whether a stanza counts as markable.

    :param stanza: The stanza to examine
    :return: ``False`` for any presence; for other stanzas, whether the
        ``body`` entry of the stanza is truthy
    """
    return not isinstance(stanza, Presence) and bool(stanza["body"])
# Module-level logger, named after this module.
log = logging.getLogger(__name__)