Coverage for slidge / core / dispatcher / presence.py: 84%
134 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-20 19:56 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-20 19:56 +0000
1import contextlib
2import logging
3import typing
5from slixmpp import JID, Presence
6from slixmpp.exceptions import XMPPError
8from ...contact.roster import ContactIsUser
9from ...util.types import AnyContact, AnySession
10from ...util.util import merge_resources
11from .util import DispatcherMixin, exceptions_to_xmpp_errors
13if typing.TYPE_CHECKING:
14 from slidge.util.types import AnyGateway
class _IsDirectedAtComponent(Exception):
    """Raised when a presence is addressed to the component, not a contact.

    The session that was resolved for the stanza is stored on the exception
    as ``session`` so that the handler catching it can use it.
    """

    def __init__(self, session: AnySession) -> None:
        self.session = session
class PresenceHandlerMixin(DispatcherMixin):
    """Dispatcher mixin handling incoming presence stanzas.

    On construction it registers handlers on the gateway for the
    subscription-related presence events (``subscribe``, ``subscribed``,
    ``unsubscribe``, ``unsubscribed``), for presence probes, and for the
    generic ``presence`` event.
    """

    __slots__: list[str] = []
    xmpp: "AnyGateway"

    def __init__(self, xmpp: "AnyGateway") -> None:
        """Initialise the parent mixin and register presence handlers on *xmpp*."""
        super().__init__(xmpp)

        xmpp.add_event_handler("presence_subscribe", self._handle_subscribe)
        xmpp.add_event_handler("presence_subscribed", self._handle_subscribed)
        xmpp.add_event_handler("presence_unsubscribe", self._handle_unsubscribe)
        xmpp.add_event_handler("presence_unsubscribed", self._handle_unsubscribed)
        xmpp.add_event_handler("presence_probe", self._handle_probe)
        xmpp.add_event_handler("presence", self.on_presence)

    async def __get_contact(self, pres: Presence) -> AnyContact:
        """Return the contact that *pres* is addressed to.

        :param pres: The incoming presence stanza.
        :return: The contact matching the stanza's ``to`` JID in the
            sender's session.
        :raises _IsDirectedAtComponent: If the stanza's ``to`` equals the
            component's bare JID; the exception carries the session.
        """
        sess = await self._get_session(pres)
        pto = pres.get_to()
        if pto == self.xmpp.boundjid.bare:
            raise _IsDirectedAtComponent(sess)
        # Wait for the roster before looking the contact up.
        await sess.contacts.ready
        return await sess.contacts.by_jid(pto)  # type:ignore[no-any-return]

    @exceptions_to_xmpp_errors
    async def _handle_subscribe(self, pres: Presence) -> None:
        """Handle a subscription request.

        Requests sent to the component, or to a contact that is already a
        friend, are answered directly with a reply; otherwise the request
        (with its status text) is forwarded to the contact as a friend
        request.
        """
        try:
            contact = await self.__get_contact(pres)
        except _IsDirectedAtComponent:
            pres.reply().send()
            return

        if contact.is_friend:
            pres.reply().send()
        else:
            await contact.on_friend_request(pres["status"])

    @exceptions_to_xmpp_errors
    async def _handle_unsubscribe(self, pres: Presence) -> None:
        """Handle an unsubscribe request.

        A reply is always sent first. If the stanza targets the component,
        a goodbye message is sent and the session is killed for the user's
        JID. Otherwise the contact is marked as not being a friend and its
        ``on_friend_delete`` hook is called with the status text.
        """
        pres.reply().send()

        try:
            contact = await self.__get_contact(pres)
        except _IsDirectedAtComponent as e:
            e.session.send_gateway_message("Bye bye!")
            await e.session.kill_by_jid(e.session.user_jid)
            return

        contact.is_friend = False
        await contact.on_friend_delete(pres["status"])

    @exceptions_to_xmpp_errors
    async def _handle_subscribed(self, pres: Presence) -> None:
        """Handle a subscription approval sent to a contact.

        Presences directed at the component are ignored. For a contact,
        ``on_friend_accept`` is awaited and the contact's last presence is
        then re-sent with ``force=True``.
        """
        try:
            contact = await self.__get_contact(pres)
        except _IsDirectedAtComponent:
            return

        await contact.on_friend_accept()
        contact.send_last_presence(force=True)

    @exceptions_to_xmpp_errors
    async def _handle_unsubscribed(self, pres: Presence) -> None:
        """Handle a subscription cancellation sent to a contact.

        Presences directed at the component are ignored. If the contact is
        currently a friend, it is marked as not a friend and its
        ``on_friend_delete`` hook is called with the status text.
        """
        try:
            contact = await self.__get_contact(pres)
        except _IsDirectedAtComponent:
            return

        if contact.is_friend:
            contact.is_friend = False
            await contact.on_friend_delete(pres["status"])

    @exceptions_to_xmpp_errors
    async def _handle_probe(self, pres: Presence) -> None:
        """Answer a presence probe.

        A probe to the component triggers ``send_cached_presence`` on the
        session for the probing JID. A probe to a friend contact re-sends
        that contact's last presence; a probe to a non-friend contact is
        answered with a reply of type ``unsubscribed``.
        """
        try:
            contact = await self.__get_contact(pres)
        except _IsDirectedAtComponent as e:
            # The exception already carries the session resolved for this
            # stanza, so there is no need to look it up a second time.
            e.session.send_cached_presence(pres.get_from())
            return
        if contact.is_friend:
            contact.send_last_presence(force=True)
        else:
            reply = pres.reply()
            reply["type"] = "unsubscribed"
            reply.send()

    @exceptions_to_xmpp_errors
    async def on_presence(self, p: Presence) -> None:
        """Handle a generic presence stanza.

        MUC join presences are skipped here. Presences addressed to the
        component are delegated to ``_on_presence_to_component``. An
        ``available`` presence addressed to a known contact is passed to the
        pubsub component. Remaining presences are matched against an
        existing bookmarked MUC: ``unavailable`` is forwarded to the MUC,
        and a presence from a joined resource addressed to a nickname other
        than the user's current one is rejected with a ``not-acceptable``
        error.

        :raises XMPPError: ``bad-request`` if the looked-up contact turns out
            to be the user themselves.
        """
        if "muc_join" in p:
            # handled in on_groupchat_join
            # without this early return, since we switch from and to in this
            # presence stanza, on_groupchat_join ends up trying to instantiate
            # a MUC with the user's JID, which in turn leads to slidge sending
            # a (error) presence from=the user's JID, which terminates the
            # XML stream.
            return

        session = await self._get_session(p)

        pto = p.get_to()
        if pto == self.xmpp.boundjid.bare:
            await self._on_presence_to_component(session, p)
            return

        # Read the type once; it is checked several times below.
        ptype = p.get_type()

        if ptype == "available":
            try:
                contact = await session.contacts.by_jid(pto)
            except XMPPError:
                # No such contact: fall through to the MUC handling below.
                contact = None
            except ContactIsUser:
                raise XMPPError(
                    "bad-request", "Actions with yourself are not supported."
                )
            if contact is not None:
                await self.xmpp.pubsub.on_presence_available(p, contact)
                return

        muc = session.bookmarks.by_jid_only_if_exists(JID(pto.bare))

        if muc is not None and ptype == "unavailable":
            muc.on_presence_unavailable(p)
            return

        if muc is None or p.get_from().resource not in muc.get_user_resources():
            return

        if pto.resource == muc.user_nick:
            # Ignore presence stanzas with the valid nick.
            # even if joined to the group, we might receive those from clients,
            # when setting a status message, or going away, etc.
            return

        # We can't use XMPPError here because XMPPError does not have a way to
        # add the <x xmlns="http://jabber.org/protocol/muc" /> element

        error_stanza = p.error()
        error_stanza.set_to(p.get_from())
        error_stanza.set_from(pto)
        error_stanza.enable("muc_join")  # <x xmlns="http://jabber.org/protocol/muc" />
        error_stanza.enable("error")
        error_stanza["error"]["type"] = "cancel"
        error_stanza["error"]["by"] = muc.jid
        error_stanza["error"]["condition"] = "not-acceptable"
        error_stanza["error"]["text"] = (
            "Slidge does not let you change your nickname in groups."
        )
        error_stanza.send()

    async def _on_presence_to_component(self, session: AnySession, p: Presence) -> None:
        """Handle a presence that the user sent to the component itself.

        Presences whose type is not in ``_USEFUL_PRESENCES`` are ignored, as
        are all presences when the user's ``sync_presence`` preference is
        not set. Otherwise ``session.on_presence`` is called with the
        sending resource, the presence type, the status text, the roster
        resources and their merged form; a ``NotImplementedError`` from that
        call is suppressed. For an ``available`` presence the pubsub
        component is then notified once with ``None`` and once per contact
        of the session.

        :param session: The session of the user who sent the presence.
        :param p: The presence stanza addressed to the component.
        """
        session.log.debug("Received a presence from %s", p.get_from())
        # NB: get_type() returns either a proper presence type or
        # a presence show if available. Weird, weird, weird slix.
        if (ptype := p.get_type()) not in _USEFUL_PRESENCES:
            return
        if not session.user.preferences.get("sync_presence", False):
            session.log.debug("User does not want to sync their presence")
            return
        resources = self.xmpp.roster[self.xmpp.boundjid.bare][p.get_from()].resources
        with contextlib.suppress(NotImplementedError):
            await session.on_presence(
                p.get_from().resource,
                ptype,  # type: ignore
                p["status"],
                resources,
                merge_resources(resources),
            )
        if ptype == "available":
            await self.xmpp.pubsub.on_presence_available(p, None)
            for contact in session.contacts:
                await self.xmpp.pubsub.on_presence_available(p, contact)
# Presence types (or shows) that are acted upon when a presence is sent to
# the component; anything else is ignored by _on_presence_to_component.
_USEFUL_PRESENCES = {
    "available",
    "unavailable",
    "away",
    "chat",
    "dnd",
    "xa",
}

log = logging.getLogger(__name__)