Source code for slidge.core.contact.contact
import logging
import warnings
from datetime import date
from typing import TYPE_CHECKING, Generic, Iterable, Optional, Union
from slixmpp import JID, Message, Presence
from slixmpp.exceptions import IqError
from ...util import SubclassableOnce
from ...util.types import AvatarType, LegacyUserIdType
from ...util.xep_0292.stanza import VCard4
from .. import config
from ..mixins import FullCarbonMixin
from ..mixins.recipient import ReactionRecipientMixin, ThreadRecipientMixin
if TYPE_CHECKING:
from ..muc import LegacyParticipant
from ..session import BaseSession
[docs]class LegacyContact(
Generic[LegacyUserIdType],
FullCarbonMixin,
ReactionRecipientMixin,
ThreadRecipientMixin,
metaclass=SubclassableOnce,
):
"""
This class centralizes actions in relation to a specific legacy contact.
You shouldn't create instances of contacts manually, but rather rely on
:meth:`.LegacyRoster.by_legacy_id` to ensure that contact instances are
singletons. The :class:`.LegacyRoster` instance of a session is accessible
through the :attr:`.BaseSession.contacts` attribute.
Typically, your plugin should have methods hook to the legacy events and
call appropriate methods here to transmit the "legacy action" to the xmpp
user. This should look like this:
.. code-block:: python
class Session(BaseSession):
...
async def on_cool_chat_network_new_text_message(self, legacy_msg_event):
contact = self.contacts.by_legacy_id(legacy_msg_event.from)
contact.send_text(legacy_msg_event.text)
async def on_cool_chat_network_new_typing_event(self, legacy_typing_event):
contact = self.contacts.by_legacy_id(legacy_typing_event.from)
contact.composing()
...
Use ``carbon=True`` as a keyword arg for methods to represent an action FROM
the user TO the contact, typically when the user uses an official client to
do an action such as sending a message or marking a message as read.
This will use :xep:`0356` to impersonate the XMPP user in order to do so.
"""
"""
A full JID, including a resource part is required for chat states (and maybe other stuff)
to work properly. This is the name of the resource the contacts will use.
"""
def __init__(
    self,
    session: "BaseSession",
    legacy_id: LegacyUserIdType,
    jid_username: str,
):
    """
    Set up the state of a single legacy contact.

    :param session: The session this contact is part of
    :param legacy_id: The contact's legacy ID
    :param jid_username: User part of this contact's 'puppet' JID.
        NB: case-insensitive, and some special characters are not allowed
    """
    super().__init__()
    self.session = session
    self.user = session.user
    # Bind the gateway before anything reads ``self.xmpp``: the
    # MARK_ALL_MESSAGES check below depends on it.
    self.xmpp = session.xmpp
    self.legacy_id = legacy_id
    self.jid_username = jid_username

    self._name: Optional[str] = None
    # Either the avatar value itself, or True when raw bytes were set
    # (see set_avatar), or None when nothing has been set yet.
    self._avatar: Optional[Union[AvatarType, bool]] = None

    if self.xmpp.MARK_ALL_MESSAGES:
        # XMPP ids of the markable stanzas sent by this contact, in the
        # order they were sent. Only exists when the gateway asks for it.
        self._sent_order = list[str]()

    # The puppet JID lives on the gateway's domain and always carries a
    # resource part.
    self.jid = JID(self.jid_username + "@" + self.xmpp.boundjid.bare)
    self.jid.resource = self.RESOURCE
    self.log = logging.getLogger(f"{self.user.bare_jid}:{self.jid.bare}")
    self.participants = set["LegacyParticipant"]()
    self.is_friend = False
    self.__added_to_roster = False
