Coverage for slidge/contact/contact.py: 85%
286 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
1import datetime
2import logging
3import warnings
4from datetime import date
5from typing import TYPE_CHECKING, Generic, Iterable, Iterator, Optional, Union
6from xml.etree import ElementTree as ET
8from slixmpp import JID, Message, Presence
9from slixmpp.exceptions import IqError, IqTimeout
10from slixmpp.plugins.xep_0292.stanza import VCard4
11from slixmpp.types import MessageTypes
12from sqlalchemy.exc import IntegrityError
14from ..core.mixins import AvatarMixin, FullCarbonMixin
15from ..core.mixins.disco import ContactAccountDiscoMixin
16from ..core.mixins.recipient import ReactionRecipientMixin, ThreadRecipientMixin
17from ..db.models import Contact, ContactSent, Participant
18from ..util import SubclassableOnce
19from ..util.types import ClientType, LegacyUserIdType, MessageOrPresenceTypeVar
21if TYPE_CHECKING:
22 from ..core.session import BaseSession
23 from ..group.participant import LegacyParticipant
class LegacyContact(
    Generic[LegacyUserIdType],
    AvatarMixin,
    ContactAccountDiscoMixin,
    FullCarbonMixin,
    ReactionRecipientMixin,
    ThreadRecipientMixin,
    metaclass=SubclassableOnce,
):
    """
    This class centralizes actions in relation to a specific legacy contact.

    You shouldn't create instances of contacts manually, but rather rely on
    :meth:`.LegacyRoster.by_legacy_id` to ensure that contact instances are
    singletons. The :class:`.LegacyRoster` instance of a session is accessible
    through the :attr:`.BaseSession.contacts` attribute.

    Typically, your plugin should have methods hook to the legacy events and
    call appropriate methods here to transmit the "legacy action" to the xmpp
    user. This should look like this:

    .. code-block:: python

        class Session(BaseSession):
            ...

            async def on_cool_chat_network_new_text_message(self, legacy_msg_event):
                contact = self.contacts.by_legacy_id(legacy_msg_event.from)
                contact.send_text(legacy_msg_event.text)

            async def on_cool_chat_network_new_typing_event(self, legacy_typing_event):
                contact = self.contacts.by_legacy_id(legacy_typing_event.from)
                contact.composing()
            ...

    Use ``carbon=True`` as a keyword arg for methods to represent an action FROM
    the user TO the contact, typically when the user uses an official client to
    do an action such as sending a message or marking a message as read.
    This will use :xep:`0356` to impersonate the XMPP user.
    """

    session: "BaseSession"

    RESOURCE: str = "slidge"
    """
    A full JID, including a resource part is required for chat states (and maybe other stuff)
    to work properly. This is the name of the resource the contacts will use.
    """
    # When False, presences of this contact are not mirrored to the
    # participants that represent it in group chats.
    PROPAGATE_PRESENCE_TO_GROUPS = True

    mtype: MessageTypes = "chat"
    _can_send_carbon = True
    is_group = False

    _ONLY_SEND_PRESENCE_CHANGES = True

    STRIP_SHORT_DELAY = True
    # Presence types that are still delivered to the user when the contact is
    # not a friend; every other presence type is dropped in _send().
    _NON_FRIEND_PRESENCES_FILTER = {"subscribe", "unsubscribed"}

    INVITATION_RECIPIENT = True

    stored: Contact
    model: Contact

    def __init__(self, session: "BaseSession", stored: Contact) -> None:
        """
        :param session: The session of the user this contact belongs to
        :param stored: The database row backing this contact
        """
        self.session = session
        self.xmpp = session.xmpp
        self.stored = stored
        # The logger name embeds repr(self), which reads self.stored, so this
        # has to come after the assignments above.
        self._set_logger()
        super().__init__()

    @property
    def jid(self) -> JID:
        """
        Full JID of this contact: the stored JID with :attr:`.RESOURCE` as
        its resource part.
        """
        jid = JID(self.stored.jid)
        jid.resource = self.RESOURCE
        return jid

    @property
    def legacy_id(self):
        """
        The stored legacy ID, converted with the gateway's
        ``LEGACY_CONTACT_ID_TYPE``.
        """
        return self.xmpp.LEGACY_CONTACT_ID_TYPE(self.stored.legacy_id)

    async def get_vcard(self, fetch: bool = True) -> VCard4 | None:
        """
        Return the stored vcard of this contact, if any.

        :param fetch: If true and the vcard has not been fetched yet, await
            :meth:`.fetch_vcard` first.
        :return: The parsed vcard, or None if none is stored.
        """
        if fetch and not self.stored.vcard_fetched:
            await self.fetch_vcard()
        if self.stored.vcard is None:
            return None

        return VCard4(xml=ET.fromstring(self.stored.vcard))

    @property
    def is_friend(self) -> bool:
        """
        Whether this contact is a friend of the user (stored flag).
        """
        return self.stored.is_friend

    @is_friend.setter
    def is_friend(self, value: bool) -> None:
        # Only write and commit when the value actually changes.
        if value == self.is_friend:
            return
        self.stored.is_friend = value
        self.commit()

    @property
    def added_to_roster(self) -> bool:
        """
        Whether this contact has been added to the user's roster (stored flag).
        """
        return self.stored.added_to_roster

    @added_to_roster.setter
    def added_to_roster(self, value: bool) -> None:
        # Only write and commit when the value actually changes.
        if value == self.added_to_roster:
            return
        self.stored.added_to_roster = value
        self.commit()

    @property
    def participants(self) -> Iterator["LegacyParticipant"]:
        """
        Iterate over the group chat participants linked to this contact.

        Stored participants are read from the database, then each one is
        turned into a participant object through the MUC it belongs to.
        """
        with self.xmpp.store.session() as orm:
            self.stored = orm.merge(self.stored)
            participants = self.stored.participants
        for p in participants:
            # p.room is read below, so p is attached to a session first.
            with self.xmpp.store.session() as orm:
                orm.add(p)
                muc = self.session.bookmarks.from_store(p.room)
                yield muc.participant_from_store(p, contact=self)

    @property
    def user_jid(self):
        """
        JID of the XMPP user this contact belongs to (taken from the session).
        """
        return self.session.user_jid

    @property  # type:ignore
    def DISCO_TYPE(self) -> ClientType:
        """
        Alias of :attr:`.client_type`.
        """
        return self.client_type

    @DISCO_TYPE.setter
    def DISCO_TYPE(self, value: ClientType) -> None:
        self.client_type = value

    @property
    def client_type(self) -> ClientType:
        """
        The client type of this contact, cf https://xmpp.org/registrar/disco-categories.html#client

        Default is "pc".
        """
        return self.stored.client_type

    @client_type.setter
    def client_type(self, value: ClientType) -> None:
        # Only write and commit when the value actually changes.
        if self.stored.client_type == value:
            return
        self.stored.client_type = value
        self.commit()

    def _set_logger(self) -> None:
        """
        (Re)create ``self.log``; its name embeds the user's bare JID and the
        repr of this contact, so it is refreshed when the name changes.
        """
        self.log = logging.getLogger(f"{self.user_jid.bare}:contact:{self}")

    def __repr__(self) -> str:
        return f"<Contact #{self.stored.id} '{self.name}' ({self.legacy_id} - {self.jid.user})>"

    def __get_subscription_string(self) -> str:
        """
        Roster subscription value: "both" for friends, "none" otherwise.
        """
        if self.is_friend:
            return "both"
        return "none"

    def __propagate_to_participants(self, stanza: Presence) -> None:
        """
        Mirror a presence of this contact onto its group chat participants.

        Does nothing if :attr:`.PROPAGATE_PRESENCE_TO_GROUPS` is false or if
        the presence type has no participant counterpart.

        :param stanza: The presence about to be sent for this contact
        """
        if not self.PROPAGATE_PRESENCE_TO_GROUPS:
            return

        ptype = stanza["type"]
        if ptype in ("available", "chat"):
            func_name = "online"
        elif ptype in ("xa", "unavailable"):
            # we map unavailable to extended_away, because offline is
            # "participant leaves the MUC"
            # TODO: improve this with a clear distinction between participant
            #       and member list
            func_name = "extended_away"
        elif ptype == "busy":
            func_name = "busy"
        elif ptype == "away":
            func_name = "away"
        else:
            return

        last_seen: Optional[datetime.datetime] = (
            stanza["idle"]["since"] if stanza.get_plugin("idle", check=True) else None
        )

        kw = dict(status=stanza["status"], last_seen=last_seen)

        for part in self.participants:
            func = getattr(part, func_name)
            func(**kw)

    def _send(
        self,
        stanza: MessageOrPresenceTypeVar,
        carbon: bool = False,
        nick: bool = False,
        **send_kwargs,
    ) -> MessageOrPresenceTypeVar:
        """
        Send a stanza on behalf of this contact.

        :param stanza: The message or presence to send
        :param carbon: If true and the stanza is a message, send it FROM the
            user TO this contact through the privileged-send path instead.
        :param nick: Attach the contact's nickname to the stanza even if the
            contact is a friend (it is always attached for non-friends).
        :param send_kwargs: Accepted but not used in this method
        :return: The stanza that was passed in
        """
        if carbon and isinstance(stanza, Message):
            stanza["to"] = self.jid.bare
            stanza["from"] = self.user_jid
            self._privileged_send(stanza)
            return stanza  # type:ignore

        if isinstance(stanza, Presence):
            if not self._updating_info:
                self.__propagate_to_participants(stanza)
            # Non-friends only get to send the presence types listed in
            # _NON_FRIEND_PRESENCES_FILTER; anything else is silently dropped.
            if (
                not self.is_friend
                and stanza["type"] not in self._NON_FRIEND_PRESENCES_FILTER
            ):
                return stanza  # type:ignore
        if self.name and (nick or not self.is_friend):
            n = self.xmpp.plugin["xep_0172"].stanza.UserNick()
            n["nick"] = self.name
            stanza.append(n)
        if (
            not self._updating_info
            and self.xmpp.MARK_ALL_MESSAGES
            and is_markable(stanza)
        ):
            # Record the id of this message so that get_msg_xmpp_id_up_to()
            # can return it later.
            try:
                with self.xmpp.store.session(expire_on_commit=False) as orm:
                    self.stored = orm.merge(self.stored)
                    new = ContactSent(contact=self.stored, msg_id=stanza["id"])
                    orm.add(new)
                    self.stored.sent_order.append(new)
                    orm.commit()
            except IntegrityError:
                self.log.warning("Contact has already sent message %s", stanza["id"])
        stanza["to"] = self.user_jid
        stanza.send()
        return stanza

    def get_msg_xmpp_id_up_to(self, horizon_xmpp_id: str) -> list[str]:
        """
        Return XMPP msg ids sent by this contact up to a given XMPP msg id.

        Plugins have no reason to use this, but it is used by slidge core
        for legacy networks that need to mark all messages as read (most XMPP
        clients only send a read marker for the latest message).

        This has side effects: the ids are obtained through the store's
        ``pop_sent_up_to`` and the transaction is committed, so (as the name
        of that helper suggests) the returned ids are presumably removed from
        the record of sent messages, to avoid sending the same read mark
        twice.

        :param horizon_xmpp_id: The latest message
        :return: The XMPP ids returned by the store. What is returned when
            ``horizon_xmpp_id`` is not found is decided by the store helper
            and is not visible from here.
        """
        with self.xmpp.store.session() as orm:
            assert self.stored.id is not None
            ids = self.xmpp.store.contacts.pop_sent_up_to(
                orm, self.stored.id, horizon_xmpp_id
            )
            orm.commit()
        return ids

    @property
    def name(self) -> str:
        """
        Friendly name of the contact, as it should appear in the user's roster
        """
        return self.stored.nick or ""

    @name.setter
    def name(self, n: Optional[str]) -> None:
        if self.stored.nick == n:
            return
        self.stored.nick = n
        # The logger name embeds the contact name, so refresh it.
        self._set_logger()
        if self.is_friend and self.added_to_roster:
            self.xmpp.pubsub.broadcast_nick(
                user_jid=self.user_jid, jid=self.jid.bare, nick=n
            )
        self.commit()
        for p in self.participants:
            p.nickname = n

    def _post_avatar_update(self, cached_avatar) -> None:
        """
        Publish a new avatar: broadcast it over pubsub if the contact is a
        friend that was added to the roster, then make every participant
        linked to this contact re-send its last presence.

        :param cached_avatar: The cached avatar handed to the pubsub broadcast
        """
        if self.is_friend and self.added_to_roster:
            self.session.create_task(
                self.session.xmpp.pubsub.broadcast_avatar(
                    self.jid.bare, self.session.user_jid, cached_avatar
                )
            )
        for p in self.participants:
            self.log.debug("Propagating new avatar to %s", p.muc)
            p.send_last_presence(force=True, no_cache_online=True)

    def set_vcard(
        self,
        /,
        full_name: Optional[str] = None,
        given: Optional[str] = None,
        surname: Optional[str] = None,
        birthday: Optional[date] = None,
        phone: Optional[str] = None,
        phones: Iterable[str] = (),
        note: Optional[str] = None,
        url: Optional[str] = None,
        email: Optional[str] = None,
        country: Optional[str] = None,
        locality: Optional[str] = None,
    ) -> None:
        """
        Build a vcard for this contact, store it, and broadcast it.

        Only truthy arguments end up in the vcard. The contact's bare JID is
        always added as an IMPP entry, and the contact's name (if any) as a
        nickname.

        :param full_name: Full name; defaults to the contact's name
        :param given: Given name
        :param surname: Surname
        :param birthday: Birthday
        :param phone: A phone number
        :param phones: Additional phone numbers
        :param note: Free-form note
        :param url: A URL
        :param email: An email address
        :param country: Country of the address; an address is only added
            when this is set
        :param locality: Locality of the address; ignored without ``country``
        """
        vcard = VCard4()
        vcard.add_impp(f"xmpp:{self.jid.bare}")

        if n := self.name:
            vcard.add_nickname(n)
        if full_name:
            vcard["full_name"] = full_name
        elif n:
            vcard["full_name"] = n

        if given:
            vcard["given"] = given
        if surname:
            vcard["surname"] = surname
        if birthday:
            vcard["birthday"] = birthday

        if note:
            vcard.add_note(note)
        if url:
            vcard.add_url(url)
        if email:
            vcard.add_email(email)
        if phone:
            vcard.add_tel(phone)
        for p in phones:
            vcard.add_tel(p)
        if country:
            # locality may be None here; it is passed through unchanged.
            vcard.add_address(country, locality)

        self.stored.vcard = str(vcard)
        self.stored.vcard_fetched = True
        self.session.create_task(
            self.xmpp.pubsub.broadcast_vcard_event(self.jid, self.user_jid, vcard)
        )

        self.commit()

    def get_roster_item(self):
        """
        Build the roster entry for this contact.

        :return: A dict mapping the contact's bare JID to its roster item
            (subscription, groups and, if the contact has one, name).
        """
        item = {
            "subscription": self.__get_subscription_string(),
            "groups": [self.xmpp.ROSTER_GROUP],
        }
        # self.name is never None (it falls back to ""), so test truthiness
        # to leave the name out when the contact has none.
        if n := self.name:
            item["name"] = n
        return {self.jid.bare: item}

    async def add_to_roster(self, force: bool = False) -> None:
        """
        Add this contact to the user roster using :xep:`0356`

        If the roster cannot be managed because of missing privileges, a
        warning is emitted and a friend request is sent to the user instead.

        :param force: add even if the contact was already added successfully
        """
        if self.added_to_roster and not force:
            return
        if not self.session.user.preferences.get("roster_push", True):
            self.log.debug("Roster push request by plugin ignored (--no-roster-push)")
            return
        try:
            await self.xmpp["xep_0356"].set_roster(
                jid=self.user_jid, roster_items=self.get_roster_item()
            )
        except PermissionError:
            from slidge import __version__

            warnings.warn(
                "Slidge does not have the privilege to manage rosters. See "
                f"https://slidge.im/docs/slidge/{__version__}/admin/privilege.html"
            )
            self.send_friend_request(
                f"I'm already your friend on {self.xmpp.COMPONENT_TYPE}, but "
                "slidge is not allowed to manage your roster."
            )
            return
        except (IqError, IqTimeout) as e:
            self.log.warning("Could not add to roster", exc_info=e)
        else:
            # we only broadcast pubsub events for contacts added to the roster
            # so if something was set before, we need to push it now
            self.added_to_roster = True
            self.send_last_presence()

    async def __broadcast_pubsub_items(self) -> None:
        """
        Broadcast the cached avatar (if any) and the nickname of this contact
        over pubsub. Does nothing unless the contact is a friend that was
        added to the roster.
        """
        if not self.is_friend:
            return
        if not self.added_to_roster:
            return
        cached_avatar = self.get_cached_avatar()
        if cached_avatar is not None:
            await self.xmpp.pubsub.broadcast_avatar(
                self.jid.bare, self.session.user_jid, cached_avatar
            )
        nick = self.name

        # NOTE(review): the name property of this class never returns None
        # (it falls back to ""), so this guard only matters if a subclass
        # overrides it; an empty nickname is broadcast as-is.
        if nick is not None:
            self.xmpp.pubsub.broadcast_nick(
                self.session.user_jid,
                self.jid.bare,
                nick,
            )

    def send_friend_request(self, text: Optional[str] = None) -> None:
        """
        Send a "subscribe" presence from this contact to the user.

        :param text: Optional message accompanying the request
        """
        presence = self._make_presence(ptype="subscribe", pstatus=text, bare=True)
        self._send(presence, nick=True)

    async def accept_friend_request(self, text: Optional[str] = None) -> None:
        """
        Call this to signify that this Contact has accepted to be a friend
        of the user.

        :param text: Optional message from the friend to the user
        """
        self.is_friend = True
        self.added_to_roster = True
        self.log.debug("Accepting friend request")
        presence = self._make_presence(ptype="subscribed", pstatus=text, bare=True)
        self._send(presence, nick=True)
        self.send_last_presence()
        await self.__broadcast_pubsub_items()
        self.log.debug("Accepted friend request")

    def reject_friend_request(self, text: Optional[str] = None) -> None:
        """
        Call this to signify that this Contact has refused to be a contact
        of the user (or that they don't want to be friends anymore)

        :param text: Optional message from the non-friend to the user
        """
        presence = self._make_presence(ptype="unsubscribed", pstatus=text, bare=True)
        # The friend flag is only cleared last, after both presences are sent.
        self.offline()
        self._send(presence, nick=True)
        self.is_friend = False

    async def on_friend_request(self, text: str = "") -> None:
        """
        Called when receiving a "subscribe" presence, ie, "I would like to add
        you to my contacts/friends", from the user to this contact.

        In XMPP terms: "I would like to receive your presence updates"

        This is only called if self.is_friend = False. If self.is_friend = True,
        slidge will automatically "accept the friend request", ie, reply with
        a "subscribed" presence.

        When called, a 'friend request event' should be sent to the legacy
        service, and when the contact responds, you should either call
        self.accept_friend_request() or self.reject_friend_request()
        """
        pass

    async def on_friend_delete(self, text: str = "") -> None:
        """
        Called when receiving an "unsubscribed" presence, ie, "I would like to
        remove you from my contacts/friends" or "I refuse your friend request"
        from the user to this contact.

        In XMPP terms: "You won't receive my presence updates anymore (or you
        never have)".
        """
        pass

    async def on_friend_accept(self) -> None:
        """
        Called when receiving a "subscribed" presence, ie, "I accept to be
        your/confirm that you are my friend" from the user to this contact.

        In XMPP terms: "You will receive my presence updates".
        """
        pass

    def unsubscribe(self) -> None:
        """
        (internal use by slidge)

        Send an "unsubscribe", "unsubscribed", "unavailable" presence sequence
        from this contact to the user, ie, "this contact has removed you from
        their 'friends'".
        """
        for ptype in "unsubscribe", "unsubscribed", "unavailable":
            self.xmpp.send_presence(pfrom=self.jid, pto=self.user_jid.bare, ptype=ptype)

    async def update_info(self) -> None:
        """
        Fetch information about this contact from the legacy network

        This is awaited on Contact instantiation, and should be overridden to
        update the nickname, avatar, vcard [...] of this contact, by making
        "legacy API calls".

        To take advantage of the slidge avatar cache, you can check the .avatar
        property to retrieve the "legacy file ID" of the cached avatar. If there
        is no change, you should not call
        :py:meth:`slidge.core.mixins.avatar.AvatarMixin.set_avatar` or attempt
        to modify the ``.avatar`` property.
        """
        pass

    async def fetch_vcard(self) -> None:
        """
        If the legacy network doesn't like that you fetch too many profiles on startup,
        it's also possible to fetch it here, which will be called when XMPP clients
        of the user request the vcard, if it hasn't been fetched before
        :return:
        """
        pass

    def _make_presence(
        self,
        *,
        last_seen: Optional[datetime.datetime] = None,
        status_codes: Optional[set[int]] = None,
        user_full_jid: Optional[JID] = None,
        **presence_kwargs,
    ):
        """
        Build a presence for this contact, adding entity capabilities when
        the presence comes from a full JID and a caps version is stored.

        :param last_seen: Forwarded to the parent implementation
        :param status_codes: Accepted but not used in this method
        :param user_full_jid: Accepted but not used in this method
        :param presence_kwargs: Forwarded to the parent implementation
        :return: The presence stanza
        """
        p = super()._make_presence(last_seen=last_seen, **presence_kwargs)
        caps = self.xmpp.plugin["xep_0115"]
        if p.get_from().resource and self.stored.caps_ver:
            p["caps"]["node"] = caps.caps_node
            p["caps"]["hash"] = caps.hash
            p["caps"]["ver"] = self.stored.caps_ver
        return p
def is_markable(stanza: Union[Message, Presence]):
    """
    Tell whether a stanza counts as "markable".

    Presences never do; any other stanza does only when its body is
    non-empty.

    :param stanza: The stanza to inspect
    :return: True for a non-presence stanza with a non-empty body
    """
    return not isinstance(stanza, Presence) and bool(stanza["body"])
# Module-level logger, named after this module.
log = logging.getLogger(__name__)