Coverage for slidge / contact / contact.py: 86%
282 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-02-15 09:02 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-02-15 09:02 +0000
1import datetime
2import logging
3import warnings
4from collections.abc import Iterable, Iterator
5from datetime import date
6from typing import TYPE_CHECKING, Generic
7from xml.etree import ElementTree as ET
9from slixmpp import JID, Message, Presence
10from slixmpp.exceptions import IqError, IqTimeout
11from slixmpp.plugins.xep_0292.stanza import VCard4
12from slixmpp.types import MessageTypes
14from ..core.mixins import AvatarMixin, FullCarbonMixin
15from ..core.mixins.disco import ContactAccountDiscoMixin
16from ..core.mixins.recipient import ReactionRecipientMixin, ThreadRecipientMixin
17from ..db.models import Contact, ContactSent
18from ..util import SubclassableOnce
19from ..util.types import ClientType, LegacyUserIdType, MessageOrPresenceTypeVar
21if TYPE_CHECKING:
22 from ..core.session import BaseSession
23 from ..group.participant import LegacyParticipant
26class LegacyContact(
27 Generic[LegacyUserIdType],
28 AvatarMixin,
29 ContactAccountDiscoMixin,
30 FullCarbonMixin,
31 ReactionRecipientMixin,
32 ThreadRecipientMixin,
33 metaclass=SubclassableOnce,
34):
35 """
36 This class centralizes actions in relation to a specific legacy contact.
38 You shouldn't create instances of contacts manually, but rather rely on
39 :meth:`.LegacyRoster.by_legacy_id` to ensure that contact instances are
40 singletons. The :class:`.LegacyRoster` instance of a session is accessible
41 through the :attr:`.BaseSession.contacts` attribute.
43 Typically, your plugin should have methods hook to the legacy events and
44 call appropriate methods here to transmit the "legacy action" to the xmpp
45 user. This should look like this:
47 .. code-block:python
49 class Session(BaseSession):
50 ...
52 async def on_cool_chat_network_new_text_message(self, legacy_msg_event):
53 contact = self.contacts.by_legacy_id(legacy_msg_event.from)
54 contact.send_text(legacy_msg_event.text)
56 async def on_cool_chat_network_new_typing_event(self, legacy_typing_event):
57 contact = self.contacts.by_legacy_id(legacy_msg_event.from)
58 contact.composing()
59 ...
61 Use ``carbon=True`` as a keyword arg for methods to represent an action FROM
62 the user TO the contact, typically when the user uses an official client to
63 do an action such as sending a message or marking as message as read.
64 This will use :xep:`0363` to impersonate the XMPP user in order.
65 """
67 session: "BaseSession"
69 RESOURCE: str = "slidge"
70 """
71 A full JID, including a resource part is required for chat states (and maybe other stuff)
72 to work properly. This is the name of the resource the contacts will use.
73 """
74 PROPAGATE_PRESENCE_TO_GROUPS = True
76 mtype: MessageTypes = "chat"
77 _can_send_carbon = True
78 is_participant = False
79 is_group = False
81 _ONLY_SEND_PRESENCE_CHANGES = True
83 STRIP_SHORT_DELAY = True
84 _NON_FRIEND_PRESENCES_FILTER = {"subscribe", "unsubscribed"}
86 INVITATION_RECIPIENT = True
88 stored: Contact
89 model: Contact
91 def __init__(self, session: "BaseSession", stored: Contact) -> None:
92 self.session = session
93 self.xmpp = session.xmpp
94 self.stored = stored
95 self._set_logger()
96 super().__init__()
98 @property
99 def jid(self): # type:ignore[override]
100 jid = JID(self.stored.jid)
101 jid.resource = self.RESOURCE
102 return jid
104 @property
105 def legacy_id(self):
106 return self.xmpp.LEGACY_CONTACT_ID_TYPE(self.stored.legacy_id)
108 async def get_vcard(self, fetch: bool = True) -> VCard4 | None:
109 if fetch and not self.stored.vcard_fetched:
110 await self.fetch_vcard()
111 if self.stored.vcard is None:
112 return None
114 return VCard4(xml=ET.fromstring(self.stored.vcard))
116 @property
117 def is_friend(self) -> bool:
118 return self.stored.is_friend
120 @is_friend.setter
121 def is_friend(self, value: bool) -> None:
122 if value == self.is_friend:
123 return
124 self.update_stored_attribute(is_friend=value)
126 @property
127 def added_to_roster(self) -> bool:
128 return self.stored.added_to_roster
130 @added_to_roster.setter
131 def added_to_roster(self, value: bool) -> None:
132 if value == self.added_to_roster:
133 return
134 self.update_stored_attribute(added_to_roster=value)
136 @property
137 def participants(self) -> Iterator["LegacyParticipant"]:
138 with self.xmpp.store.session() as orm:
139 self.stored = orm.merge(self.stored)
140 participants = self.stored.participants
141 for p in participants:
142 with self.xmpp.store.session() as orm:
143 p = orm.merge(p)
144 muc = self.session.bookmarks.from_store(p.room)
145 yield muc.participant_from_store(p, contact=self)
147 @property
148 def user_jid(self):
149 return self.session.user_jid
151 @property # type:ignore
152 def DISCO_TYPE(self) -> ClientType:
153 return self.client_type
155 @DISCO_TYPE.setter
156 def DISCO_TYPE(self, value: ClientType) -> None:
157 self.client_type = value
159 @property
160 def client_type(self) -> ClientType:
161 """
162 The client type of this contact, cf https://xmpp.org/registrar/disco-categories.html#client
164 Default is "pc".
165 """
166 return self.stored.client_type
168 @client_type.setter
169 def client_type(self, value: ClientType) -> None:
170 if self.stored.client_type == value:
171 return
172 self.update_stored_attribute(client_type=value)
174 def _set_logger(self) -> None:
175 self.log = logging.getLogger(f"{self.user_jid.bare}:contact:{self}")
177 def __repr__(self) -> str:
178 return f"<Contact #{self.stored.id} '{self.name}' ({self.legacy_id} - {self.jid.user})'>"
180 def __get_subscription_string(self) -> str:
181 if self.is_friend:
182 return "both"
183 return "none"
185 def __propagate_to_participants(self, stanza: Presence) -> None:
186 if not self.PROPAGATE_PRESENCE_TO_GROUPS:
187 return
189 ptype = stanza["type"]
190 if ptype in ("available", "chat"):
191 func_name = "online"
192 elif ptype in ("xa", "unavailable"):
193 # we map unavailable to extended_away, because offline is
194 # "participant leaves the MUC"
195 # TODO: improve this with a clear distinction between participant
196 # and member list
197 func_name = "extended_away"
198 elif ptype == "busy":
199 func_name = "busy"
200 elif ptype == "away":
201 func_name = "away"
202 else:
203 return
205 last_seen: datetime.datetime | None = (
206 stanza["idle"]["since"] if stanza.get_plugin("idle", check=True) else None
207 )
209 kw = dict(status=stanza["status"], last_seen=last_seen)
211 for part in self.participants:
212 func = getattr(part, func_name)
213 func(**kw)
215 def _send(
216 self,
217 stanza: MessageOrPresenceTypeVar,
218 carbon: bool = False,
219 nick: bool = False,
220 **send_kwargs,
221 ) -> MessageOrPresenceTypeVar:
222 if carbon and isinstance(stanza, Message):
223 stanza["to"] = self.jid.bare
224 stanza["from"] = self.user_jid
225 self._privileged_send(stanza)
226 return stanza # type:ignore
228 if isinstance(stanza, Presence):
229 if not self._updating_info:
230 self.__propagate_to_participants(stanza)
231 if (
232 not self.is_friend
233 and stanza["type"] not in self._NON_FRIEND_PRESENCES_FILTER
234 ):
235 return stanza # type:ignore
236 if self.name and (nick or not self.is_friend):
237 n = self.xmpp.plugin["xep_0172"].stanza.UserNick()
238 n["nick"] = self.name
239 stanza.append(n)
240 if (
241 not self._updating_info
242 and self.xmpp.MARK_ALL_MESSAGES
243 and is_markable(stanza)
244 ):
245 with self.xmpp.store.session(expire_on_commit=False) as orm:
246 self.stored = orm.merge(self.stored)
247 exists = (
248 orm.query(ContactSent)
249 .filter_by(contact_id=self.stored.id, msg_id=stanza["id"])
250 .first()
251 )
252 if exists:
253 self.log.warning(
254 "Contact has already sent message %s", stanza["id"]
255 )
256 else:
257 new = ContactSent(contact=self.stored, msg_id=stanza["id"])
258 orm.add(new)
259 self.stored.sent_order.append(new)
260 orm.commit()
261 stanza["to"] = self.user_jid
262 stanza.send()
263 return stanza
265 def pop_unread_xmpp_ids_up_to(self, horizon_xmpp_id: str) -> list[str]:
266 """
267 Return XMPP msg ids sent by this contact up to a given XMPP msg id.
269 Legacy modules have no reason to use this, but it is used by slidge core
270 for legacy networks that need to mark all messages as read (most XMPP
271 clients only send a read marker for the latest message).
273 This has side effects, if the horizon XMPP id is found, messages up to
274 this horizon are cleared, to avoid sending the same read mark twice.
276 :param horizon_xmpp_id: The latest message
277 :return: A list of XMPP ids up to horizon_xmpp_id, included
278 """
279 with self.xmpp.store.session() as orm:
280 assert self.stored.id is not None
281 ids = self.xmpp.store.contacts.pop_sent_up_to(
282 orm, self.stored.id, horizon_xmpp_id
283 )
284 orm.commit()
285 return ids
287 @property
288 def name(self) -> str:
289 """
290 Friendly name of the contact, as it should appear in the user's roster
291 """
292 return self.stored.nick or ""
294 @name.setter
295 def name(self, n: str | None) -> None:
296 if self.stored.nick == n:
297 return
298 self.update_stored_attribute(nick=n)
299 self._set_logger()
300 if self.is_friend and self.added_to_roster:
301 self.xmpp.pubsub.broadcast_nick(
302 user_jid=self.user_jid, jid=self.jid.bare, nick=n
303 )
304 for p in self.participants:
305 p.nickname = n or str(self.legacy_id)
307 def _post_avatar_update(self, cached_avatar) -> None:
308 if self.is_friend and self.added_to_roster:
309 self.session.create_task(
310 self.session.xmpp.pubsub.broadcast_avatar(
311 self.jid.bare, self.session.user_jid, cached_avatar
312 )
313 )
314 for p in self.participants:
315 self.log.debug("Propagating new avatar to %s", p.muc)
316 p.send_last_presence(force=True, no_cache_online=True)
318 def set_vcard(
319 self,
320 /,
321 full_name: str | None = None,
322 given: str | None = None,
323 surname: str | None = None,
324 birthday: date | None = None,
325 phone: str | None = None,
326 phones: Iterable[str] = (),
327 note: str | None = None,
328 url: str | None = None,
329 email: str | None = None,
330 country: str | None = None,
331 locality: str | None = None,
332 pronouns: str | None = None,
333 ) -> None:
334 """
335 Update xep:`0292` data for this contact.
337 Use this for additional metadata about this contact to be available to XMPP
338 clients. The "note" argument is a text of arbitrary size and can be useful when
339 no other field is a good fit.
340 """
341 vcard = VCard4()
342 vcard.add_impp(f"xmpp:{self.jid.bare}")
344 if n := self.name:
345 vcard.add_nickname(n)
346 if full_name:
347 vcard["full_name"] = full_name
348 elif n:
349 vcard["full_name"] = n
351 if given:
352 vcard["given"] = given
353 if surname:
354 vcard["surname"] = surname
355 if birthday:
356 vcard["birthday"] = birthday
358 if note:
359 vcard.add_note(note)
360 if url:
361 vcard.add_url(url)
362 if email:
363 vcard.add_email(email)
364 if phone:
365 vcard.add_tel(phone)
366 for p in phones:
367 vcard.add_tel(p)
368 if country and locality:
369 vcard.add_address(country, locality)
370 elif country:
371 vcard.add_address(country, locality)
372 if pronouns:
373 vcard["pronouns"]["text"] = pronouns
375 self.update_stored_attribute(vcard=str(vcard), vcard_fetched=True)
376 self.session.create_task(
377 self.xmpp.pubsub.broadcast_vcard_event(self.jid, self.user_jid, vcard)
378 )
380 def get_roster_item(self):
381 item = {
382 "subscription": self.__get_subscription_string(),
383 "groups": [self.xmpp.ROSTER_GROUP],
384 }
385 if (n := self.name) is not None:
386 item["name"] = n
387 return {self.jid.bare: item}
389 async def add_to_roster(self, force: bool = False) -> None:
390 """
391 Add this contact to the user roster using :xep:`0356`
393 :param force: add even if the contact was already added successfully
394 """
395 if self.added_to_roster and not force:
396 return
397 if not self.session.user.preferences.get("roster_push", True):
398 log.debug("Roster push request by plugin ignored (--no-roster-push)")
399 return
400 try:
401 await self.xmpp["xep_0356"].set_roster(
402 jid=self.user_jid, roster_items=self.get_roster_item()
403 )
404 except PermissionError:
405 warnings.warn(
406 "Slidge does not have the privilege (XEP-0356) to manage rosters. "
407 "Consider configuring your XMPP server for that."
408 )
409 self.send_friend_request(
410 f"I'm already your friend on {self.xmpp.COMPONENT_TYPE}, but "
411 "slidge is not allowed to manage your roster."
412 )
413 return
414 except (IqError, IqTimeout) as e:
415 self.log.warning("Could not add to roster", exc_info=e)
416 else:
417 # we only broadcast pubsub events for contacts added to the roster
418 # so if something was set before, we need to push it now
419 self.added_to_roster = True
420 self.send_last_presence(force=True)
422 async def __broadcast_pubsub_items(self) -> None:
423 if not self.is_friend:
424 return
425 if not self.added_to_roster:
426 return
427 cached_avatar = self.get_cached_avatar()
428 if cached_avatar is not None:
429 await self.xmpp.pubsub.broadcast_avatar(
430 self.jid.bare, self.session.user_jid, cached_avatar
431 )
432 nick = self.name
434 if nick is not None:
435 self.xmpp.pubsub.broadcast_nick(
436 self.session.user_jid,
437 self.jid.bare,
438 nick,
439 )
441 def send_friend_request(self, text: str | None = None) -> None:
442 presence = self._make_presence(ptype="subscribe", pstatus=text, bare=True)
443 self._send(presence, nick=True)
445 async def accept_friend_request(self, text: str | None = None) -> None:
446 """
447 Call this to signify that this Contact has accepted to be a friend
448 of the user.
450 :param text: Optional message from the friend to the user
451 """
452 self.is_friend = True
453 self.added_to_roster = True
454 self.log.debug("Accepting friend request")
455 presence = self._make_presence(ptype="subscribed", pstatus=text, bare=True)
456 self._send(presence, nick=True)
457 self.send_last_presence()
458 await self.__broadcast_pubsub_items()
459 self.log.debug("Accepted friend request")
461 def reject_friend_request(self, text: str | None = None) -> None:
462 """
463 Call this to signify that this Contact has refused to be a contact
464 of the user (or that they don't want to be friends anymore)
466 :param text: Optional message from the non-friend to the user
467 """
468 presence = self._make_presence(ptype="unsubscribed", pstatus=text, bare=True)
469 self.offline()
470 self._send(presence, nick=True)
471 self.is_friend = False
473 async def on_friend_request(self, text: str = "") -> None:
474 """
475 Called when receiving a "subscribe" presence, ie, "I would like to add
476 you to my contacts/friends", from the user to this contact.
478 In XMPP terms: "I would like to receive your presence updates"
480 This is only called if self.is_friend = False. If self.is_friend = True,
481 slidge will automatically "accept the friend request", ie, reply with
482 a "subscribed" presence.
484 When called, a 'friend request event' should be sent to the legacy
485 service, and when the contact responds, you should either call
486 self.accept_subscription() or self.reject_subscription()
487 """
488 pass
490 async def on_friend_delete(self, text: str = "") -> None:
491 """
492 Called when receiving an "unsubscribed" presence, ie, "I would like to
493 remove you to my contacts/friends" or "I refuse your friend request"
494 from the user to this contact.
496 In XMPP terms: "You won't receive my presence updates anymore (or you
497 never have)".
498 """
499 pass
501 async def on_friend_accept(self) -> None:
502 """
503 Called when receiving a "subscribed" presence, ie, "I accept to be
504 your/confirm that you are my friend" from the user to this contact.
506 In XMPP terms: "You will receive my presence updates".
507 """
508 pass
510 def unsubscribe(self) -> None:
511 """
512 (internal use by slidge)
514 Send an "unsubscribe", "unsubscribed", "unavailable" presence sequence
515 from this contact to the user, ie, "this contact has removed you from
516 their 'friends'".
517 """
518 for ptype in "unsubscribe", "unsubscribed", "unavailable":
519 self.xmpp.send_presence(pfrom=self.jid, pto=self.user_jid.bare, ptype=ptype)
521 async def update_info(self) -> None:
522 """
523 Fetch information about this contact from the legacy network
525 This is awaited on Contact instantiation, and should be overridden to
526 update the nickname, avatar, vcard [...] of this contact, by making
527 "legacy API calls".
529 To take advantage of the slidge avatar cache, you can check the .avatar
530 property to retrieve the "legacy file ID" of the cached avatar. If there
531 is no change, you should not call
532 :py:meth:`slidge.core.mixins.avatar.AvatarMixin.set_avatar` or attempt
533 to modify the ``.avatar`` property.
534 """
535 pass
537 async def fetch_vcard(self) -> None:
538 """
539 It the legacy network doesn't like that you fetch too many profiles on startup,
540 it's also possible to fetch it here, which will be called when XMPP clients
541 of the user request the vcard, if it hasn't been fetched before
542 :return:
543 """
544 pass
546 def _make_presence(
547 self,
548 *,
549 last_seen: datetime.datetime | None = None,
550 status_codes: set[int] | None = None,
551 user_full_jid: JID | None = None,
552 **presence_kwargs,
553 ):
554 p = super()._make_presence(last_seen=last_seen, **presence_kwargs)
555 caps = self.xmpp.plugin["xep_0115"]
556 if p.get_from().resource and self.stored.caps_ver:
557 p["caps"]["node"] = caps.caps_node
558 p["caps"]["hash"] = caps.hash
559 p["caps"]["ver"] = self.stored.caps_ver
560 return p
563def is_markable(stanza: Message | Presence):
564 if isinstance(stanza, Presence):
565 return False
566 return bool(stanza["body"])
569log = logging.getLogger(__name__)