def _send(
    self, stanza: Union[Message, Presence], carbon=False, nick=False, **send_kwargs
):
    """
    Deliver a stanza on behalf of this contact.

    :param stanza: The message or presence to deliver
    :param carbon: If true and ``stanza`` is a message, address it from the
        user to this contact's bare JID and hand it to ``_privileged_send``
        instead of sending it to the user
    :param nick: If true, attach this contact's name as a :xep:`0172` nick
        (this is also done whenever the contact is not a friend)
    :param send_kwargs: Accepted but not used by this method
    """
    if carbon and isinstance(stanza, Message):
        stanza["to"] = self.jid.bare
        stanza["from"] = self.user.jid
        self._privileged_send(stanza)
        return

    # Non-friends only ever get the subscription handshake presences.
    if (
        isinstance(stanza, Presence)
        and not self.is_friend
        and stanza["type"] not in ("subscribe", "unsubscribed")
    ):
        return

    if self.name and (nick or not self.is_friend):
        nick_element = self.xmpp.plugin["xep_0172"].stanza.UserNick()
        nick_element["nick"] = self.name
        stanza.append(nick_element)

    # Remember the id so that earlier messages can be marked later on.
    if self.xmpp.MARK_ALL_MESSAGES and is_markable(stanza):
        self._sent_order.append(stanza["id"])

    stanza["to"] = self.user.jid
    stanza.send()
def get_msg_xmpp_id_up_to(self, horizon_xmpp_id: str) -> Optional[list[str]]:
    """
    Return XMPP msg ids sent by this contact up to a given XMPP msg id.

    Plugins have no reason to use this, but it is used by slidge core
    for legacy networks that need to mark all messages as read (most XMPP
    clients only send a read marker for the latest message).

    This has side effects: if the horizon XMPP id is found, the ids up to
    and including this horizon are removed from the internal list, to avoid
    sending the same read mark twice. If it is not found, the list is left
    untouched.

    :param horizon_xmpp_id: The latest message
    :return: A list of XMPP ids (in sending order, horizon included), or
        None if horizon_xmpp_id was not found
    """
    sent = self._sent_order
    try:
        # +1 so that the horizon itself is part of the returned slice
        cut = sent.index(horizon_xmpp_id) + 1
    except ValueError:
        return None
    self._sent_order = sent[cut:]
    return sent[:cut]
@property
def name(self):
    """
    The contact's display name, shown in the user's roster.
    """
    return self._name

@name.setter
def name(self, n: Optional[str]):
    """
    Change the display name and propagate it.

    Does nothing when the name is unchanged. Otherwise every participant
    in ``self.participants`` gets the new nickname and the nick is
    published through the gateway's pubsub, restricted to the user's
    bare JID.
    """
    if self._name != n:
        for participant in self.participants:
            participant.nickname = n
        self._name = n
        pubsub = self.xmpp.pubsub
        pubsub.set_nick(jid=self.jid.bare, nick=n, restrict_to=self.user.jid.bare)
@property
def avatar(self):
    """
    What is currently recorded as this contact's avatar.

    This is the value stored by :meth:`set_avatar`: ``True`` when the avatar
    was given as raw bytes, the value that was passed in otherwise, and
    ``None`` when no avatar has been set.
    """
    return self._avatar

@avatar.setter
def avatar(self, a: Optional[AvatarType]):
    """
    Set the avatar. self.set_avatar() should be preferred because you can provide
    a unique ID for the avatar, to help caching.
    """
    # A property setter cannot be awaited, so the coroutine is scheduled on
    # the gateway's event loop instead.
    self.xmpp.loop.create_task(self.set_avatar(a))
async def set_avatar(
    self,
    a: Optional[AvatarType],
    avatar_unique_id: Optional[Union[int, str]] = None,
    blocking=False,
):
    """
    Publish a new avatar for this contact.

    :param a: Any avatar format supported by slidge
    :param avatar_unique_id: If possible, provide a unique ID to cache the avatar.
        If it is not provided, the SHA-1 of the avatar will be used,
        unless it is an HTTP url. In this case, the url will be used,
        along with etag or last modified HTTP headers, to avoid fetching
        uselessly. Beware of legacy plugin where URLs are not stable.
    :param blocking: if True, will await setting the avatar, if False, launch in a task
    :return:
    """
    publication = self.xmpp.pubsub.set_avatar(
        jid=self.jid.bare,
        avatar=a,
        unique_id=avatar_unique_id,
        restrict_to=self.user.jid.bare,
    )
    if not blocking:
        self.xmpp.loop.create_task(publication)
    else:
        await publication
    # Raw bytes are not kept in RAM: a plain True records that an avatar
    # exists. Any other value is stored as given.
    self._avatar = True if isinstance(a, bytes) else a
def get_avatar(self):
    """
    Look up this contact's avatar through the gateway's pubsub.

    :return: Whatever ``pubsub.get_avatar`` returns for the contact's bare
        JID, or None when no avatar has been recorded for this contact
    """
    return self.xmpp.pubsub.get_avatar(jid=self.jid.bare) if self._avatar else None
