Coverage for slidge / core / dispatcher / pubsub.py: 95%
133 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
1from slixmpp import CoroutineCallback, Iq, StanzaPath
2from slixmpp.exceptions import XMPPError
3from slixmpp.plugins.xep_0060.stanza import Affiliation, Item, Subscription
4from slixmpp.plugins.xep_0084.stanza import Data as AvatarData
5from slixmpp.plugins.xep_0084.stanza import MetaData as AvatarMetadata
7from slidge.db.models import Space
9from ...util.types import AnyGateway, AnySession
10from .util import DispatcherMixin, exceptions_to_xmpp_errors
class PubSubMixin(DispatcherMixin):
    """Dispatcher mixin answering PubSub IQs sent to the gateway's bare JID.

    On construction it registers one coroutine handler per supported
    ``pubsub`` / ``pubsub_owner`` request.  PubSub nodes are translated to
    legacy space identifiers through ``session.bookmarks``, and the items of
    such a node are the rooms returned by the store for that space.
    """

    __slots__: list[str] = []

    def __init__(self, xmpp: AnyGateway) -> None:
        """Register the PubSub IQ handlers on *xmpp*.

        :param xmpp: The gateway component the handlers are attached to.
        """
        super().__init__(xmpp)

        for path, func in [
            ("get/pubsub/items", self.__get_items),
            ("set/pubsub/subscribe", self.__set_subscribe),
            ("set/pubsub/unsubscribe", self.__set_unsubscribe),
            ("get/pubsub/affiliations", self.__get_affiliations),
            ("get/pubsub_owner/affiliations", self.__owner_get_affiliations),
            ("get/pubsub/subscriptions", self.__get_subscriptions),
            ("get/pubsub_owner/subscriptions", self.__owner_get_subscriptions),
        ]:
            self.xmpp.register_handler(
                CoroutineCallback(
                    func.__name__,
                    StanzaPath(f"iq@to={self.xmpp.boundjid.bare}@type={path}"),
                    func,  # type:ignore[arg-type] # ty:ignore[invalid-argument-type]
                )
            )

    async def __get_items(self, iq: Iq) -> None:
        """Reply to an items request with the rooms of the requested space node.

        Requests for the avatar data, avatar metadata and vcard4 nodes are
        ignored here.  When the request lists item IDs, every one of them
        must end with ``@<gateway bare JID>``.

        :param iq: The incoming ``get/pubsub/items`` request.
        :raises XMPPError: ``item-not-found`` if the node is not a known
            space, if an item ID does not have the expected suffix, or if the
            number of rooms found differs from the number of requested items.
        """
        node = iq["pubsub"]["items"]["node"]

        if node in (AvatarData.namespace, AvatarMetadata.namespace, "urn:xmpp:vcard4"):
            # handled in slidge/core/pubsub.py
            # TODO: have a single entrypoint for all get/pubsub/items
            return

        session = await self._get_session(iq, logged=True)
        legacy_id = await self.__get_legacy_id(session, node)
        item_ids = [item["id"] for item in iq["pubsub"]["items"]["items"]]

        # TODO: support for avatar and banner items here
        # The suffix does not depend on the item, so build it once.
        suffix = f"@{self.xmpp.boundjid.bare}"
        if not all(item_id.endswith(suffix) for item_id in item_ids):
            raise XMPPError(
                "item-not-found",
                "Slidge only supports items that are MUCs.",
            )

        # An empty item list naturally yields an empty list of legacy IDs.
        room_legacy_ids = [
            str(await session.bookmarks.jid_local_part_to_legacy_id(x.split("@")[0]))
            for x in item_ids
        ]

        with self.xmpp.store.session() as orm:
            rooms = self.xmpp.store.spaces.get_rooms(
                orm, session.user_pk, str(legacy_id), room_legacy_ids
            )

        if item_ids and len(rooms) != len(item_ids):
            missing = set(item_ids) - {str(r.jid) for r in rooms}
            raise XMPPError(
                "item-not-found",
                f"Could not find items: {missing}",
            )

        reply = iq.reply()
        reply["pubsub"]["items"]["node"] = node
        for room in rooms:
            item = Item()
            item["id"] = room.jid
            item["conference"]["name"] = room.name
            reply["pubsub"]["items"].append(item)

        reply.send()

    async def __get_legacy_id(self, session: AnySession, node: str) -> object:
        """Translate a PubSub node into the legacy ID of a space the user has.

        :param session: Session of the requesting user.
        :param node: PubSub node name taken from the request.
        :return: The legacy identifier returned by
            ``session.bookmarks.space_node_to_legacy_id``.
        :raises XMPPError: ``item-not-found`` if the translation fails or if
            the store has no such space for this user.
        """
        try:
            legacy_id = await session.bookmarks.space_node_to_legacy_id(node)
        except Exception as e:
            # Chain the cause so the original failure stays in the traceback.
            raise XMPPError("item-not-found", str(e)) from e
        with self.xmpp.store.session() as orm:
            if not self.xmpp.store.spaces.exists(orm, session.user_pk, str(legacy_id)):
                raise XMPPError(
                    "item-not-found", f"This is not a known space: '{legacy_id}'"
                )
        return legacy_id

    async def __set_subscribe(self, iq: Iq) -> None:
        """Acknowledge a subscription request for a known space node.

        The node is only validated; the reply always reports ``subscribed``
        for the session's user JID.

        :param iq: The incoming ``set/pubsub/subscribe`` request.
        :raises XMPPError: ``item-not-found`` if the node is not a known space.
        """
        node = iq["pubsub"]["subscribe"]["node"]

        session = await self._get_session(iq, logged=True)
        await self.__get_legacy_id(session, node)

        reply = iq.reply(clear=True)
        sub = reply["pubsub"]["subscription"]
        sub["node"] = node
        sub["jid"] = session.user_jid
        sub["subscription"] = "subscribed"
        reply.send()

    @exceptions_to_xmpp_errors
    async def __set_unsubscribe(self, iq: Iq) -> None:
        """Leave the space designated by the node, then confirm with ``none``.

        :param iq: The incoming ``set/pubsub/unsubscribe`` request.
        :raises XMPPError: ``bad-request`` if the JID to unsubscribe is not
            the bare JID of the session's user.
        """
        session = await self._get_session(iq, logged=True)

        jid = iq["pubsub"]["unsubscribe"]["jid"]
        if jid != session.user_jid.bare:
            raise XMPPError("bad-request", f"Cannot unsubscribe JID {jid}")

        node = iq["pubsub"]["unsubscribe"]["node"]
        legacy_id = await session.bookmarks.space_node_to_legacy_id(node)

        await session.on_leave_space(legacy_id)
        reply = iq.reply(clear=True)

        sub = reply["pubsub"]["subscription"]
        sub["node"] = node
        sub["jid"] = session.user_jid
        sub["subscription"] = "none"
        reply.send()

    async def __get_affiliations(self, iq: Iq) -> None:
        """Report the user as ``subscriber`` of one node, or of all their spaces.

        With a node in the request, only that node is reported.  Without
        one, an affiliation is emitted for every space stored for the user.

        :param iq: The incoming ``get/pubsub/affiliations`` request.
        :raises XMPPError: ``item-not-found`` if a node is given but is not a
            known space.
        """
        session = await self._get_session(iq, logged=True)

        reply = iq.reply()
        node = iq["pubsub"]["affiliations"]["node"]
        if node:
            # __get_legacy_id already raises item-not-found when the space is
            # unknown to the store, so no second existence query is needed.
            await self.__get_legacy_id(session, node)
            affiliation = reply["pubsub"]["affiliations"]["affiliation"]
            affiliation["node"] = node
            affiliation["affiliation"] = "subscriber"
        else:
            with self.xmpp.store.session() as orm:
                spaces = list(self.xmpp.store.spaces.get_all(orm, session.user_pk))
            for space in spaces:
                reply["pubsub"]["affiliations"].append(
                    await self.__make_affiliation(session, space)
                )

        reply.send()

    @staticmethod
    async def __make_affiliation(session: AnySession, space: Space) -> Affiliation:
        """Build a ``subscriber`` affiliation element for *space*.

        :param session: Session whose bookmarks map the space to a node name.
        :param space: Stored space to describe.
        :return: The populated affiliation stanza.
        """
        affiliation = Affiliation()
        node = await session.bookmarks.space_legacy_id_to_node(space.legacy_id)
        affiliation["node"] = node
        affiliation["affiliation"] = "subscriber"
        return affiliation

    async def __owner_get_affiliations(self, iq: Iq) -> None:
        """Reject owner-level affiliation requests.

        :param iq: The incoming ``get/pubsub_owner/affiliations`` request.
        :raises XMPPError: Always, with the ``forbidden`` condition.
        """
        raise XMPPError(
            "forbidden", "Slidge does not implement managing space affiliations."
        )

    async def __owner_get_subscriptions(self, iq: Iq) -> None:
        """Report the requesting user as the subscriber of the given node.

        :param iq: The incoming ``get/pubsub_owner/subscriptions`` request.
        :raises XMPPError: ``bad-request`` if no node is specified,
            ``item-not-found`` if the node is not a known space.
        """
        node = iq["pubsub_owner"]["subscriptions"]["node"]
        if not node:
            raise XMPPError("bad-request", "No node was specified")

        session = await self._get_session(iq, logged=True)
        await self.__get_legacy_id(session, node)

        # NOTE(review): clear=False presumably keeps the request payload
        # (and thus the node attribute) in the reply -- confirm.
        reply = iq.reply(clear=False)
        sub = reply["pubsub_owner"]["subscriptions"]["subscription"]
        sub["jid"] = session.user_jid
        sub["subscription"] = "subscribed"
        reply.send()

    async def __get_subscriptions(self, iq: Iq) -> None:
        """Report the user's subscription to one node, or to all their spaces.

        With a node in the request, only that node is reported.  Without
        one, a subscription is emitted for every space stored for the user.

        :param iq: The incoming ``get/pubsub/subscriptions`` request.
        :raises XMPPError: ``item-not-found`` if a node is given but is not a
            known space.
        """
        # Read the node once; it is used for both the check and the reply.
        node = iq["pubsub"]["subscriptions"]["node"]

        session = await self._get_session(iq, logged=True)

        reply = iq.reply(clear=False)
        subscriptions = reply["pubsub"]["subscriptions"]
        if node:
            await self.__get_legacy_id(session, node)
            subscription = subscriptions["subscription"]
            subscription["node"] = node
            subscription["jid"] = session.user_jid
            subscription["subscription"] = "subscribed"
        else:
            with self.xmpp.store.session() as orm:
                spaces = list(self.xmpp.store.spaces.get_all(orm, session.user_pk))
            for space in spaces:
                subscriptions.append(await self.__make_subscription(session, space))

        reply.send()

    @staticmethod
    async def __make_subscription(session: AnySession, space: Space) -> Subscription:
        """Build a ``subscribed`` subscription element for *space*.

        :param session: Session providing the node name and the user JID.
        :param space: Stored space to describe.
        :return: The populated subscription stanza.
        """
        subscription = Subscription()
        node = await session.bookmarks.space_legacy_id_to_node(space.legacy_id)
        subscription["node"] = node
        subscription["jid"] = session.user_jid
        subscription["subscription"] = "subscribed"
        return subscription