Coverage for slidge / contact / contact.py: 86%
281 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-06 15:18 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-06 15:18 +0000
1import datetime
2import logging
3import warnings
4from datetime import date
5from typing import TYPE_CHECKING, Generic, Iterable, Iterator, Optional, Union
6from xml.etree import ElementTree as ET
8from slixmpp import JID, Message, Presence
9from slixmpp.exceptions import IqError, IqTimeout
10from slixmpp.plugins.xep_0292.stanza import VCard4
11from slixmpp.types import MessageTypes
13from ..core.mixins import AvatarMixin, FullCarbonMixin
14from ..core.mixins.disco import ContactAccountDiscoMixin
15from ..core.mixins.recipient import ReactionRecipientMixin, ThreadRecipientMixin
16from ..db.models import Contact, ContactSent
17from ..util import SubclassableOnce
18from ..util.types import ClientType, LegacyUserIdType, MessageOrPresenceTypeVar
20if TYPE_CHECKING:
21 from ..core.session import BaseSession
22 from ..group.participant import LegacyParticipant
25class LegacyContact(
26 Generic[LegacyUserIdType],
27 AvatarMixin,
28 ContactAccountDiscoMixin,
29 FullCarbonMixin,
30 ReactionRecipientMixin,
31 ThreadRecipientMixin,
32 metaclass=SubclassableOnce,
33):
34 """
35 This class centralizes actions in relation to a specific legacy contact.
37 You shouldn't create instances of contacts manually, but rather rely on
38 :meth:`.LegacyRoster.by_legacy_id` to ensure that contact instances are
39 singletons. The :class:`.LegacyRoster` instance of a session is accessible
40 through the :attr:`.BaseSession.contacts` attribute.
42 Typically, your plugin should have methods hook to the legacy events and
43 call appropriate methods here to transmit the "legacy action" to the xmpp
44 user. This should look like this:
46 .. code-block:python
48 class Session(BaseSession):
49 ...
51 async def on_cool_chat_network_new_text_message(self, legacy_msg_event):
52 contact = self.contacts.by_legacy_id(legacy_msg_event.from)
53 contact.send_text(legacy_msg_event.text)
55 async def on_cool_chat_network_new_typing_event(self, legacy_typing_event):
56 contact = self.contacts.by_legacy_id(legacy_msg_event.from)
57 contact.composing()
58 ...
60 Use ``carbon=True`` as a keyword arg for methods to represent an action FROM
61 the user TO the contact, typically when the user uses an official client to
62 do an action such as sending a message or marking as message as read.
63 This will use :xep:`0363` to impersonate the XMPP user in order.
64 """
66 session: "BaseSession"
68 RESOURCE: str = "slidge"
69 """
70 A full JID, including a resource part is required for chat states (and maybe other stuff)
71 to work properly. This is the name of the resource the contacts will use.
72 """
73 PROPAGATE_PRESENCE_TO_GROUPS = True
75 mtype: MessageTypes = "chat"
76 _can_send_carbon = True
77 is_participant = False
78 is_group = False
80 _ONLY_SEND_PRESENCE_CHANGES = True
82 STRIP_SHORT_DELAY = True
83 _NON_FRIEND_PRESENCES_FILTER = {"subscribe", "unsubscribed"}
85 INVITATION_RECIPIENT = True
87 stored: Contact
88 model: Contact
90 def __init__(self, session: "BaseSession", stored: Contact) -> None:
91 self.session = session
92 self.xmpp = session.xmpp
93 self.stored = stored
94 self._set_logger()
95 super().__init__()
97 @property
98 def jid(self): # type:ignore[override]
99 jid = JID(self.stored.jid)
100 jid.resource = self.RESOURCE
101 return jid
103 @property
104 def legacy_id(self):
105 return self.xmpp.LEGACY_CONTACT_ID_TYPE(self.stored.legacy_id)
107 async def get_vcard(self, fetch: bool = True) -> VCard4 | None:
108 if fetch and not self.stored.vcard_fetched:
109 await self.fetch_vcard()
110 if self.stored.vcard is None:
111 return None
113 return VCard4(xml=ET.fromstring(self.stored.vcard))
115 @property
116 def is_friend(self) -> bool:
117 return self.stored.is_friend
119 @is_friend.setter
120 def is_friend(self, value: bool) -> None:
121 if value == self.is_friend:
122 return
123 self.update_stored_attribute(is_friend=value)
125 @property
126 def added_to_roster(self) -> bool:
127 return self.stored.added_to_roster
129 @added_to_roster.setter
130 def added_to_roster(self, value: bool) -> None:
131 if value == self.added_to_roster:
132 return
133 self.update_stored_attribute(added_to_roster=value)
135 @property
136 def participants(self) -> Iterator["LegacyParticipant"]:
137 with self.xmpp.store.session() as orm:
138 self.stored = orm.merge(self.stored)
139 participants = self.stored.participants
140 for p in participants:
141 with self.xmpp.store.session() as orm:
142 p = orm.merge(p)
143 muc = self.session.bookmarks.from_store(p.room)
144 yield muc.participant_from_store(p, contact=self)
146 @property
147 def user_jid(self):
148 return self.session.user_jid
150 @property # type:ignore
151 def DISCO_TYPE(self) -> ClientType:
152 return self.client_type
154 @DISCO_TYPE.setter
155 def DISCO_TYPE(self, value: ClientType) -> None:
156 self.client_type = value
158 @property
159 def client_type(self) -> ClientType:
160 """
161 The client type of this contact, cf https://xmpp.org/registrar/disco-categories.html#client
163 Default is "pc".
164 """
165 return self.stored.client_type
167 @client_type.setter
168 def client_type(self, value: ClientType) -> None:
169 if self.stored.client_type == value:
170 return
171 self.update_stored_attribute(client_type=value)
173 def _set_logger(self) -> None:
174 self.log = logging.getLogger(f"{self.user_jid.bare}:contact:{self}")
176 def __repr__(self) -> str:
177 return f"<Contact #{self.stored.id} '{self.name}' ({self.legacy_id} - {self.jid.user})'>"
179 def __get_subscription_string(self) -> str:
180 if self.is_friend:
181 return "both"
182 return "none"
184 def __propagate_to_participants(self, stanza: Presence) -> None:
185 if not self.PROPAGATE_PRESENCE_TO_GROUPS:
186 return
188 ptype = stanza["type"]
189 if ptype in ("available", "chat"):
190 func_name = "online"
191 elif ptype in ("xa", "unavailable"):
192 # we map unavailable to extended_away, because offline is
193 # "participant leaves the MUC"
194 # TODO: improve this with a clear distinction between participant
195 # and member list
196 func_name = "extended_away"
197 elif ptype == "busy":
198 func_name = "busy"
199 elif ptype == "away":
200 func_name = "away"
201 else:
202 return
204 last_seen: Optional[datetime.datetime] = (
205 stanza["idle"]["since"] if stanza.get_plugin("idle", check=True) else None
206 )
208 kw = dict(status=stanza["status"], last_seen=last_seen)
210 for part in self.participants:
211 func = getattr(part, func_name)
212 func(**kw)
214 def _send(
215 self,
216 stanza: MessageOrPresenceTypeVar,
217 carbon: bool = False,
218 nick: bool = False,
219 **send_kwargs,
220 ) -> MessageOrPresenceTypeVar:
221 if carbon and isinstance(stanza, Message):
222 stanza["to"] = self.jid.bare
223 stanza["from"] = self.user_jid
224 self._privileged_send(stanza)
225 return stanza # type:ignore
227 if isinstance(stanza, Presence):
228 if not self._updating_info:
229 self.__propagate_to_participants(stanza)
230 if (
231 not self.is_friend
232 and stanza["type"] not in self._NON_FRIEND_PRESENCES_FILTER
233 ):
234 return stanza # type:ignore
235 if self.name and (nick or not self.is_friend):
236 n = self.xmpp.plugin["xep_0172"].stanza.UserNick()
237 n["nick"] = self.name
238 stanza.append(n)
239 if (
240 not self._updating_info
241 and self.xmpp.MARK_ALL_MESSAGES
242 and is_markable(stanza)
243 ):
244 with self.xmpp.store.session(expire_on_commit=False) as orm:
245 self.stored = orm.merge(self.stored)
246 exists = (
247 orm.query(ContactSent)
248 .filter_by(contact_id=self.stored.id, msg_id=stanza["id"])
249 .first()
250 )
251 if exists:
252 self.log.warning(
253 "Contact has already sent message %s", stanza["id"]
254 )
255 else:
256 new = ContactSent(contact=self.stored, msg_id=stanza["id"])
257 orm.add(new)
258 self.stored.sent_order.append(new)
259 orm.commit()
260 stanza["to"] = self.user_jid
261 stanza.send()
262 return stanza
264 def pop_unread_xmpp_ids_up_to(self, horizon_xmpp_id: str) -> list[str]:
265 """
266 Return XMPP msg ids sent by this contact up to a given XMPP msg id.
268 Legacy modules have no reason to use this, but it is used by slidge core
269 for legacy networks that need to mark all messages as read (most XMPP
270 clients only send a read marker for the latest message).
272 This has side effects, if the horizon XMPP id is found, messages up to
273 this horizon are cleared, to avoid sending the same read mark twice.
275 :param horizon_xmpp_id: The latest message
276 :return: A list of XMPP ids up to horizon_xmpp_id, included
277 """
278 with self.xmpp.store.session() as orm:
279 assert self.stored.id is not None
280 ids = self.xmpp.store.contacts.pop_sent_up_to(
281 orm, self.stored.id, horizon_xmpp_id
282 )
283 orm.commit()
284 return ids
286 @property
287 def name(self) -> str:
288 """
289 Friendly name of the contact, as it should appear in the user's roster
290 """
291 return self.stored.nick or ""
293 @name.setter
294 def name(self, n: Optional[str]) -> None:
295 if self.stored.nick == n:
296 return
297 self.update_stored_attribute(nick=n)
298 self._set_logger()
299 if self.is_friend and self.added_to_roster:
300 self.xmpp.pubsub.broadcast_nick(
301 user_jid=self.user_jid, jid=self.jid.bare, nick=n
302 )
303 for p in self.participants:
304 p.nickname = n or str(self.legacy_id)
306 def _post_avatar_update(self, cached_avatar) -> None:
307 if self.is_friend and self.added_to_roster:
308 self.session.create_task(
309 self.session.xmpp.pubsub.broadcast_avatar(
310 self.jid.bare, self.session.user_jid, cached_avatar
311 )
312 )
313 for p in self.participants:
314 self.log.debug("Propagating new avatar to %s", p.muc)
315 p.send_last_presence(force=True, no_cache_online=True)
317 def set_vcard(
318 self,
319 /,
320 full_name: Optional[str] = None,
321 given: Optional[str] = None,
322 surname: Optional[str] = None,
323 birthday: Optional[date] = None,
324 phone: Optional[str] = None,
325 phones: Iterable[str] = (),
326 note: Optional[str] = None,
327 url: Optional[str] = None,
328 email: Optional[str] = None,
329 country: Optional[str] = None,
330 locality: Optional[str] = None,
331 pronouns: Optional[str] = None,
332 ) -> None:
333 """
334 Update xep:`0292` data for this contact.
336 Use this for additional metadata about this contact to be available to XMPP
337 clients. The "note" argument is a text of arbitrary size and can be useful when
338 no other field is a good fit.
339 """
340 vcard = VCard4()
341 vcard.add_impp(f"xmpp:{self.jid.bare}")
343 if n := self.name:
344 vcard.add_nickname(n)
345 if full_name:
346 vcard["full_name"] = full_name
347 elif n:
348 vcard["full_name"] = n
350 if given:
351 vcard["given"] = given
352 if surname:
353 vcard["surname"] = surname
354 if birthday:
355 vcard["birthday"] = birthday
357 if note:
358 vcard.add_note(note)
359 if url:
360 vcard.add_url(url)
361 if email:
362 vcard.add_email(email)
363 if phone:
364 vcard.add_tel(phone)
365 for p in phones:
366 vcard.add_tel(p)
367 if country and locality:
368 vcard.add_address(country, locality)
369 elif country:
370 vcard.add_address(country, locality)
371 if pronouns:
372 vcard["pronouns"]["text"] = pronouns
374 self.update_stored_attribute(vcard=str(vcard), vcard_fetched=True)
375 self.session.create_task(
376 self.xmpp.pubsub.broadcast_vcard_event(self.jid, self.user_jid, vcard)
377 )
379 def get_roster_item(self):
380 item = {
381 "subscription": self.__get_subscription_string(),
382 "groups": [self.xmpp.ROSTER_GROUP],
383 }
384 if (n := self.name) is not None:
385 item["name"] = n
386 return {self.jid.bare: item}
388 async def add_to_roster(self, force: bool = False) -> None:
389 """
390 Add this contact to the user roster using :xep:`0356`
392 :param force: add even if the contact was already added successfully
393 """
394 if self.added_to_roster and not force:
395 return
396 if not self.session.user.preferences.get("roster_push", True):
397 log.debug("Roster push request by plugin ignored (--no-roster-push)")
398 return
399 try:
400 await self.xmpp["xep_0356"].set_roster(
401 jid=self.user_jid, roster_items=self.get_roster_item()
402 )
403 except PermissionError:
404 warnings.warn(
405 "Slidge does not have the privilege (XEP-0356) to manage rosters. "
406 "Consider configuring your XMPP server for that."
407 )
408 self.send_friend_request(
409 f"I'm already your friend on {self.xmpp.COMPONENT_TYPE}, but "
410 "slidge is not allowed to manage your roster."
411 )
412 return
413 except (IqError, IqTimeout) as e:
414 self.log.warning("Could not add to roster", exc_info=e)
415 else:
416 # we only broadcast pubsub events for contacts added to the roster
417 # so if something was set before, we need to push it now
418 self.added_to_roster = True
419 self.send_last_presence(force=True)
421 async def __broadcast_pubsub_items(self) -> None:
422 if not self.is_friend:
423 return
424 if not self.added_to_roster:
425 return
426 cached_avatar = self.get_cached_avatar()
427 if cached_avatar is not None:
428 await self.xmpp.pubsub.broadcast_avatar(
429 self.jid.bare, self.session.user_jid, cached_avatar
430 )
431 nick = self.name
433 if nick is not None:
434 self.xmpp.pubsub.broadcast_nick(
435 self.session.user_jid,
436 self.jid.bare,
437 nick,
438 )
440 def send_friend_request(self, text: Optional[str] = None) -> None:
441 presence = self._make_presence(ptype="subscribe", pstatus=text, bare=True)
442 self._send(presence, nick=True)
444 async def accept_friend_request(self, text: Optional[str] = None) -> None:
445 """
446 Call this to signify that this Contact has accepted to be a friend
447 of the user.
449 :param text: Optional message from the friend to the user
450 """
451 self.is_friend = True
452 self.added_to_roster = True
453 self.log.debug("Accepting friend request")
454 presence = self._make_presence(ptype="subscribed", pstatus=text, bare=True)
455 self._send(presence, nick=True)
456 self.send_last_presence()
457 await self.__broadcast_pubsub_items()
458 self.log.debug("Accepted friend request")
460 def reject_friend_request(self, text: Optional[str] = None) -> None:
461 """
462 Call this to signify that this Contact has refused to be a contact
463 of the user (or that they don't want to be friends anymore)
465 :param text: Optional message from the non-friend to the user
466 """
467 presence = self._make_presence(ptype="unsubscribed", pstatus=text, bare=True)
468 self.offline()
469 self._send(presence, nick=True)
470 self.is_friend = False
472 async def on_friend_request(self, text: str = "") -> None:
473 """
474 Called when receiving a "subscribe" presence, ie, "I would like to add
475 you to my contacts/friends", from the user to this contact.
477 In XMPP terms: "I would like to receive your presence updates"
479 This is only called if self.is_friend = False. If self.is_friend = True,
480 slidge will automatically "accept the friend request", ie, reply with
481 a "subscribed" presence.
483 When called, a 'friend request event' should be sent to the legacy
484 service, and when the contact responds, you should either call
485 self.accept_subscription() or self.reject_subscription()
486 """
487 pass
489 async def on_friend_delete(self, text: str = "") -> None:
490 """
491 Called when receiving an "unsubscribed" presence, ie, "I would like to
492 remove you to my contacts/friends" or "I refuse your friend request"
493 from the user to this contact.
495 In XMPP terms: "You won't receive my presence updates anymore (or you
496 never have)".
497 """
498 pass
500 async def on_friend_accept(self) -> None:
501 """
502 Called when receiving a "subscribed" presence, ie, "I accept to be
503 your/confirm that you are my friend" from the user to this contact.
505 In XMPP terms: "You will receive my presence updates".
506 """
507 pass
509 def unsubscribe(self) -> None:
510 """
511 (internal use by slidge)
513 Send an "unsubscribe", "unsubscribed", "unavailable" presence sequence
514 from this contact to the user, ie, "this contact has removed you from
515 their 'friends'".
516 """
517 for ptype in "unsubscribe", "unsubscribed", "unavailable":
518 self.xmpp.send_presence(pfrom=self.jid, pto=self.user_jid.bare, ptype=ptype)
520 async def update_info(self) -> None:
521 """
522 Fetch information about this contact from the legacy network
524 This is awaited on Contact instantiation, and should be overridden to
525 update the nickname, avatar, vcard [...] of this contact, by making
526 "legacy API calls".
528 To take advantage of the slidge avatar cache, you can check the .avatar
529 property to retrieve the "legacy file ID" of the cached avatar. If there
530 is no change, you should not call
531 :py:meth:`slidge.core.mixins.avatar.AvatarMixin.set_avatar` or attempt
532 to modify the ``.avatar`` property.
533 """
534 pass
536 async def fetch_vcard(self) -> None:
537 """
538 It the legacy network doesn't like that you fetch too many profiles on startup,
539 it's also possible to fetch it here, which will be called when XMPP clients
540 of the user request the vcard, if it hasn't been fetched before
541 :return:
542 """
543 pass
545 def _make_presence(
546 self,
547 *,
548 last_seen: Optional[datetime.datetime] = None,
549 status_codes: Optional[set[int]] = None,
550 user_full_jid: Optional[JID] = None,
551 **presence_kwargs,
552 ):
553 p = super()._make_presence(last_seen=last_seen, **presence_kwargs)
554 caps = self.xmpp.plugin["xep_0115"]
555 if p.get_from().resource and self.stored.caps_ver:
556 p["caps"]["node"] = caps.caps_node
557 p["caps"]["hash"] = caps.hash
558 p["caps"]["ver"] = self.stored.caps_ver
559 return p
562def is_markable(stanza: Union[Message, Presence]):
563 if isinstance(stanza, Presence):
564 return False
565 return bool(stanza["body"])
568log = logging.getLogger(__name__)