Coverage for slidge / core / dispatcher / presence.py: 84%
135 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
1import contextlib
2import logging
3import typing
5from slixmpp import JID, Presence
6from slixmpp.exceptions import XMPPError
8from ...contact import LegacyContact
9from ...contact.roster import ContactIsUser
10from ...util.types import AnySession
11from ...util.util import merge_resources
12from .util import DispatcherMixin, exceptions_to_xmpp_errors
14if typing.TYPE_CHECKING:
15 from slidge.util.types import AnyGateway
18class _IsDirectedAtComponent(Exception):
19 def __init__(self, session: AnySession) -> None:
20 self.session = session
23class PresenceHandlerMixin(DispatcherMixin):
24 __slots__: list[str] = []
25 xmpp: "AnyGateway"
27 def __init__(self, xmpp: "AnyGateway") -> None:
28 super().__init__(xmpp)
30 xmpp.add_event_handler("presence_subscribe", self._handle_subscribe)
31 xmpp.add_event_handler("presence_subscribed", self._handle_subscribed)
32 xmpp.add_event_handler("presence_unsubscribe", self._handle_unsubscribe)
33 xmpp.add_event_handler("presence_unsubscribed", self._handle_unsubscribed)
34 xmpp.add_event_handler("presence_probe", self._handle_probe)
35 xmpp.add_event_handler("presence", self.on_presence)
37 async def __get_contact(self, pres: Presence) -> LegacyContact:
38 sess = await self._get_session(pres)
39 pto = pres.get_to()
40 if pto == self.xmpp.boundjid.bare:
41 raise _IsDirectedAtComponent(sess)
42 await sess.contacts.ready
43 return await sess.contacts.by_jid(pto) # type:ignore[no-any-return]
45 @exceptions_to_xmpp_errors
46 async def _handle_subscribe(self, pres: Presence) -> None:
47 try:
48 contact = await self.__get_contact(pres)
49 except _IsDirectedAtComponent:
50 pres.reply().send()
51 return
53 if contact.is_friend:
54 pres.reply().send()
55 else:
56 await contact.on_friend_request(pres["status"])
58 @exceptions_to_xmpp_errors
59 async def _handle_unsubscribe(self, pres: Presence) -> None:
60 pres.reply().send()
62 try:
63 contact = await self.__get_contact(pres)
64 except _IsDirectedAtComponent as e:
65 e.session.send_gateway_message("Bye bye!")
66 await e.session.kill_by_jid(e.session.user_jid)
67 return
69 contact.is_friend = False
70 await contact.on_friend_delete(pres["status"])
72 @exceptions_to_xmpp_errors
73 async def _handle_subscribed(self, pres: Presence) -> None:
74 try:
75 contact = await self.__get_contact(pres)
76 except _IsDirectedAtComponent:
77 return
79 await contact.on_friend_accept()
80 contact.send_last_presence(force=True)
82 @exceptions_to_xmpp_errors
83 async def _handle_unsubscribed(self, pres: Presence) -> None:
84 try:
85 contact = await self.__get_contact(pres)
86 except _IsDirectedAtComponent:
87 return
89 if contact.is_friend:
90 contact.is_friend = False
91 await contact.on_friend_delete(pres["status"])
93 @exceptions_to_xmpp_errors
94 async def _handle_probe(self, pres: Presence) -> None:
95 try:
96 contact = await self.__get_contact(pres)
97 except _IsDirectedAtComponent:
98 session = await self._get_session(pres)
99 session.send_cached_presence(pres.get_from())
100 return
101 if contact.is_friend:
102 contact.send_last_presence(force=True)
103 else:
104 reply = pres.reply()
105 reply["type"] = "unsubscribed"
106 reply.send()
108 @exceptions_to_xmpp_errors
109 async def on_presence(self, p: Presence) -> None:
110 if "muc_join" in p:
111 # handled in on_groupchat_join
112 # without this early return, since we switch from and to in this
113 # presence stanza, on_groupchat_join ends up trying to instantiate
114 # a MUC with the user's JID, which in turn leads to slidge sending
115 # a (error) presence from=the user's JID, which terminates the
116 # XML stream.
117 return
119 session = await self._get_session(p)
121 pto = p.get_to()
122 if pto == self.xmpp.boundjid.bare:
123 await self._on_presence_to_component(session, p)
124 return
126 if p.get_type() == "available":
127 try:
128 contact = await session.contacts.by_jid(pto)
129 except XMPPError:
130 contact = None
131 except ContactIsUser:
132 raise XMPPError(
133 "bad-request", "Actions with yourself are not supported."
134 )
135 if contact is not None:
136 await self.xmpp.pubsub.on_presence_available(p, contact)
137 return
139 muc = session.bookmarks.by_jid_only_if_exists(JID(pto.bare))
141 if muc is not None and p.get_type() == "unavailable":
142 muc.on_presence_unavailable(p)
143 return
145 if muc is None or p.get_from().resource not in muc.get_user_resources():
146 return
148 if pto.resource == muc.user_nick:
149 # Ignore presence stanzas with the valid nick.
150 # even if joined to the group, we might receive those from clients,
151 # when setting a status message, or going away, etc.
152 return
154 # We can't use XMPPError here because XMPPError does not have a way to
155 # add the <x xmlns="http://jabber.org/protocol/muc" /> element
157 error_stanza = p.error()
158 error_stanza.set_to(p.get_from())
159 error_stanza.set_from(pto)
160 error_stanza.enable("muc_join") # <x xmlns="http://jabber.org/protocol/muc" />
161 error_stanza.enable("error")
162 error_stanza["error"]["type"] = "cancel"
163 error_stanza["error"]["by"] = muc.jid
164 error_stanza["error"]["condition"] = "not-acceptable"
165 error_stanza["error"]["text"] = (
166 "Slidge does not let you change your nickname in groups."
167 )
168 error_stanza.send()
170 async def _on_presence_to_component(self, session: AnySession, p: Presence) -> None:
171 session.log.debug("Received a presence from %s", p.get_from())
172 if (ptype := p.get_type()) not in _USEFUL_PRESENCES:
173 return
174 if not session.user.preferences.get("sync_presence", False):
175 session.log.debug("User does not want to sync their presence")
176 return
177 # NB: get_type() returns either a proper presence type or
178 # a presence show if available. Weird, weird, weird slix.
179 resources = self.xmpp.roster[self.xmpp.boundjid.bare][p.get_from()].resources
180 with contextlib.suppress(NotImplementedError):
181 await session.on_presence(
182 p.get_from().resource,
183 ptype, # type: ignore
184 p["status"],
185 resources,
186 merge_resources(resources),
187 )
188 if p.get_type() == "available":
189 await self.xmpp.pubsub.on_presence_available(p, None)
190 for contact in session.contacts:
191 await self.xmpp.pubsub.on_presence_available(p, contact)
194_USEFUL_PRESENCES = {"available", "unavailable", "away", "chat", "dnd", "xa"}
196log = logging.getLogger(__name__)