Coverage for slidge / core / pubsub.py: 84%

241 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-20 19:56 +0000

1import logging 

2from copy import copy 

3from pathlib import Path 

4from typing import TYPE_CHECKING, ClassVar 

5 

6from slixmpp import ( 

7 JID, 

8 CoroutineCallback, 

9 ElementBase, 

10 Iq, 

11 Presence, 

12 StanzaPath, 

13 register_stanza_plugin, 

14) 

15from slixmpp.exceptions import IqError, IqTimeout, XMPPError 

16from slixmpp.plugins.base import BasePlugin, register_plugin 

17from slixmpp.plugins.xep_0060.stanza import Event, EventItem, EventItems, Item 

18from slixmpp.plugins.xep_0084.stanza import Data as AvatarData 

19from slixmpp.plugins.xep_0084.stanza import MetaData as AvatarMetadata 

20from slixmpp.plugins.xep_0172.stanza import UserNick 

21from slixmpp.plugins.xep_0292.stanza import VCard4 

22from slixmpp.plugins.xep_0402.stanza import Conference 

23from slixmpp.types import JidStr, OptJidStr 

24 

25from slidge.util.types import AnyContact, AnySession 

26 

27from ..db.avatar import CachedAvatar, avatar_cache 

28from ..db.models import GatewayUser, Space 

29from ..util.lock import NamedLockMixin 

30 

31if TYPE_CHECKING: 

32 from slidge.util.types import AnyGateway 

33 

34 

# XML namespace of vCard4; in this module it doubles as the pubsub node name
# for vCard item requests and vCard event notifications.
VCARD4_NAMESPACE = "urn:xmpp:vcard4"

36 

37 

class PepAvatar:
    """Avatar of one entity, kept in the shape needed to answer PEP requests.

    A fresh instance is empty: ``metadata``, ``id`` and ``data`` are all
    ``None`` until :meth:`set_avatar_from_cache` has been called.
    """

    def __init__(self) -> None:
        self._avatar_data_path: Path | None = None
        self.id: str | None = None
        self.metadata: AvatarMetadata | None = None

    @property
    def data(self) -> AvatarData | None:
        """Avatar data stanza built from the file on disk, or ``None``.

        The file is read every time the property is accessed; nothing is
        cached on the instance besides the path.
        """
        path = self._avatar_data_path
        if path is not None:
            payload = AvatarData()
            payload.set_value(path.read_bytes())
            return payload
        return None

    def set_avatar_from_cache(self, cached_avatar: CachedAvatar) -> None:
        """Populate id, metadata and data path from a cached avatar.

        :param cached_avatar: cache entry providing ``hash``, ``path``,
            ``height`` and ``width``.
        """
        avatar_hash = cached_avatar.hash
        avatar_path = cached_avatar.path

        info = AvatarMetadata()
        # The id is assigned before the file is stat()-ed, as before.
        self.id = avatar_hash
        info.add_info(
            id=avatar_hash,
            itype="image/png",
            ibytes=avatar_path.stat().st_size,
            height=str(cached_avatar.height),
            width=str(cached_avatar.width),
        )
        self.metadata = info
        self._avatar_data_path = avatar_path

64 

65 

