Coverage for slidge / core / dispatcher / presence.py: 82%
135 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
1import logging
2import typing
4from slixmpp import JID, Presence
5from slixmpp.exceptions import XMPPError
7from ...contact.roster import ContactIsUser
8from ...util.types import AnyContact, AnySession
9from ...util.util import merge_resources
10from .util import DispatcherMixin, exceptions_to_xmpp_errors
12if typing.TYPE_CHECKING:
13 from slidge.core.gateway import BaseGateway
16class _IsDirectedAtComponent(Exception):
17 def __init__(self, session: AnySession) -> None:
18 self.session = session
21class PresenceHandlerMixin(DispatcherMixin):
22 __slots__: list[str] = []
23 xmpp: "BaseGateway"
25 def __init__(self, xmpp: "BaseGateway") -> None:
26 super().__init__(xmpp)
28 xmpp.add_event_handler("presence_subscribe", self._handle_subscribe)
29 xmpp.add_event_handler("presence_subscribed", self._handle_subscribed)
30 xmpp.add_event_handler("presence_unsubscribe", self._handle_unsubscribe)
31 xmpp.add_event_handler("presence_unsubscribed", self._handle_unsubscribed)
32 xmpp.add_event_handler("presence_probe", self._handle_probe)
33 xmpp.add_event_handler("presence", self.on_presence)
35 async def __get_contact(self, pres: Presence) -> AnyContact:
36 sess = await self._get_session(pres)
37 pto = pres.get_to()
38 if pto == self.xmpp.boundjid.bare:
39 raise _IsDirectedAtComponent(sess)
40 await sess.contacts.ready
41 return await sess.contacts.by_jid(pto) # type:ignore[no-any-return]
43 @exceptions_to_xmpp_errors
44 async def _handle_subscribe(self, pres: Presence) -> None:
45 try:
46 contact = await self.__get_contact(pres)
47 except _IsDirectedAtComponent:
48 pres.reply().send()
49 return
51 if contact.is_friend:
52 pres.reply().send()
53 else:
54 await contact.on_friend_request(pres["status"])
56 @exceptions_to_xmpp_errors
57 async def _handle_unsubscribe(self, pres: Presence) -> None:
58 pres.reply().send()
60 try:
61 contact = await self.__get_contact(pres)
62 except _IsDirectedAtComponent as e:
63 e.session.send_gateway_message("Bye bye!")
64 await e.session.kill_by_jid(e.session.user_jid)
65 return
67 contact.is_friend = False
68 await contact.on_friend_delete(pres["status"])
70 @exceptions_to_xmpp_errors
71 async def _handle_subscribed(self, pres: Presence) -> None:
72 try:
73 contact = await self.__get_contact(pres)
74 except _IsDirectedAtComponent:
75 return
77 await contact.on_friend_accept()
78 contact.send_last_presence(force=True)
80 @exceptions_to_xmpp_errors
81 async def _handle_unsubscribed(self, pres: Presence) -> None:
82 try:
83 contact = await self.__get_contact(pres)
84 except _IsDirectedAtComponent:
85 return
87 if contact.is_friend:
88 contact.is_friend = False
89 await contact.on_friend_delete(pres["status"])
91 @exceptions_to_xmpp_errors
92 async def _handle_probe(self, pres: Presence) -> None:
93 try:
94 contact = await self.__get_contact(pres)
95 except _IsDirectedAtComponent:
96 session = await self._get_session(pres)
97 session.send_cached_presence(pres.get_from())
98 return
99 if contact.is_friend:
100 contact.send_last_presence(force=True)
101 else:
102 reply = pres.reply()
103 reply["type"] = "unsubscribed"
104 reply.send()
106 @exceptions_to_xmpp_errors
107 async def on_presence(self, p: Presence) -> None:
108 if p.get_plugin("muc_join", check=True):
109 # handled in on_groupchat_join
110 # without this early return, since we switch from and to in this
111 # presence stanza, on_groupchat_join ends up trying to instantiate
112 # a MUC with the user's JID, which in turn leads to slidge sending
113 # a (error) presence from=the user's JID, which terminates the
114 # XML stream.
115 return
117 session = await self._get_session(p)
119 pto = p.get_to()
120 if pto == self.xmpp.boundjid.bare:
121 await self._on_presence_to_component(session, p)
122 return
124 if p.get_type() == "available":
125 try:
126 contact = await session.contacts.by_jid(pto)
127 except XMPPError:
128 contact = None
129 except ContactIsUser:
130 raise XMPPError(
131 "bad-request", "Actions with yourself are not supported."
132 )
133 if contact is not None:
134 await self.xmpp.pubsub.on_presence_available(p, contact)
135 return
137 muc = session.bookmarks.by_jid_only_if_exists(JID(pto.bare))
139 if muc is not None and p.get_type() == "unavailable":
140 muc.on_presence_unavailable(p)
141 return
143 if muc is None or p.get_from().resource not in muc.get_user_resources():
144 return
146 if pto.resource == muc.user_nick:
147 # Ignore presence stanzas with the valid nick.
148 # even if joined to the group, we might receive those from clients,
149 # when setting a status message, or going away, etc.
150 return
152 # We can't use XMPPError here because XMPPError does not have a way to
153 # add the <x xmlns="http://jabber.org/protocol/muc" /> element
155 error_stanza = p.error()
156 error_stanza.set_to(p.get_from())
157 error_stanza.set_from(pto)
158 error_stanza.enable("muc_join") # <x xmlns="http://jabber.org/protocol/muc" />
159 error_stanza.enable("error")
160 error_stanza["error"]["type"] = "cancel"
161 error_stanza["error"]["by"] = muc.jid
162 error_stanza["error"]["condition"] = "not-acceptable"
163 error_stanza["error"]["text"] = (
164 "Slidge does not let you change your nickname in groups."
165 )
166 error_stanza.send()
168 async def _on_presence_to_component(self, session: AnySession, p: Presence) -> None:
169 session.log.debug("Received a presence from %s", p.get_from())
170 if (ptype := p.get_type()) not in _USEFUL_PRESENCES:
171 return
172 if not session.user.preferences.get("sync_presence", False):
173 session.log.debug("User does not want to sync their presence")
174 return
175 # NB: get_type() returns either a proper presence type or
176 # a presence show if available. Weird, weird, weird slix.
177 resources = self.xmpp.roster[self.xmpp.boundjid.bare][p.get_from()].resources
178 try:
179 await session.on_presence(
180 p.get_from().resource,
181 ptype, # type: ignore
182 p["status"],
183 resources,
184 merge_resources(resources),
185 )
186 except NotImplementedError:
187 pass
188 if p.get_type() == "available":
189 await self.xmpp.pubsub.on_presence_available(p, None)
190 for contact in session.contacts:
191 await self.xmpp.pubsub.on_presence_available(p, contact)
194_USEFUL_PRESENCES = {"available", "unavailable", "away", "chat", "dnd", "xa"}
196log = logging.getLogger(__name__)