Coverage for slidge / core / dispatcher / pubsub.py: 95%
132 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-20 19:56 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-20 19:56 +0000
1from slixmpp import CoroutineCallback, Iq, StanzaPath
2from slixmpp.exceptions import XMPPError
3from slixmpp.plugins.xep_0060.stanza import Affiliation, Item, Subscription
4from slixmpp.plugins.xep_0084.stanza import Data as AvatarData
5from slixmpp.plugins.xep_0084.stanza import MetaData as AvatarMetadata
7from slidge.db.models import Space
9from ...util.types import AnyGateway, AnySession
10from .util import DispatcherMixin
class PubSubMixin(DispatcherMixin):
    """Dispatcher mixin answering pubsub IQs addressed to the gateway's bare JID.

    Pubsub nodes are mapped to legacy "spaces" through
    ``session.bookmarks.space_node_to_legacy_id`` and the rooms of a space are
    exposed as the items of its node. Every handler resolves the requesting
    user's session with ``self._get_session(iq, logged=True)`` and reports
    failures by raising :class:`XMPPError`.
    """

    __slots__: list[str] = []

    def __init__(self, xmpp: AnyGateway) -> None:
        """Register one coroutine handler per supported pubsub request.

        :param xmpp: The gateway component on which handlers are registered.
        """
        super().__init__(xmpp)

        for path, func in [
            ("get/pubsub/items", self.__get_items),
            ("set/pubsub/subscribe", self.__set_subscribe),
            ("set/pubsub/unsubscribe", self.__set_unsubscribe),
            ("get/pubsub/affiliations", self.__get_affiliations),
            ("get/pubsub_owner/affiliations", self.__owner_get_affiliations),
            ("get/pubsub/subscriptions", self.__get_subscriptions),
            ("get/pubsub_owner/subscriptions", self.__owner_get_subscriptions),
        ]:
            self.xmpp.register_handler(
                CoroutineCallback(
                    func.__name__,
                    # Only match IQs sent to the gateway itself.
                    StanzaPath(f"iq@to={self.xmpp.boundjid.bare}@type={path}"),
                    func,  # type:ignore[arg-type]  # ty:ignore[invalid-argument-type]
                )
            )

    async def __get_items(self, iq: Iq) -> None:
        """Reply with the rooms of a space as pubsub items.

        When the request lists item IDs, only those rooms are returned and all
        of them must be found; otherwise the store is queried with an empty
        list of room IDs.

        :param iq: The incoming ``get/pubsub/items`` request.
        :raises XMPPError: ``item-not-found`` if the node is not a known space,
            if a requested item ID does not end with ``@<gateway bare JID>``,
            or if some requested items could not be found.
        """
        node = iq["pubsub"]["items"]["node"]

        if node in (AvatarData.namespace, AvatarMetadata.namespace, "urn:xmpp:vcard4"):
            # handled in slidge/core/pubsub.py
            # TODO: have a single entrypoint for all get/pubsub/items
            return

        session = await self._get_session(iq, logged=True)
        legacy_id = await self.__get_legacy_id(session, node)
        item_ids = [item["id"] for item in iq["pubsub"]["items"]["items"]]

        # TODO: support for avatar and banner items here
        suffix = f"@{self.xmpp.boundjid.bare}"
        if not all(item_id.endswith(suffix) for item_id in item_ids):
            raise XMPPError(
                "item-not-found",
                "Slidge only supports items that are MUCs.",
            )

        # An empty request simply yields an empty list here.
        room_legacy_ids = [
            str(await session.bookmarks.jid_local_part_to_legacy_id(x.split("@")[0]))
            for x in item_ids
        ]

        with self.xmpp.store.session() as orm:
            rooms = self.xmpp.store.spaces.get_rooms(
                orm, session.user_pk, str(legacy_id), room_legacy_ids
            )
            # Copy the fields needed for the reply while the ORM session is
            # open, so nothing below depends on the state of the row objects.
            found = [(room.jid, room.name) for room in rooms]

        if item_ids and len(found) != len(item_ids):
            missing = set(item_ids) - {str(jid) for jid, _ in found}
            raise XMPPError(
                "item-not-found",
                f"Could not find items: {missing}",
            )

        reply = iq.reply()
        reply["pubsub"]["items"]["node"] = node
        for room_jid, room_name in found:
            item = Item()
            item["id"] = room_jid
            item["conference"]["name"] = room_name
            reply["pubsub"]["items"].append(item)

        reply.send()

    async def __get_legacy_id(self, session: AnySession, node: str) -> object:
        """Resolve a pubsub node to the legacy ID of a space known to the store.

        :param session: Session of the requesting user.
        :param node: Pubsub node name taken from the request.
        :return: The legacy ID returned by
            ``session.bookmarks.space_node_to_legacy_id``.
        :raises XMPPError: ``item-not-found`` if the conversion fails or if the
            store has no such space for this user.
        """
        try:
            legacy_id = await session.bookmarks.space_node_to_legacy_id(node)
        except Exception as e:
            # Any conversion failure is reported to the requester as an
            # unknown node; the original exception is kept as the cause.
            raise XMPPError("item-not-found", str(e)) from e
        with self.xmpp.store.session() as orm:
            if not self.xmpp.store.spaces.exists(orm, session.user_pk, str(legacy_id)):
                raise XMPPError(
                    "item-not-found", f"This is not a known space: '{legacy_id}'"
                )
        return legacy_id

    async def __set_subscribe(self, iq: Iq) -> None:
        """Acknowledge a subscription request for a known space node.

        :param iq: The incoming ``set/pubsub/subscribe`` request.
        :raises XMPPError: ``item-not-found`` if the node is not a known space.
        """
        node = iq["pubsub"]["subscribe"]["node"]

        session = await self._get_session(iq, logged=True)
        # Called for validation only: raises if the node is unknown.
        await self.__get_legacy_id(session, node)

        reply = iq.reply(clear=True)
        sub = reply["pubsub"]["subscription"]
        sub["node"] = node
        sub["jid"] = session.user_jid
        sub["subscription"] = "subscribed"
        reply.send()

    async def __set_unsubscribe(self, iq: Iq) -> None:
        """Leave the space behind a node and confirm the unsubscription.

        :param iq: The incoming ``set/pubsub/unsubscribe`` request.
        :raises XMPPError: ``bad-request`` if the JID to unsubscribe is not the
            requester's bare JID; ``item-not-found`` if the node is not a
            known space.
        """
        session = await self._get_session(iq, logged=True)

        jid = iq["pubsub"]["unsubscribe"]["jid"]
        if jid != session.user_jid.bare:
            raise XMPPError("bad-request", f"Cannot unsubscribe JID {jid}")

        node = iq["pubsub"]["unsubscribe"]["node"]
        # Go through the shared helper so an invalid or unknown node yields
        # the same item-not-found error as in the other handlers, instead of
        # leaking whatever the node conversion raises.
        legacy_id = await self.__get_legacy_id(session, node)

        await session.on_leave_space(legacy_id)
        reply = iq.reply(clear=True)

        sub = reply["pubsub"]["subscription"]
        sub["node"] = node
        sub["jid"] = session.user_jid
        sub["subscription"] = "none"
        reply.send()

    async def __get_affiliations(self, iq: Iq) -> None:
        """Reply with the requester's affiliation(s), always ``subscriber``.

        With a node in the request, a single affiliation for that node is
        returned; without one, one affiliation per stored space of the user.

        :param iq: The incoming ``get/pubsub/affiliations`` request.
        :raises XMPPError: ``item-not-found`` if a node is given but is not a
            known space.
        """
        session = await self._get_session(iq, logged=True)

        reply = iq.reply()
        node = iq["pubsub"]["affiliations"]["node"]
        if node:
            # The helper already verifies that the space exists in the store,
            # so no second existence check is needed here.
            await self.__get_legacy_id(session, node)
            affiliation = reply["pubsub"]["affiliations"]["affiliation"]
            affiliation["node"] = node
            affiliation["affiliation"] = "subscriber"
        else:
            with self.xmpp.store.session() as orm:
                spaces = list(self.xmpp.store.spaces.get_all(orm, session.user_pk))
            for space in spaces:
                reply["pubsub"]["affiliations"].append(
                    await self.__make_affiliation(session, space)
                )

        reply.send()

    @staticmethod
    async def __make_affiliation(session: AnySession, space: Space) -> Affiliation:
        """Build a ``subscriber`` affiliation element for ``space``.

        :param session: Session used to convert the space's legacy ID to a node.
        :param space: Stored space to describe.
        :return: The populated affiliation stanza.
        """
        affiliation = Affiliation()
        node = await session.bookmarks.space_legacy_id_to_node(space.legacy_id)
        affiliation["node"] = node
        affiliation["affiliation"] = "subscriber"
        return affiliation

    async def __owner_get_affiliations(self, iq: Iq) -> None:
        """Reject owner affiliation queries, which are not implemented.

        :param iq: The incoming ``get/pubsub_owner/affiliations`` request.
        :raises XMPPError: Always, with condition ``forbidden``.
        """
        raise XMPPError(
            "forbidden", "Slidge does not implement managing space affiliations."
        )

    async def __owner_get_subscriptions(self, iq: Iq) -> None:
        """Reply to an owner subscriptions query with the requester's own entry.

        :param iq: The incoming ``get/pubsub_owner/subscriptions`` request.
        :raises XMPPError: ``bad-request`` if no node is given;
            ``item-not-found`` if the node is not a known space.
        """
        node = iq["pubsub_owner"]["subscriptions"]["node"]
        if not node:
            raise XMPPError("bad-request", "No node was specified")

        session = await self._get_session(iq, logged=True)
        # Called for validation only: raises if the node is unknown.
        await self.__get_legacy_id(session, node)

        reply = iq.reply(clear=False)
        sub = reply["pubsub_owner"]["subscriptions"]["subscription"]
        sub["jid"] = session.user_jid
        sub["subscription"] = "subscribed"
        reply.send()

    async def __get_subscriptions(self, iq: Iq) -> None:
        """Reply with the requester's subscription(s), always ``subscribed``.

        With a node in the request, a single subscription for that node is
        returned; without one, one subscription per stored space of the user.

        :param iq: The incoming ``get/pubsub/subscriptions`` request.
        :raises XMPPError: ``item-not-found`` if a node is given but is not a
            known space.
        """
        session = await self._get_session(iq, logged=True)

        reply = iq.reply(clear=False)
        subscriptions = reply["pubsub"]["subscriptions"]
        if node := iq["pubsub"]["subscriptions"]["node"]:
            # Called for validation only: raises if the node is unknown.
            await self.__get_legacy_id(session, node)
            subscription = subscriptions["subscription"]
            subscription["node"] = node
            subscription["jid"] = session.user_jid
            subscription["subscription"] = "subscribed"
        else:
            with self.xmpp.store.session() as orm:
                spaces = list(self.xmpp.store.spaces.get_all(orm, session.user_pk))
            for space in spaces:
                subscriptions.append(await self.__make_subscription(session, space))

        reply.send()

    @staticmethod
    async def __make_subscription(session: AnySession, space: Space) -> Subscription:
        """Build a ``subscribed`` subscription element for ``space``.

        :param session: Session providing the user JID and the node conversion.
        :param space: Stored space to describe.
        :return: The populated subscription stanza.
        """
        subscription = Subscription()
        node = await session.bookmarks.space_legacy_id_to_node(space.legacy_id)
        subscription["node"] = node
        subscription["jid"] = session.user_jid
        subscription["subscription"] = "subscribed"
        return subscription