Coverage for slidge / core / dispatcher / pubsub.py: 95%

133 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-13 04:38 +0000

1from slixmpp import CoroutineCallback, Iq, StanzaPath 

2from slixmpp.exceptions import XMPPError 

3from slixmpp.plugins.xep_0060.stanza import Affiliation, Item, Subscription 

4from slixmpp.plugins.xep_0084.stanza import Data as AvatarData 

5from slixmpp.plugins.xep_0084.stanza import MetaData as AvatarMetadata 

6 

7from slidge.db.models import Space 

8 

9from ...util.types import AnyGateway, AnySession 

10from .util import DispatcherMixin, exceptions_to_xmpp_errors 

11 

12 

class PubSubMixin(DispatcherMixin):
    """Dispatcher mixin answering pubsub IQs sent to the gateway's bare JID.

    On construction, one coroutine handler is registered per supported
    ``<type>/<pubsub payload>`` combination (items, subscribe, unsubscribe,
    affiliations and subscriptions, including the ``pubsub_owner`` variants).
    Pubsub nodes are mapped to "spaces" through ``session.bookmarks`` and the
    ``spaces`` store of the gateway.
    """

    __slots__: list[str] = []

    def __init__(self, xmpp: AnyGateway) -> None:
        """Register every pubsub IQ handler of this mixin on ``xmpp``.

        :param xmpp: The gateway the handlers are registered on; only IQs
            addressed to its bare bound JID are matched.
        """
        super().__init__(xmpp)

        handlers = [
            ("get/pubsub/items", self.__get_items),
            ("set/pubsub/subscribe", self.__set_subscribe),
            ("set/pubsub/unsubscribe", self.__set_unsubscribe),
            ("get/pubsub/affiliations", self.__get_affiliations),
            ("get/pubsub_owner/affiliations", self.__owner_get_affiliations),
            ("get/pubsub/subscriptions", self.__get_subscriptions),
            ("get/pubsub_owner/subscriptions", self.__owner_get_subscriptions),
        ]
        for path, func in handlers:
            self.xmpp.register_handler(
                CoroutineCallback(
                    func.__name__,
                    StanzaPath(f"iq@to={self.xmpp.boundjid.bare}@type={path}"),
                    func,  # type:ignore[arg-type] # ty:ignore[invalid-argument-type]
                )
            )

    async def __get_items(self, iq: Iq) -> None:
        """Reply to an items request with the rooms of the requested space.

        When the request lists item IDs, only those rooms are returned and
        every one of them must be found; otherwise the store is queried with
        an empty room filter.

        :param iq: The incoming ``get/pubsub/items`` request.
        :raises XMPPError: ``item-not-found`` if the node is not a known
            space, if an item ID does not end with ``@<gateway bare JID>``,
            or if some requested items could not be found.
        """
        node = iq["pubsub"]["items"]["node"]

        if node in (AvatarData.namespace, AvatarMetadata.namespace, "urn:xmpp:vcard4"):
            # handled in slidge/core/pubsub.py
            # TODO: have a single entrypoint for all get/pubsub/items
            return

        session = await self._get_session(iq, logged=True)
        legacy_id = await self.__get_legacy_id(session, node)
        item_ids = [item["id"] for item in iq["pubsub"]["items"]["items"]]

        # TODO: support for avatar and banner items here
        own_suffix = f"@{self.xmpp.boundjid.bare}"
        if not all(item_id.endswith(own_suffix) for item_id in item_ids):
            raise XMPPError(
                "item-not-found",
                "Slidge only supports items that are MUCs.",
            )

        # An empty ``item_ids`` naturally yields an empty list here.
        room_legacy_ids = [
            str(await session.bookmarks.jid_local_part_to_legacy_id(x.split("@")[0]))
            for x in item_ids
        ]

        with self.xmpp.store.session() as orm:
            rooms = self.xmpp.store.spaces.get_rooms(
                orm, session.user_pk, str(legacy_id), room_legacy_ids
            )

        if item_ids and len(rooms) != len(item_ids):
            missing = set(item_ids) - {str(r.jid) for r in rooms}
            raise XMPPError(
                "item-not-found",
                f"Could not find items: {missing}",
            )

        reply = iq.reply()
        reply["pubsub"]["items"]["node"] = node
        for room in rooms:
            item = Item()
            item["id"] = room.jid
            item["conference"]["name"] = room.name
            reply["pubsub"]["items"].append(item)

        reply.send()

    async def __get_legacy_id(self, session: AnySession, node: str) -> object:
        """Translate a pubsub node into a space legacy ID and check it is known.

        :param session: Session whose bookmarks perform the translation and
            whose ``user_pk`` scopes the store lookup.
        :param node: The pubsub node name taken from the request.
        :return: The legacy ID returned by
            ``session.bookmarks.space_node_to_legacy_id``.
        :raises XMPPError: ``item-not-found`` if the translation raises or if
            the store has no such space for this user.
        """
        try:
            legacy_id = await session.bookmarks.space_node_to_legacy_id(node)
        except Exception as e:
            # Chain the cause so the original traceback is kept for debugging.
            raise XMPPError("item-not-found", str(e)) from e
        with self.xmpp.store.session() as orm:
            if not self.xmpp.store.spaces.exists(orm, session.user_pk, str(legacy_id)):
                raise XMPPError(
                    "item-not-found", f"This is not a known space: '{legacy_id}'"
                )
        return legacy_id

    async def __set_subscribe(self, iq: Iq) -> None:
        """Acknowledge a subscribe request for a known space node.

        The reply always reports the ``subscribed`` state for the user's JID;
        nothing else is done by this handler besides validating the node.

        :param iq: The incoming ``set/pubsub/subscribe`` request.
        :raises XMPPError: ``item-not-found`` if the node is not a known space.
        """
        node = iq["pubsub"]["subscribe"]["node"]

        session = await self._get_session(iq, logged=True)
        await self.__get_legacy_id(session, node)

        reply = iq.reply(clear=True)
        sub = reply["pubsub"]["subscription"]
        sub["node"] = node
        sub["jid"] = session.user_jid
        sub["subscription"] = "subscribed"
        reply.send()

    @exceptions_to_xmpp_errors
    async def __set_unsubscribe(self, iq: Iq) -> None:
        """Leave the space matching the node and confirm the unsubscription.

        :param iq: The incoming ``set/pubsub/unsubscribe`` request.
        :raises XMPPError: ``bad-request`` if the JID to unsubscribe is not
            the bare JID of the requesting user.
        """
        session = await self._get_session(iq, logged=True)

        jid = iq["pubsub"]["unsubscribe"]["jid"]
        if jid != session.user_jid.bare:
            raise XMPPError("bad-request", f"Cannot unsubscribe JID {jid}")

        node = iq["pubsub"]["unsubscribe"]["node"]
        legacy_id = await session.bookmarks.space_node_to_legacy_id(node)

        await session.on_leave_space(legacy_id)
        reply = iq.reply(clear=True)

        sub = reply["pubsub"]["subscription"]
        sub["node"] = node
        sub["jid"] = session.user_jid
        sub["subscription"] = "none"
        reply.send()

    async def __get_affiliations(self, iq: Iq) -> None:
        """Reply with the user's affiliation to one space, or to all of them.

        With a node in the request, a single ``subscriber`` affiliation for
        that node is returned; without one, one affiliation per stored space
        of the user is returned.

        :param iq: The incoming ``get/pubsub/affiliations`` request.
        :raises XMPPError: ``item-not-found`` if a node is given but is not a
            known space.
        """
        session = await self._get_session(iq, logged=True)

        reply = iq.reply()
        node = iq["pubsub"]["affiliations"]["node"]
        if node:
            # __get_legacy_id already raises item-not-found for unknown
            # spaces, so no second existence check is needed here.
            await self.__get_legacy_id(session, node)
            affiliation = reply["pubsub"]["affiliations"]["affiliation"]
            affiliation["node"] = node
            affiliation["affiliation"] = "subscriber"
        else:
            with self.xmpp.store.session() as orm:
                spaces = list(self.xmpp.store.spaces.get_all(orm, session.user_pk))
            for space in spaces:
                reply["pubsub"]["affiliations"].append(
                    await self.__make_affiliation(session, space)
                )

        reply.send()

    @staticmethod
    async def __make_affiliation(session: AnySession, space: Space) -> Affiliation:
        """Build a ``subscriber`` affiliation element for ``space``.

        :param session: Session whose bookmarks map the legacy ID to a node.
        :param space: Stored space to describe.
        :return: An ``Affiliation`` stanza with its node and affiliation set.
        """
        affiliation = Affiliation()
        node = await session.bookmarks.space_legacy_id_to_node(space.legacy_id)
        affiliation["node"] = node
        affiliation["affiliation"] = "subscriber"
        return affiliation

    async def __owner_get_affiliations(self, iq: Iq) -> None:
        """Reject owner affiliation queries, which are not implemented.

        :param iq: The incoming ``get/pubsub_owner/affiliations`` request.
        :raises XMPPError: Always, with the ``forbidden`` condition.
        """
        raise XMPPError(
            "forbidden", "Slidge does not implement managing space affiliations."
        )

    async def __owner_get_subscriptions(self, iq: Iq) -> None:
        """Reply to an owner subscriptions query with the user's own entry.

        :param iq: The incoming ``get/pubsub_owner/subscriptions`` request.
        :raises XMPPError: ``bad-request`` if no node is specified, or
            ``item-not-found`` if the node is not a known space.
        """
        node = iq["pubsub_owner"]["subscriptions"]["node"]
        if not node:
            raise XMPPError("bad-request", "No node was specified")

        session = await self._get_session(iq, logged=True)
        await self.__get_legacy_id(session, node)

        # clear=False keeps the request payload in the reply.
        reply = iq.reply(clear=False)
        sub = reply["pubsub_owner"]["subscriptions"]["subscription"]
        sub["jid"] = session.user_jid
        sub["subscription"] = "subscribed"
        reply.send()

    async def __get_subscriptions(self, iq: Iq) -> None:
        """Reply with the user's subscription to one space, or to all of them.

        With a node in the request, a single ``subscribed`` entry for that
        node is returned; without one, one entry per stored space of the
        user is returned.

        :param iq: The incoming ``get/pubsub/subscriptions`` request.
        :raises XMPPError: ``item-not-found`` if a node is given but is not a
            known space.
        """
        session = await self._get_session(iq, logged=True)

        # clear=False keeps the request payload in the reply.
        reply = iq.reply(clear=False)
        subscriptions = reply["pubsub"]["subscriptions"]
        if node := iq["pubsub"]["subscriptions"]["node"]:
            await self.__get_legacy_id(session, node)
            subscription = subscriptions["subscription"]
            subscription["node"] = node
            subscription["jid"] = session.user_jid
            subscription["subscription"] = "subscribed"
        else:
            with self.xmpp.store.session() as orm:
                spaces = list(self.xmpp.store.spaces.get_all(orm, session.user_pk))
            for space in spaces:
                subscriptions.append(await self.__make_subscription(session, space))

        reply.send()

    @staticmethod
    async def __make_subscription(session: AnySession, space: Space) -> Subscription:
        """Build a ``subscribed`` subscription element for ``space``.

        :param session: Session providing the node mapping and the user JID.
        :param space: Stored space to describe.
        :return: A ``Subscription`` stanza with node, JID and state set.
        """
        subscription = Subscription()
        node = await session.bookmarks.space_legacy_id_to_node(space.legacy_id)
        subscription["node"] = node
        subscription["jid"] = session.user_jid
        subscription["subscription"] = "subscribed"
        return subscription