def set_vcard(
    self,
    /,
    full_name: Optional[str] = None,
    given: Optional[str] = None,
    surname: Optional[str] = None,
    birthday: Optional[date] = None,
    phone: Optional[str] = None,
    phones: Iterable[str] = (),
    note: Optional[str] = None,
    url: Optional[str] = None,
    email: Optional[str] = None,
    country: Optional[str] = None,
    locality: Optional[str] = None,
):
    """
    Build a vcard for this contact and store it through the gateway.

    The vcard always carries the contact's bare JID as an ``xmpp:`` IMPP
    entry. Every other field is only filled in when a truthy value is given.

    :param full_name: Full name; falls back to the contact's name when empty
    :param given: Given name
    :param surname: Surname
    :param birthday: Birthday
    :param phone: A single phone number
    :param phones: Additional phone numbers, added after ``phone``
    :param note: Free-text note
    :param url: URL
    :param email: Email address
    :param country: Country; an address is only added when this is given
    :param locality: Locality, passed along with ``country``
    """
    vcard = VCard4()
    vcard.add_impp(f"xmpp:{self.jid.bare}")

    if n := self.name:
        vcard.add_nickname(n)
    if full_name:
        vcard["full_name"] = full_name
    elif n:
        vcard["full_name"] = n

    if given:
        vcard["given"] = given
    if surname:
        vcard["surname"] = surname
    if birthday:
        vcard["birthday"] = birthday

    if note:
        vcard.add_note(note)
    if url:
        vcard.add_url(url)
    if email:
        vcard.add_email(email)
    if phone:
        vcard.add_tel(phone)
    for p in phones:
        vcard.add_tel(p)
    # A locality without a country is ignored; with a country, the
    # locality is forwarded whether it is set or not.
    if country:
        vcard.add_address(country, locality)

    # NOTE(review): the third argument is presumably the set of JIDs allowed
    # to see this vcard - confirm against the vcard component.
    self.xmpp.vcard.set_vcard(self.jid.bare, vcard, {self.user.jid.bare})
async def add_to_roster(self, force=False):
    """
    Add this contact to the user roster using :xep:`0356`

    Nothing happens when the contact was already added (unless ``force``)
    or when roster pushes are disabled in the config. If slidge lacks the
    privilege to edit the roster, a warning is emitted and, when the
    matching config option is set, a friend request is sent instead.

    :param force: add even if the contact was already added successfully
    """
    if self.__added_to_roster and not force:
        return

    if config.NO_ROSTER_PUSH:
        # Use the contact's own logger, like the rest of this class.
        self.log.debug("Roster push request by plugin ignored (--no-roster-push)")
        return

    item = {
        "subscription": self.__get_subscription_string(),
        "groups": [self.xmpp.ROSTER_GROUP],
    }
    if (n := self.name) is not None:
        item["name"] = n
    kw = dict(
        jid=self.user.jid,
        roster_items={self.jid.bare: item},
    )

    try:
        await self._set_roster(**kw)
    except PermissionError:
        warnings.warn(
            "Slidge does not have privileges to add contacts to the roster. Refer"
            " to https://slidge.readthedocs.io/en/latest/admin/xmpp_server.html for"
            " more info."
        )
        if config.ROSTER_PUSH_PRESENCE_SUBSCRIPTION_REQUEST_FALLBACK:
            self.send_friend_request(
                f"I'm already your friend on {self.xmpp.COMPONENT_TYPE}, but "
                "slidge is not allowed to manage your roster."
            )
        return
    except IqError as e:
        # Best effort: the contact stays flagged as not added, so a later
        # call will try again.
        self.log.warning("Could not add to roster", exc_info=e)
    else:
        # we only broadcast pubsub events for contacts added to the roster
        # so if something was set before, we need to push it now
        self.__added_to_roster = True
        self.xmpp.loop.create_task(self.__broadcast_pubsub_items())
        self.send_last_presence()
async def __broadcast_pubsub_items(self):
    """
    Await the gateway pubsub's ``broadcast_all`` for this contact's bare
    JID, with the user's JID as second argument.
    """
    contact_bare_jid = JID(self.jid.bare)
    await self.xmpp.pubsub.broadcast_all(contact_bare_jid, self.user.jid)
