Coverage for slidge/contact/contact.py: 85%
282 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
1import datetime
2import logging
3import warnings
4from datetime import date
5from typing import TYPE_CHECKING, Generic, Iterable, Iterator, Optional, Union
6from xml.etree import ElementTree as ET
8from slixmpp import JID, Message, Presence
9from slixmpp.exceptions import IqError, IqTimeout
10from slixmpp.plugins.xep_0292.stanza import VCard4
11from slixmpp.types import MessageTypes
12from sqlalchemy.exc import IntegrityError
14from ..core.mixins import AvatarMixin, FullCarbonMixin
15from ..core.mixins.disco import ContactAccountDiscoMixin
16from ..core.mixins.recipient import ReactionRecipientMixin, ThreadRecipientMixin
17from ..db.models import Contact, ContactSent, Participant
18from ..util import SubclassableOnce
19from ..util.types import ClientType, LegacyUserIdType, MessageOrPresenceTypeVar
21if TYPE_CHECKING:
22 from ..core.session import BaseSession
23 from ..group.participant import LegacyParticipant
26class LegacyContact(
27 Generic[LegacyUserIdType],
28 AvatarMixin,
29 ContactAccountDiscoMixin,
30 FullCarbonMixin,
31 ReactionRecipientMixin,
32 ThreadRecipientMixin,
33 metaclass=SubclassableOnce,
34):
35 """
36 This class centralizes actions in relation to a specific legacy contact.
38 You shouldn't create instances of contacts manually, but rather rely on
39 :meth:`.LegacyRoster.by_legacy_id` to ensure that contact instances are
40 singletons. The :class:`.LegacyRoster` instance of a session is accessible
41 through the :attr:`.BaseSession.contacts` attribute.
43 Typically, your plugin should have methods hook to the legacy events and
44 call appropriate methods here to transmit the "legacy action" to the xmpp
45 user. This should look like this:
47 .. code-block:python
49 class Session(BaseSession):
50 ...
52 async def on_cool_chat_network_new_text_message(self, legacy_msg_event):
53 contact = self.contacts.by_legacy_id(legacy_msg_event.from)
54 contact.send_text(legacy_msg_event.text)
56 async def on_cool_chat_network_new_typing_event(self, legacy_typing_event):
57 contact = self.contacts.by_legacy_id(legacy_msg_event.from)
58 contact.composing()
59 ...
61 Use ``carbon=True`` as a keyword arg for methods to represent an action FROM
62 the user TO the contact, typically when the user uses an official client to
63 do an action such as sending a message or marking as message as read.
64 This will use :xep:`0363` to impersonate the XMPP user in order.
65 """
67 session: "BaseSession"
69 RESOURCE: str = "slidge"
70 """
71 A full JID, including a resource part is required for chat states (and maybe other stuff)
72 to work properly. This is the name of the resource the contacts will use.
73 """
74 PROPAGATE_PRESENCE_TO_GROUPS = True
76 mtype: MessageTypes = "chat"
77 _can_send_carbon = True
78 is_participant = False
79 is_group = False
81 _ONLY_SEND_PRESENCE_CHANGES = True
83 STRIP_SHORT_DELAY = True
84 _NON_FRIEND_PRESENCES_FILTER = {"subscribe", "unsubscribed"}
86 INVITATION_RECIPIENT = True
88 stored: Contact
89 model: Contact
91 def __init__(self, session: "BaseSession", stored: Contact) -> None:
92 self.session = session
93 self.xmpp = session.xmpp
94 self.stored = stored
95 self._set_logger()
96 super().__init__()
98 @property
99 def jid(self): # type:ignore[override]
100 jid = JID(self.stored.jid)
101 jid.resource = self.RESOURCE
102 return jid
104 @property
105 def legacy_id(self):
106 return self.xmpp.LEGACY_CONTACT_ID_TYPE(self.stored.legacy_id)
108 async def get_vcard(self, fetch: bool = True) -> VCard4 | None:
109 if fetch and not self.stored.vcard_fetched:
110 await self.fetch_vcard()
111 if self.stored.vcard is None:
112 return None
114 return VCard4(xml=ET.fromstring(self.stored.vcard))
116 @property
117 def is_friend(self) -> bool:
118 return self.stored.is_friend
120 @is_friend.setter
121 def is_friend(self, value: bool) -> None:
122 if value == self.is_friend:
123 return
124 self.update_stored_attribute(is_friend=value)
126 @property
127 def added_to_roster(self) -> bool:
128 return self.stored.added_to_roster
130 @added_to_roster.setter
131 def added_to_roster(self, value: bool) -> None:
132 if value == self.added_to_roster:
133 return
134 self.update_stored_attribute(added_to_roster=value)
136 @property
137 def participants(self) -> Iterator["LegacyParticipant"]:
138 with self.xmpp.store.session() as orm:
139 self.stored = orm.merge(self.stored)
140 participants = self.stored.participants
141 for p in participants:
142 with self.xmpp.store.session() as orm:
143 p = orm.merge(p)
144 muc = self.session.bookmarks.from_store(p.room)
145 yield muc.participant_from_store(p, contact=self)
147 @property
148 def user_jid(self):
149 return self.session.user_jid
151 @property # type:ignore
152 def DISCO_TYPE(self) -> ClientType:
153 return self.client_type
155 @DISCO_TYPE.setter
156 def DISCO_TYPE(self, value: ClientType) -> None:
157 self.client_type = value
159 @property
160 def client_type(self) -> ClientType:
161 """
162 The client type of this contact, cf https://xmpp.org/registrar/disco-categories.html#client
164 Default is "pc".
165 """
166 return self.stored.client_type
168 @client_type.setter
169 def client_type(self, value: ClientType) -> None:
170 if self.stored.client_type == value:
171 return
172 self.update_stored_attribute(client_type=value)
174 def _set_logger(self) -> None:
175 self.log = logging.getLogger(f"{self.user_jid.bare}:contact:{self}")
177 def __repr__(self) -> str:
178 return f"<Contact #{self.stored.id} '{self.name}' ({self.legacy_id} - {self.jid.user})'>"
180 def __get_subscription_string(self) -> str:
181 if self.is_friend:
182 return "both"
183 return "none"
185 def __propagate_to_participants(self, stanza: Presence) -> None:
186 if not self.PROPAGATE_PRESENCE_TO_GROUPS:
187 return
189 ptype = stanza["type"]
190 if ptype in ("available", "chat"):
191 func_name = "online"
192 elif ptype in ("xa", "unavailable"):
193 # we map unavailable to extended_away, because offline is
194 # "participant leaves the MUC"
195 # TODO: improve this with a clear distinction between participant
196 # and member list
197 func_name = "extended_away"
198 elif ptype == "busy":
199 func_name = "busy"
200 elif ptype == "away":
201 func_name = "away"
202 else:
203 return
205 last_seen: Optional[datetime.datetime] = (
206 stanza["idle"]["since"] if stanza.get_plugin("idle", check=True) else None
207 )
209 kw = dict(status=stanza["status"], last_seen=last_seen)
211 for part in self.participants:
212 func = getattr(part, func_name)
213 func(**kw)
215 def _send(
216 self,
217 stanza: MessageOrPresenceTypeVar,
218 carbon: bool = False,
219 nick: bool = False,
220 **send_kwargs,
221 ) -> MessageOrPresenceTypeVar:
222 if carbon and isinstance(stanza, Message):
223 stanza["to"] = self.jid.bare
224 stanza["from"] = self.user_jid
225 self._privileged_send(stanza)
226 return stanza # type:ignore
228 if isinstance(stanza, Presence):
229 if not self._updating_info:
230 self.__propagate_to_participants(stanza)
231 if (
232 not self.is_friend
233 and stanza["type"] not in self._NON_FRIEND_PRESENCES_FILTER
234 ):
235 return stanza # type:ignore
236 if self.name and (nick or not self.is_friend):
237 n = self.xmpp.plugin["xep_0172"].stanza.UserNick()
238 n["nick"] = self.name
239 stanza.append(n)
240 if (
241 not self._updating_info
242 and self.xmpp.MARK_ALL_MESSAGES
243 and is_markable(stanza)
244 ):
245 try:
246 with self.xmpp.store.session(expire_on_commit=False) as orm:
247 self.stored = orm.merge(self.stored)
248 new = ContactSent(contact=self.stored, msg_id=stanza["id"])
249 orm.add(new)
250 self.stored.sent_order.append(new)
251 orm.commit()
252 except IntegrityError:
253 self.log.warning("Contact has already sent message %s", stanza["id"])
254 stanza["to"] = self.user_jid
255 stanza.send()
256 return stanza
258 def get_msg_xmpp_id_up_to(self, horizon_xmpp_id: str) -> list[str]:
259 """
260 Return XMPP msg ids sent by this contact up to a given XMPP msg id.
262 Plugins have no reason to use this, but it is used by slidge core
263 for legacy networks that need to mark all messages as read (most XMPP
264 clients only send a read marker for the latest message).
266 This has side effects, if the horizon XMPP id is found, messages up to
267 this horizon are not cleared, to avoid sending the same read mark twice.
269 :param horizon_xmpp_id: The latest message
270 :return: A list of XMPP ids or None if horizon_xmpp_id was not found
271 """
272 with self.xmpp.store.session() as orm:
273 assert self.stored.id is not None
274 ids = self.xmpp.store.contacts.pop_sent_up_to(
275 orm, self.stored.id, horizon_xmpp_id
276 )
277 orm.commit()
278 return ids
280 @property
281 def name(self) -> str:
282 """
283 Friendly name of the contact, as it should appear in the user's roster
284 """
285 return self.stored.nick or ""
287 @name.setter
288 def name(self, n: Optional[str]) -> None:
289 if self.stored.nick == n:
290 return
291 self.update_stored_attribute(nick=n)
292 self._set_logger()
293 if self.is_friend and self.added_to_roster:
294 self.xmpp.pubsub.broadcast_nick(
295 user_jid=self.user_jid, jid=self.jid.bare, nick=n
296 )
297 for p in self.participants:
298 p.nickname = n or str(self.legacy_id)
300 def _post_avatar_update(self, cached_avatar) -> None:
301 if self.is_friend and self.added_to_roster:
302 self.session.create_task(
303 self.session.xmpp.pubsub.broadcast_avatar(
304 self.jid.bare, self.session.user_jid, cached_avatar
305 )
306 )
307 for p in self.participants:
308 self.log.debug("Propagating new avatar to %s", p.muc)
309 p.send_last_presence(force=True, no_cache_online=True)
311 def set_vcard(
312 self,
313 /,
314 full_name: Optional[str] = None,
315 given: Optional[str] = None,
316 surname: Optional[str] = None,
317 birthday: Optional[date] = None,
318 phone: Optional[str] = None,
319 phones: Iterable[str] = (),
320 note: Optional[str] = None,
321 url: Optional[str] = None,
322 email: Optional[str] = None,
323 country: Optional[str] = None,
324 locality: Optional[str] = None,
325 pronouns: Optional[str] = None,
326 ) -> None:
327 vcard = VCard4()
328 vcard.add_impp(f"xmpp:{self.jid.bare}")
330 if n := self.name:
331 vcard.add_nickname(n)
332 if full_name:
333 vcard["full_name"] = full_name
334 elif n:
335 vcard["full_name"] = n
337 if given:
338 vcard["given"] = given
339 if surname:
340 vcard["surname"] = surname
341 if birthday:
342 vcard["birthday"] = birthday
344 if note:
345 vcard.add_note(note)
346 if url:
347 vcard.add_url(url)
348 if email:
349 vcard.add_email(email)
350 if phone:
351 vcard.add_tel(phone)
352 for p in phones:
353 vcard.add_tel(p)
354 if country and locality:
355 vcard.add_address(country, locality)
356 elif country:
357 vcard.add_address(country, locality)
358 if pronouns:
359 vcard["pronouns"]["text"] = pronouns
361 self.update_stored_attribute(vcard=str(vcard), vcard_fetched=True)
362 self.session.create_task(
363 self.xmpp.pubsub.broadcast_vcard_event(self.jid, self.user_jid, vcard)
364 )
366 def get_roster_item(self):
367 item = {
368 "subscription": self.__get_subscription_string(),
369 "groups": [self.xmpp.ROSTER_GROUP],
370 }
371 if (n := self.name) is not None:
372 item["name"] = n
373 return {self.jid.bare: item}
375 async def add_to_roster(self, force: bool = False) -> None:
376 """
377 Add this contact to the user roster using :xep:`0356`
379 :param force: add even if the contact was already added successfully
380 """
381 if self.added_to_roster and not force:
382 return
383 if not self.session.user.preferences.get("roster_push", True):
384 log.debug("Roster push request by plugin ignored (--no-roster-push)")
385 return
386 try:
387 await self.xmpp["xep_0356"].set_roster(
388 jid=self.user_jid, roster_items=self.get_roster_item()
389 )
390 except PermissionError:
391 warnings.warn(
392 "Slidge does not have the privilege (XEP-0356) to manage rosters. "
393 "Consider configuring your XMPP server for that."
394 )
395 self.send_friend_request(
396 f"I'm already your friend on {self.xmpp.COMPONENT_TYPE}, but "
397 "slidge is not allowed to manage your roster."
398 )
399 return
400 except (IqError, IqTimeout) as e:
401 self.log.warning("Could not add to roster", exc_info=e)
402 else:
403 # we only broadcast pubsub events for contacts added to the roster
404 # so if something was set before, we need to push it now
405 self.added_to_roster = True
406 self.send_last_presence(force=True)
408 async def __broadcast_pubsub_items(self) -> None:
409 if not self.is_friend:
410 return
411 if not self.added_to_roster:
412 return
413 cached_avatar = self.get_cached_avatar()
414 if cached_avatar is not None:
415 await self.xmpp.pubsub.broadcast_avatar(
416 self.jid.bare, self.session.user_jid, cached_avatar
417 )
418 nick = self.name
420 if nick is not None:
421 self.xmpp.pubsub.broadcast_nick(
422 self.session.user_jid,
423 self.jid.bare,
424 nick,
425 )
427 def send_friend_request(self, text: Optional[str] = None) -> None:
428 presence = self._make_presence(ptype="subscribe", pstatus=text, bare=True)
429 self._send(presence, nick=True)
431 async def accept_friend_request(self, text: Optional[str] = None) -> None:
432 """
433 Call this to signify that this Contact has accepted to be a friend
434 of the user.
436 :param text: Optional message from the friend to the user
437 """
438 self.is_friend = True
439 self.added_to_roster = True
440 self.log.debug("Accepting friend request")
441 presence = self._make_presence(ptype="subscribed", pstatus=text, bare=True)
442 self._send(presence, nick=True)
443 self.send_last_presence()
444 await self.__broadcast_pubsub_items()
445 self.log.debug("Accepted friend request")
447 def reject_friend_request(self, text: Optional[str] = None) -> None:
448 """
449 Call this to signify that this Contact has refused to be a contact
450 of the user (or that they don't want to be friends anymore)
452 :param text: Optional message from the non-friend to the user
453 """
454 presence = self._make_presence(ptype="unsubscribed", pstatus=text, bare=True)
455 self.offline()
456 self._send(presence, nick=True)
457 self.is_friend = False
459 async def on_friend_request(self, text: str = "") -> None:
460 """
461 Called when receiving a "subscribe" presence, ie, "I would like to add
462 you to my contacts/friends", from the user to this contact.
464 In XMPP terms: "I would like to receive your presence updates"
466 This is only called if self.is_friend = False. If self.is_friend = True,
467 slidge will automatically "accept the friend request", ie, reply with
468 a "subscribed" presence.
470 When called, a 'friend request event' should be sent to the legacy
471 service, and when the contact responds, you should either call
472 self.accept_subscription() or self.reject_subscription()
473 """
474 pass
476 async def on_friend_delete(self, text: str = "") -> None:
477 """
478 Called when receiving an "unsubscribed" presence, ie, "I would like to
479 remove you to my contacts/friends" or "I refuse your friend request"
480 from the user to this contact.
482 In XMPP terms: "You won't receive my presence updates anymore (or you
483 never have)".
484 """
485 pass
487 async def on_friend_accept(self) -> None:
488 """
489 Called when receiving a "subscribed" presence, ie, "I accept to be
490 your/confirm that you are my friend" from the user to this contact.
492 In XMPP terms: "You will receive my presence updates".
493 """
494 pass
496 def unsubscribe(self) -> None:
497 """
498 (internal use by slidge)
500 Send an "unsubscribe", "unsubscribed", "unavailable" presence sequence
501 from this contact to the user, ie, "this contact has removed you from
502 their 'friends'".
503 """
504 for ptype in "unsubscribe", "unsubscribed", "unavailable":
505 self.xmpp.send_presence(pfrom=self.jid, pto=self.user_jid.bare, ptype=ptype)
507 async def update_info(self) -> None:
508 """
509 Fetch information about this contact from the legacy network
511 This is awaited on Contact instantiation, and should be overridden to
512 update the nickname, avatar, vcard [...] of this contact, by making
513 "legacy API calls".
515 To take advantage of the slidge avatar cache, you can check the .avatar
516 property to retrieve the "legacy file ID" of the cached avatar. If there
517 is no change, you should not call
518 :py:meth:`slidge.core.mixins.avatar.AvatarMixin.set_avatar` or attempt
519 to modify the ``.avatar`` property.
520 """
521 pass
523 async def fetch_vcard(self) -> None:
524 """
525 It the legacy network doesn't like that you fetch too many profiles on startup,
526 it's also possible to fetch it here, which will be called when XMPP clients
527 of the user request the vcard, if it hasn't been fetched before
528 :return:
529 """
530 pass
532 def _make_presence(
533 self,
534 *,
535 last_seen: Optional[datetime.datetime] = None,
536 status_codes: Optional[set[int]] = None,
537 user_full_jid: Optional[JID] = None,
538 **presence_kwargs,
539 ):
540 p = super()._make_presence(last_seen=last_seen, **presence_kwargs)
541 caps = self.xmpp.plugin["xep_0115"]
542 if p.get_from().resource and self.stored.caps_ver:
543 p["caps"]["node"] = caps.caps_node
544 p["caps"]["hash"] = caps.hash
545 p["caps"]["ver"] = self.stored.caps_ver
546 return p
549def is_markable(stanza: Union[Message, Presence]):
550 if isinstance(stanza, Presence):
551 return False
552 return bool(stanza["body"])
555log = logging.getLogger(__name__)