Coverage for slidge/core/dispatcher/presence.py: 82%
125 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
1import logging
3from slixmpp import JID, Presence
4from slixmpp.exceptions import XMPPError
6from ...util.util import merge_resources
7from ..session import BaseSession
8from .util import DispatcherMixin, exceptions_to_xmpp_errors
11class _IsDirectedAtComponent(Exception):
12 def __init__(self, session: BaseSession) -> None:
13 self.session = session
16class PresenceHandlerMixin(DispatcherMixin):
17 __slots__: list[str] = []
19 def __init__(self, xmpp) -> None:
20 super().__init__(xmpp)
22 xmpp.add_event_handler("presence_subscribe", self._handle_subscribe)
23 xmpp.add_event_handler("presence_subscribed", self._handle_subscribed)
24 xmpp.add_event_handler("presence_unsubscribe", self._handle_unsubscribe)
25 xmpp.add_event_handler("presence_unsubscribed", self._handle_unsubscribed)
26 xmpp.add_event_handler("presence_probe", self._handle_probe)
27 xmpp.add_event_handler("presence", self.on_presence)
29 async def __get_contact(self, pres: Presence):
30 sess = await self._get_session(pres)
31 pto = pres.get_to()
32 if pto == self.xmpp.boundjid.bare:
33 raise _IsDirectedAtComponent(sess)
34 await sess.contacts.ready
35 return await sess.contacts.by_jid(pto)
37 @exceptions_to_xmpp_errors
38 async def _handle_subscribe(self, pres: Presence) -> None:
39 try:
40 contact = await self.__get_contact(pres)
41 except _IsDirectedAtComponent:
42 pres.reply().send()
43 return
45 if contact.is_friend:
46 pres.reply().send()
47 else:
48 await contact.on_friend_request(pres["status"])
50 @exceptions_to_xmpp_errors
51 async def _handle_unsubscribe(self, pres: Presence) -> None:
52 pres.reply().send()
54 try:
55 contact = await self.__get_contact(pres)
56 except _IsDirectedAtComponent as e:
57 e.session.send_gateway_message("Bye bye!")
58 await e.session.kill_by_jid(e.session.user_jid)
59 return
61 contact.is_friend = False
62 await contact.on_friend_delete(pres["status"])
64 @exceptions_to_xmpp_errors
65 async def _handle_subscribed(self, pres: Presence) -> None:
66 try:
67 contact = await self.__get_contact(pres)
68 except _IsDirectedAtComponent:
69 return
71 await contact.on_friend_accept()
73 @exceptions_to_xmpp_errors
74 async def _handle_unsubscribed(self, pres: Presence) -> None:
75 try:
76 contact = await self.__get_contact(pres)
77 except _IsDirectedAtComponent:
78 return
80 if contact.is_friend:
81 contact.is_friend = False
82 await contact.on_friend_delete(pres["status"])
84 @exceptions_to_xmpp_errors
85 async def _handle_probe(self, pres: Presence) -> None:
86 try:
87 contact = await self.__get_contact(pres)
88 except _IsDirectedAtComponent:
89 session = await self._get_session(pres)
90 session.send_cached_presence(pres.get_from())
91 return
92 if contact.is_friend:
93 contact.send_last_presence(force=True)
94 else:
95 reply = pres.reply()
96 reply["type"] = "unsubscribed"
97 reply.send()
99 @exceptions_to_xmpp_errors
100 async def on_presence(self, p: Presence):
101 if p.get_plugin("muc_join", check=True):
102 # handled in on_groupchat_join
103 # without this early return, since we switch from and to in this
104 # presence stanza, on_groupchat_join ends up trying to instantiate
105 # a MUC with the user's JID, which in turn leads to slidge sending
106 # a (error) presence from=the user's JID, which terminates the
107 # XML stream.
108 return
110 session = await self._get_session(p)
112 pto = p.get_to()
113 if pto == self.xmpp.boundjid.bare:
114 session.log.debug("Received a presence from %s", p.get_from())
115 if (ptype := p.get_type()) not in _USEFUL_PRESENCES:
116 return
117 if not session.user.preferences.get("sync_presence", False):
118 session.log.debug("User does not want to sync their presence")
119 return
120 # NB: get_type() returns either a proper presence type or
121 # a presence show if available. Weird, weird, weird slix.
122 resources = self.xmpp.roster[self.xmpp.boundjid.bare][
123 p.get_from()
124 ].resources
125 try:
126 await session.on_presence(
127 p.get_from().resource,
128 ptype, # type: ignore
129 p["status"],
130 resources,
131 merge_resources(resources),
132 )
133 except NotImplementedError:
134 pass
135 if p.get_type() == "available":
136 await self.xmpp.pubsub.on_presence_available(p, None)
137 for contact in session.contacts:
138 await self.xmpp.pubsub.on_presence_available(p, contact)
140 if p.get_type() == "available":
141 try:
142 contact = await session.contacts.by_jid(pto)
143 except XMPPError:
144 contact = None
145 if contact is not None:
146 await self.xmpp.pubsub.on_presence_available(p, contact)
147 return
149 muc = session.bookmarks.by_jid_only_if_exists(JID(pto.bare))
151 if muc is not None and p.get_type() == "unavailable":
152 return muc.on_presence_unavailable(p)
154 if muc is None or p.get_from().resource not in muc.get_user_resources():
155 return
157 if pto.resource == muc.user_nick:
158 # Ignore presence stanzas with the valid nick.
159 # even if joined to the group, we might receive those from clients,
160 # when setting a status message, or going away, etc.
161 return
163 # We can't use XMPPError here because XMPPError does not have a way to
164 # add the <x xmlns="http://jabber.org/protocol/muc" /> element
166 error_stanza = p.error()
167 error_stanza.set_to(p.get_from())
168 error_stanza.set_from(pto)
169 error_stanza.enable("muc_join") # <x xmlns="http://jabber.org/protocol/muc" />
170 error_stanza.enable("error")
171 error_stanza["error"]["type"] = "cancel"
172 error_stanza["error"]["by"] = muc.jid
173 error_stanza["error"]["condition"] = "not-acceptable"
174 error_stanza["error"]["text"] = (
175 "Slidge does not let you change your nickname in groups."
176 )
177 error_stanza.send()
180_USEFUL_PRESENCES = {"available", "unavailable", "away", "chat", "dnd", "xa"}
182log = logging.getLogger(__name__)