Coverage for slidge / core / dispatcher / pubsub.py: 95%

132 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-20 19:56 +0000

1from slixmpp import CoroutineCallback, Iq, StanzaPath 

2from slixmpp.exceptions import XMPPError 

3from slixmpp.plugins.xep_0060.stanza import Affiliation, Item, Subscription 

4from slixmpp.plugins.xep_0084.stanza import Data as AvatarData 

5from slixmpp.plugins.xep_0084.stanza import MetaData as AvatarMetadata 

6 

7from slidge.db.models import Space 

8 

9from ...util.types import AnyGateway, AnySession 

10from .util import DispatcherMixin 

11 

12 

class PubSubMixin(DispatcherMixin):
    """
    Dispatcher mixin that answers pubsub IQs addressed to the gateway's bare JID.

    The handlers cover item retrieval, (un)subscription, affiliations and
    subscriptions for pubsub nodes that map to a user's "spaces". Node names
    are translated to legacy identifiers through ``session.bookmarks`` and
    checked against the gateway store.
    """

    __slots__: list[str] = []

    def __init__(self, xmpp: AnyGateway) -> None:
        """Register one coroutine handler per supported pubsub IQ path."""
        super().__init__(xmpp)

        for path, func in [
            ("get/pubsub/items", self.__get_items),
            ("set/pubsub/subscribe", self.__set_subscribe),
            ("set/pubsub/unsubscribe", self.__set_unsubscribe),
            ("get/pubsub/affiliations", self.__get_affiliations),
            ("get/pubsub_owner/affiliations", self.__owner_get_affiliations),
            ("get/pubsub/subscriptions", self.__get_subscriptions),
            ("get/pubsub_owner/subscriptions", self.__owner_get_subscriptions),
        ]:
            self.xmpp.register_handler(
                CoroutineCallback(
                    func.__name__,
                    StanzaPath(f"iq@to={self.xmpp.boundjid.bare}@type={path}"),
                    func,  # type:ignore[arg-type] # ty:ignore[invalid-argument-type]
                )
            )

    async def __get_items(self, iq: Iq) -> None:
        """
        Reply to an items request on a space node with the matching rooms.

        Avatar and vcard4 nodes are ignored here. When the request lists item
        IDs, each must end with ``@<gateway bare JID>`` and all of them must be
        found, otherwise ``item-not-found`` is raised.
        """
        node = iq["pubsub"]["items"]["node"]

        if node in (AvatarData.namespace, AvatarMetadata.namespace, "urn:xmpp:vcard4"):
            # handled in slidge/core/pubsub.py
            # TODO: have a single entrypoint for all get/pubsub/items
            return

        session = await self._get_session(iq, logged=True)
        legacy_id = await self.__get_legacy_id(session, node)
        item_ids = [item["id"] for item in iq["pubsub"]["items"]["items"]]

        # TODO: support for avatar and banner items here
        if not all(
            item_id.endswith(f"@{self.xmpp.boundjid.bare}") for item_id in item_ids
        ):
            raise XMPPError(
                "item-not-found",
                "Slidge only supports items that are MUCs.",
            )

        # An empty item_ids list naturally yields an empty list here.
        room_legacy_ids = [
            str(await session.bookmarks.jid_local_part_to_legacy_id(x.split("@")[0]))
            for x in item_ids
        ]

        with self.xmpp.store.session() as orm:
            rooms = self.xmpp.store.spaces.get_rooms(
                orm, session.user_pk, str(legacy_id), room_legacy_ids
            )

        if item_ids and len(rooms) != len(item_ids):
            missing = set(item_ids) - {str(r.jid) for r in rooms}
            raise XMPPError(
                "item-not-found",
                f"Could not find items: {missing}",
            )

        reply = iq.reply()
        reply["pubsub"]["items"]["node"] = node
        for room in rooms:
            item = Item()
            item["id"] = room.jid
            item["conference"]["name"] = room.name
            reply["pubsub"]["items"].append(item)

        reply.send()

    async def __get_legacy_id(self, session: AnySession, node: str) -> object:
        """
        Translate a pubsub node into the legacy ID of a space known to the store.

        :param session: Session of the requesting user.
        :param node: Pubsub node name taken from the incoming IQ.
        :return: The legacy ID returned by ``space_node_to_legacy_id``.
        :raises XMPPError: ``item-not-found`` if the translation fails or if the
            store has no space with that legacy ID for this user.
        """
        try:
            legacy_id = await session.bookmarks.space_node_to_legacy_id(node)
        except Exception as e:
            # Keep the original exception as the cause for easier debugging.
            raise XMPPError("item-not-found", str(e)) from e
        with self.xmpp.store.session() as orm:
            if not self.xmpp.store.spaces.exists(orm, session.user_pk, str(legacy_id)):
                raise XMPPError(
                    "item-not-found", f"This is not a known space: '{legacy_id}'"
                )
        return legacy_id

    async def __set_subscribe(self, iq: Iq) -> None:
        """Confirm a subscription to a known space node as ``subscribed``."""
        node = iq["pubsub"]["subscribe"]["node"]

        session = await self._get_session(iq, logged=True)
        # Only validates the node; raises item-not-found if it is unknown.
        await self.__get_legacy_id(session, node)

        reply = iq.reply(clear=True)
        sub = reply["pubsub"]["subscription"]
        sub["node"] = node
        sub["jid"] = session.user_jid
        sub["subscription"] = "subscribed"
        reply.send()

    async def __set_unsubscribe(self, iq: Iq) -> None:
        """
        Leave the space behind a node and reply with subscription ``none``.

        :raises XMPPError: ``bad-request`` if the JID to unsubscribe is not the
            user's bare JID, ``item-not-found`` if the node is not a known space.
        """
        session = await self._get_session(iq, logged=True)

        jid = iq["pubsub"]["unsubscribe"]["jid"]
        if jid != session.user_jid.bare:
            raise XMPPError("bad-request", f"Cannot unsubscribe JID {jid}")

        node = iq["pubsub"]["unsubscribe"]["node"]
        # Same validation as the other handlers, so that an unknown node yields
        # a proper item-not-found error instead of an unhandled exception.
        legacy_id = await self.__get_legacy_id(session, node)

        await session.on_leave_space(legacy_id)
        reply = iq.reply(clear=True)

        sub = reply["pubsub"]["subscription"]
        sub["node"] = node
        sub["jid"] = session.user_jid
        sub["subscription"] = "none"
        reply.send()

    async def __get_affiliations(self, iq: Iq) -> None:
        """
        Reply with the user's affiliations, always reported as ``subscriber``.

        With a node in the request, only that node is reported (after checking
        that it is a known space); without one, every space of the user is.
        """
        session = await self._get_session(iq, logged=True)

        reply = iq.reply()
        node = iq["pubsub"]["affiliations"]["node"]
        if node:
            # __get_legacy_id already raises item-not-found for unknown spaces.
            await self.__get_legacy_id(session, node)
            affiliation = reply["pubsub"]["affiliations"]["affiliation"]
            affiliation["node"] = node
            affiliation["affiliation"] = "subscriber"
        else:
            with self.xmpp.store.session() as orm:
                spaces = list(self.xmpp.store.spaces.get_all(orm, session.user_pk))
            for space in spaces:
                reply["pubsub"]["affiliations"].append(
                    await self.__make_affiliation(session, space)
                )

        reply.send()

    @staticmethod
    async def __make_affiliation(session: AnySession, space: Space) -> Affiliation:
        """Build a ``subscriber`` affiliation stanza for the node of ``space``."""
        affiliation = Affiliation()
        node = await session.bookmarks.space_legacy_id_to_node(space.legacy_id)
        affiliation["node"] = node
        affiliation["affiliation"] = "subscriber"
        return affiliation

    async def __owner_get_affiliations(self, iq: Iq) -> None:
        """Reject owner affiliation requests, which are not implemented."""
        raise XMPPError(
            "forbidden", "Slidge does not implement managing space affiliations."
        )

    async def __owner_get_subscriptions(self, iq: Iq) -> None:
        """
        Reply to an owner subscriptions request with the user's own subscription.

        :raises XMPPError: ``bad-request`` if no node is given,
            ``item-not-found`` if the node is not a known space.
        """
        node = iq["pubsub_owner"]["subscriptions"]["node"]
        if not node:
            raise XMPPError("bad-request", "No node was specified")

        session = await self._get_session(iq, logged=True)
        # Only validates the node; raises item-not-found if it is unknown.
        await self.__get_legacy_id(session, node)

        # clear=False: the reply is built on top of the request payload.
        reply = iq.reply(clear=False)
        sub = reply["pubsub_owner"]["subscriptions"]["subscription"]
        sub["jid"] = session.user_jid
        sub["subscription"] = "subscribed"
        reply.send()

    async def __get_subscriptions(self, iq: Iq) -> None:
        """
        Reply with the user's subscriptions, always reported as ``subscribed``.

        With a node in the request, only that node is reported (after checking
        that it is a known space); without one, every space of the user is.
        """
        session = await self._get_session(iq, logged=True)

        # clear=False: the reply is built on top of the request payload.
        reply = iq.reply(clear=False)
        subscriptions = reply["pubsub"]["subscriptions"]
        if node := iq["pubsub"]["subscriptions"]["node"]:
            await self.__get_legacy_id(session, node)
            subscription = subscriptions["subscription"]
            subscription["node"] = node
            subscription["jid"] = session.user_jid
            subscription["subscription"] = "subscribed"
        else:
            with self.xmpp.store.session() as orm:
                spaces = list(self.xmpp.store.spaces.get_all(orm, session.user_pk))
            for space in spaces:
                subscriptions.append(await self.__make_subscription(session, space))

        reply.send()

    @staticmethod
    async def __make_subscription(session: AnySession, space: Space) -> Subscription:
        """Build a ``subscribed`` subscription stanza for the node of ``space``."""
        subscription = Subscription()
        node = await session.bookmarks.space_legacy_id_to_node(space.legacy_id)
        subscription["node"] = node
        subscription["jid"] = session.user_jid
        subscription["subscription"] = "subscribed"
        return subscription