async def _set_roster(self, **kw):
    """
    Forward ``kw`` to the ``set_roster`` of the ``xep_0356`` plugin.

    If that raises PermissionError, the same call is retried once on the
    ``xep_0356_old`` plugin; an error from that second call is not caught.

    :return: Whatever the plugin's ``set_roster`` returned
    """
    try:
        outcome = await self.xmpp["xep_0356"].set_roster(**kw)
    except PermissionError:
        outcome = await self.xmpp["xep_0356_old"].set_roster(**kw)
    return outcome
def send_friend_request(self, text: Optional[str] = None):
    """
    Send a "subscribe" presence from this contact, with its nick attached.

    :param text: Optional status text carried by the presence
    """
    self._send(self._make_presence(ptype="subscribe", pstatus=text), nick=True)
async def accept_friend_request(self, text: Optional[str] = None):
    """
    Signal that this contact agreed to be a friend of the user.

    Flags the contact as a friend, sends a "subscribed" presence followed
    by the last known presence, then broadcasts the contact's pubsub items.

    :param text: Optional message from the friend to the user
    """
    # The flag must be set first: _send drops most presences of non-friends.
    self.is_friend = True
    self.log.debug("Accepting friend request")
    self._send(self._make_presence(ptype="subscribed", pstatus=text), nick=True)
    self.send_last_presence()
    await self.__broadcast_pubsub_items()
    self.log.debug("Accepted friend request")
def reject_friend_request(self, text: Optional[str] = None):
    """
    Signal that this contact declined to be a contact of the user, or no
    longer wants to be friends.

    :param text: Optional message from the non-friend to the user
    """
    refusal = self._make_presence(ptype="unsubscribed", pstatus=text)
    # Go offline while still flagged as a friend, then send the refusal.
    self.offline()
    self._send(refusal, nick=True)
    self.is_friend = False
async def on_friend_request(self, text=""):
    """
    Hook for a "subscribe" presence sent by the user to this contact, ie,
    "I would like to add you to my contacts/friends".

    In XMPP terms: "I would like to receive your presence updates"

    This is only called if self.is_friend = False. If self.is_friend = True,
    slidge will automatically "accept the friend request", ie, reply with
    a "subscribed" presence.

    An override should send a 'friend request event' to the legacy
    service and, once the contact responds, call either
    self.accept_friend_request() or self.reject_friend_request().

    The default implementation does nothing.
    """
async def on_friend_delete(self, text=""):
    """
    Hook for an "unsubscribed" presence sent by the user to this contact,
    ie, "I would like to remove you from my contacts/friends" or "I refuse
    your friend request".

    In XMPP terms: "You won't receive my presence updates anymore (or you
    never have)".

    The default implementation does nothing.
    """
async def on_friend_accept(self):
    """
    Hook for a "subscribed" presence sent by the user to this contact, ie,
    "I accept to be your friend" or "I confirm that you are my friend".

    In XMPP terms: "You will receive my presence updates".

    The default implementation does nothing.
    """
def unsubscribe(self):
    """
    (internal use by slidge)

    Tell the user that this contact removed them from their 'friends', by
    sending, in this order, an "unsubscribe", an "unsubscribed" and an
    "unavailable" presence from this contact to the user's bare JID.
    """
    sequence = ("unsubscribe", "unsubscribed", "unavailable")
    for presence_type in sequence:
        self.xmpp.send_presence(pfrom=self.jid, pto=self.user.jid.bare, ptype=presence_type)  # type: ignore
async def update_info(self):
    """
    Fetch information about this contact from the legacy network.

    This is awaited on Contact instantiation. Override it to update the
    nickname, avatar, vcard [..] of this contact by making "legacy API
    calls".

    The default implementation does nothing.
    """
async def fetch_vcard(self):
    """
    Alternative place to fetch this contact's profile.

    If the legacy network doesn't like that you fetch too many profiles on
    startup, the vcard can be fetched here instead: this is called when XMPP
    clients of the user request the vcard, if it hasn't been fetched before.

    The default implementation does nothing.

    :return:
    """
def is_markable(stanza: Union[Message, Presence]):
    """
    Tell whether a stanza is one whose id should be remembered for marking.

    :param stanza: The stanza about to be sent
    :return: False for any presence; otherwise True if and only if the
        stanza's ``body`` is truthy
    """
    return not isinstance(stanza, Presence) and bool(stanza["body"])