class PubSubComponent(NamedLockMixin, BasePlugin):
    """Slixmpp plugin answering PEP item requests and pushing PEP events.

    It registers IQ handlers for the avatar data, avatar metadata and vCard4
    nodes, and sends pubsub event headlines (avatar, nickname, vCard4,
    spaces) either from the component itself or from legacy contacts.
    """

    xmpp: "AnyGateway"

    name = "pubsub"
    description = "Pubsub component"
    dependencies: ClassVar[set[str]] = {
        "xep_0030",
        "xep_0060",
        "xep_0115",
        "xep_0163",
    }
    default_config: ClassVar[dict[str, str | None]] = {"component_name": None}
    component_name: str

    def __init__(self, *a: object, **kw: object) -> None:
        """Initialise the plugin and register the PEP payload stanzas."""
        super().__init__(*a, **kw)
        # slixmpp usually does this via self.xmpp["xep_0163"].register_pep(),
        # in the session_bind hook of plugins. However, for components,
        # session_bind is never called.
        register_stanza_plugin(EventItem, UserNick)
        register_stanza_plugin(EventItem, Conference)

    def plugin_init(self) -> None:
        """Register the item-request handlers and the disco identities/features."""
        # One IQ-get handler per served node: (handler name, node, coroutine).
        item_handlers = (
            ("pubsub_get_avatar_data", AvatarData.namespace, self._get_avatar_data),
            (
                "pubsub_get_avatar_metadata",
                AvatarMetadata.namespace,
                self._get_avatar_metadata,
            ),
            ("pubsub_get_vcard", VCARD4_NAMESPACE, self._get_vcard),
        )
        for handler_name, node, handler in item_handlers:
            self.xmpp.register_handler(
                CoroutineCallback(
                    handler_name,
                    StanzaPath(f"iq@type=get/pubsub/items@node={node}"),
                    handler,  # type:ignore
                )
            )

        disco = self.xmpp.plugin["xep_0030"]
        disco.add_identity("pubsub", "pep", self.component_name)
        disco.add_identity("account", "registered", self.component_name)
        disco.add_feature("http://jabber.org/protocol/pubsub#event")
        disco.add_feature("http://jabber.org/protocol/pubsub#retrieve-items")
        disco.add_feature("http://jabber.org/protocol/pubsub#persistent-items")

    async def __get_features(self, presence: Presence) -> list[str]:
        """Return the disco features of the entity that sent *presence*.

        Entity capabilities are looked up first when the presence carries a
        caps "ver" string; if that yields nothing, a disco#info query is sent.

        :return: the feature list, or ``[]`` if the disco#info query fails
            with ``IqError`` or ``IqTimeout``.
        """
        from_ = presence.get_from()
        ver_string = presence["caps"]["ver"]
        if ver_string:
            info = await self.xmpp.plugin["xep_0115"].get_caps(from_)
        else:
            info = None
        if info is None:
            # The query runs under a lock named after the sender JID
            # (presumably to serialise concurrent queries -- see NamedLockMixin).
            async with self.lock(from_):
                try:
                    iq = await self.xmpp.plugin["xep_0030"].get_info(from_)
                except (IqError, IqTimeout):
                    log.debug("Could not get disco#info of %s, ignoring", from_)
                    return []
            info = iq["disco_info"]
        return info["features"]  # type:ignore[no-any-return]

    async def on_presence_available(
        self, p: Presence, contact: AnyContact | None
    ) -> None:
        """Push avatar, nickname and vCard events to a client that came online.

        Only the events whose ``+notify`` feature the sender advertises are
        sent. Each push is best-effort: an ``XMPPError`` while building the
        avatar or nickname payload skips that push only.

        :param p: the received presence.
        :param contact: the contact the presence is addressed to, if any.
        """
        if "muc_join" in p:
            log.debug("Ignoring MUC presence here")
            return

        to = p.get_to()
        # we don't want to push anything for contacts that are not in the user's roster
        if to != self.xmpp.boundjid.bare and (contact is None or not contact.is_friend):
            return

        from_ = p.get_from()
        features = await self.__get_features(p)

        if AvatarMetadata.namespace + "+notify" in features:
            try:
                pep_avatar = await self._get_authorized_avatar(p, contact)
            except XMPPError as e:
                # Best-effort: no avatar push, but the other pushes still run.
                log.debug("Not pushing avatar of %s to %s: %s", to, from_, e)
            else:
                if pep_avatar.metadata is not None:
                    await self.__broadcast(
                        data=pep_avatar.metadata,
                        from_=to.bare,
                        to=from_,
                        id=pep_avatar.metadata["info"]["id"],
                    )
        if UserNick.namespace + "+notify" in features:
            try:
                pep_nick = await self._get_authorized_nick(p, contact)
            except XMPPError as e:
                # Best-effort, same as for the avatar.
                log.debug("Not pushing nick of %s to %s: %s", to, from_, e)
            else:
                # NOTE(review): the avatar and vCard events are sent from the
                # bare JID, this one from the JID as received -- confirm
                # whether that difference is intended.
                await self.__broadcast(data=pep_nick, from_=to, to=from_)

        if contact is not None and VCARD4_NAMESPACE + "+notify" in features:
            vcard = await contact.get_vcard()
            if vcard is not None:
                await self.broadcast_vcard_event(to, from_, vcard)

    async def broadcast_vcard_event(self, from_: JID, to: JID, vcard: VCard4) -> None:
        """Send a vCard4 pubsub event from the bare JID of *from_* to *to*.

        :param from_: JID the vCard belongs to; only its bare form is used.
        :param to: recipient of the event.
        :param vcard: payload included in the event item.
        """
        # The vcard content should NOT be in this event according to the spec:
        # https://xmpp.org/extensions/xep-0292.html#sect-idm45669698174224
        # but movim expects it to be here, and I guess it does not hurt
        log.debug("Broadcast vcard4 event: %s", vcard)
        await self.__broadcast(
            data=vcard,
            from_=JID(from_).bare,
            to=to,
            id="current",
            node=VCARD4_NAMESPACE,
        )

    async def __get_contact(self, stanza: Iq | Presence) -> AnyContact:
        """Resolve the contact addressed by *stanza* in the sender's session."""
        session = self.xmpp.get_session_from_stanza(stanza)
        return await session.contacts.by_jid(stanza.get_to())  # type:ignore[no-any-return]

    async def _get_authorized_avatar(
        self, stanza: Iq | Presence, contact: AnyContact | None = None
    ) -> PepAvatar:
        """Return the avatar of the entity *stanza* is addressed to.

        The result is an empty ``PepAvatar`` when the entity has no avatar.

        :param stanza: request or presence whose "to" designates the entity.
        :param contact: already-resolved contact; looked up when omitted and
            the stanza is not addressed to the component itself.
        :raises XMPPError: ``item-not-found`` if the contact references an
            avatar that is absent from the avatar cache.
        """
        item = PepAvatar()
        if stanza.get_to() == self.xmpp.boundjid.bare:
            # Addressed to the gateway component: serve its own avatar.
            if self.xmpp.avatar is not None:
                item.set_avatar_from_cache(self.xmpp.avatar)
            return item

        if contact is None:
            contact = await self.__get_contact(stanza)

        if contact.stored.avatar is not None:
            stored = avatar_cache.get(contact.stored.avatar)
            if stored is None:
                # Explicit error rather than an assert: this depends on the
                # state of the cache, and must still be reported when Python
                # runs with assertions disabled.
                log.warning(
                    "Avatar %s of %s is missing from the avatar cache",
                    contact.stored.avatar,
                    stanza.get_to(),
                )
                raise XMPPError("item-not-found")
            item.set_avatar_from_cache(stored)
        return item

    async def _get_authorized_nick(
        self, stanza: Iq | Presence, contact: AnyContact | None = None
    ) -> UserNick:
        """Return the nickname stanza of the entity *stanza* is addressed to.

        :param stanza: request or presence whose "to" designates the entity.
        :param contact: already-resolved contact; looked up when omitted and
            the stanza is not addressed to the component itself.
        :return: a ``UserNick`` that is left empty when the contact has no name.
        """
        if stanza.get_to() == self.xmpp.boundjid.bare:
            return get_user_nick(self.xmpp.COMPONENT_NAME)

        if contact is None:
            contact = await self.__get_contact(stanza)

        # get_user_nick() leaves the stanza empty when given None.
        return get_user_nick(contact.name)

    def __reply_with(
        self, iq: Iq, content: AvatarData | AvatarMetadata | None, item_id: str | None
    ) -> None:
        """Answer an items request with *content* if *item_id* was asked for.

        A request listing no item is always answered; a request listing
        items is answered only if one of them has the id *item_id*.

        :raises XMPPError: ``item-not-found`` when specific items were
            requested and none of them matches *item_id*.
        """
        requested_items = iq["pubsub"]["items"]

        # NOTE(review): keep the explicit len(); slixmpp stanzas are
        # presumably always truthy, so "not requested_items" would not be
        # equivalent -- confirm before simplifying.
        if len(requested_items) == 0:
            self._reply_with_payload(iq, content, item_id)
            return

        for item in requested_items:
            if item["id"] == item_id:
                self._reply_with_payload(iq, content, item_id)
                return
        raise XMPPError("item-not-found")

    async def _get_avatar_data(self, iq: Iq) -> None:
        """Handle an items request on the avatar data node."""
        pep_avatar = await self._get_authorized_avatar(iq)
        self.__reply_with(iq, pep_avatar.data, pep_avatar.id)

    async def _get_avatar_metadata(self, iq: Iq) -> None:
        """Handle an items request on the avatar metadata node."""
        pep_avatar = await self._get_authorized_avatar(iq)
        self.__reply_with(iq, pep_avatar.metadata, pep_avatar.id)

    async def _get_vcard(self, iq: Iq) -> None:
        """Handle an items request on the vCard4 node.

        :raises XMPPError: ``item-not-found`` when the contact has no vCard.
        """
        # this is not the proper way that clients should retrieve VCards, but
        # gajim does it this way.
        # https://xmpp.org/extensions/xep-0292.html#sect-idm45669698174224
        contact = await self.__get_contact(iq)
        vcard = await contact.get_vcard()
        if vcard is None:
            raise XMPPError("item-not-found")
        self._reply_with_payload(iq, vcard, "current", VCARD4_NAMESPACE)

    @staticmethod
    def _reply_with_payload(
        iq: Iq,
        payload: AvatarMetadata | AvatarData | VCard4 | None,
        id_: str | None,
        namespace: str | None = None,
    ) -> None:
        """Send the result of an items request.

        :param iq: the request being answered.
        :param payload: content of the single returned item; without it the
            reply is sent with no item and no node set.
        :param id_: id given to the returned item.
        :param namespace: node name to report; defaults to the payload's own
            namespace.
        """
        result = iq.reply()
        item = Item()
        if payload:
            item.set_payload(payload.xml)
            item["id"] = id_
            result["pubsub"]["items"]["node"] = (
                namespace if namespace else payload.namespace
            )
            result["pubsub"]["items"].append(item)
        result.send()

    async def __broadcast(
        self,
        data: ElementBase | None,
        from_: JidStr,
        to: OptJidStr = None,
        items: EventItems | None = None,
        **kwargs: object,
    ) -> None:
        """Send a pubsub event as a headline message.

        :param data: payload of the single event item; required unless
            *items* is given.
        :param from_: sender of the event.
        :param to: recipient; when ``None`` the event is sent to the bare JID
            of every ``GatewayUser`` in the store.
        :param items: pre-built items element, used instead of one built
            from *data*.
        :param kwargs: set as keys on the event item; ``node`` additionally
            overrides the node name, which otherwise is ``data.namespace``.
        """
        from_ = JID(from_)
        if from_ != self.xmpp.boundjid.bare and to is not None:
            # Events from anything but the component itself are only sent to
            # users with a session, and only once that session is ready.
            to = JID(to)
            session = self.xmpp.get_session_from_jid(to)
            if session is None:
                return
            await session.ready

        item = EventItem()
        if data:
            item.set_payload(data.xml)
        for k, v in kwargs.items():
            item[k] = v

        if items is None:
            items = EventItems()
            items.append(item)
            # Internal invariant: callers pass either data or items.
            assert data is not None
            items["node"] = kwargs.get("node") or data.namespace

        event = Event()
        event.append(items)

        msg = self.xmpp.Message()
        msg.set_type("headline")
        msg.set_from(from_)
        msg.append(event)

        if to is None:
            with self.xmpp.store.session() as orm:
                for u in orm.query(GatewayUser).all():
                    new_msg = copy(msg)
                    new_msg.set_to(u.jid.bare)
                    new_msg.send()
        else:
            msg.set_to(to)
            msg.send()

    async def broadcast_avatar(
        self, from_: JidStr, to: JidStr, cached_avatar: CachedAvatar | None
    ) -> None:
        """Send an avatar metadata event from *from_* to *to*.

        :param cached_avatar: the avatar to announce; ``None`` sends an empty
            metadata payload instead.
        """
        if cached_avatar is None:
            await self.__broadcast(AvatarMetadata(), from_, to)
        else:
            pep_avatar = PepAvatar()
            pep_avatar.set_avatar_from_cache(cached_avatar)
            assert pep_avatar.metadata is not None
            await self.__broadcast(
                pep_avatar.metadata, from_, to, id=pep_avatar.metadata["info"]["id"]
            )

    def broadcast_nick(
        self,
        user_jid: JID,
        jid: JidStr,
        nick: str | None = None,
    ) -> None:
        """Schedule a nickname event from *jid* to the bare JID of *user_jid*.

        The event is sent from a task created on the component's event loop;
        this method returns without waiting for it.

        :param nick: the nickname; ``None`` produces an empty nick payload.
        """
        jid = JID(jid)
        nickname = get_user_nick(nick)
        log.debug("New nickname: %s", nickname)
        self.xmpp.loop.create_task(self.__broadcast(nickname, jid, user_jid.bare))

    def broadcast_space(self, session: AnySession, space: Space) -> None:
        """Schedule an event listing the rooms of *space* for the session's user.

        The node is the string form of ``space.legacy_id``; each room becomes
        one item whose id is the room JID, with a conference element that is
        named when the room has a name. The event is sent from the component
        in a task created through *session*.
        """
        items = EventItems()
        items["node"] = str(space.legacy_id)
        for room in space.rooms:
            item = EventItem()
            item["id"] = room.jid
            item.enable("conference")
            if room.name:
                item["conference"]["name"] = room.name
            items.append(item)
        session.create_task(
            self.__broadcast(
                None,
                self.xmpp.boundjid.bare,
                session.user_jid,
                items=items,
            )
        )

    def broadcast_space_metadata(
        self, session: AnySession, space: Space, node: str
    ) -> None:
        """Send the node configuration of a space to the session's user.

        The headline message is sent from the component and carries a
        "result" form with the space name as title and fixed configuration
        values.

        :param node: node name placed on the configuration element.
        """
        msg = self.xmpp.make_message(
            mtype="headline", mto=session.user_jid, mfrom=self.xmpp.boundjid.bare
        )
        msg["pubsub_event"]["configuration"]["node"] = node

        form = msg["pubsub_event"]["configuration"]["form"]
        form["type"] = "result"

        form.add_field(
            var="FORM_TYPE",
            ftype="hidden",
            value="http://jabber.org/protocol/pubsub#node_config",
        )
        form.add_field(var="pubsub#title", value=space.name)

        # MUSTs
        form.add_field(var="pubsub#type", value="urn:xmpp:spaces:0")
        form.add_field(var="pubsub#notify_retract", value="true")
        form.add_field(var="pubsub#persist_items", value="true")
        form.add_field(var="pubsub#purge_offline", value="false")
        # SHOULDs
        form.add_field(var="pubsub#notify_sub", value="true")
        form.add_field(var="pubsub#notify_config", value="true")
        form.add_field(var="pubsub#notify_delete", value="true")
        form.add_field(var="pubsub#publish_model", value="publishers")
        # MUST (private spaces)
        form.add_field(var="pubsub#access_model", value="authorize")

        msg.send()

404 

405 

def get_user_nick(nick: str | None = None) -> UserNick:
    """Build a ``UserNick`` stanza, setting its "nick" only when one is given.

    :param nick: the nickname; with ``None`` the stanza is returned empty.
    """
    stanza = UserNick()
    if nick is None:
        return stanza
    stanza["nick"] = nick
    return stanza

411 

412 

# Module-level logger. It is assigned at the bottom of the file, which works
# because the code above only looks the name up when it runs.
log = logging.getLogger(__name__)
# Register the plugin class with slixmpp.
register_plugin(PubSubComponent)