Coverage for slidge / core / pubsub.py: 84%

242 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-13 04:38 +0000

1import logging 

2from copy import copy 

3from pathlib import Path 

4from typing import ClassVar 

5 

6from slixmpp import ( 

7 JID, 

8 CoroutineCallback, 

9 ElementBase, 

10 Iq, 

11 Presence, 

12 StanzaPath, 

13 register_stanza_plugin, 

14) 

15from slixmpp.exceptions import IqError, IqTimeout, XMPPError 

16from slixmpp.plugins.base import BasePlugin, register_plugin 

17from slixmpp.plugins.xep_0060.stanza import Event, EventItem, EventItems, Item 

18from slixmpp.plugins.xep_0084.stanza import Data as AvatarData 

19from slixmpp.plugins.xep_0084.stanza import MetaData as AvatarMetadata 

20from slixmpp.plugins.xep_0172.stanza import UserNick 

21from slixmpp.plugins.xep_0292.stanza import VCard4 

22from slixmpp.plugins.xep_0402.stanza import Conference 

23from slixmpp.types import JidStr, OptJidStr 

24 

25from ..contact import LegacyContact 

26from ..db.avatar import CachedAvatar, avatar_cache 

27from ..db.models import GatewayUser, Space 

28from ..util.lock import NamedLockMixin 

29from ..util.types import AnyGateway, AnySession 

30 

# Namespace string used in this module as the pubsub node name for vCard4
# items, both in the IQ handler path and in broadcast events.
VCARD4_NAMESPACE = "urn:xmpp:vcard4"

32 

33 

class PepAvatar:
    """Holder for the avatar metadata, item id and data file of one entity.

    A fresh instance represents "no avatar": ``metadata``, ``id`` and
    ``data`` are all ``None`` until :meth:`set_avatar_from_cache` is called.
    """

    def __init__(self) -> None:
        self.metadata: AvatarMetadata | None = None
        self.id: str | None = None
        self._avatar_data_path: Path | None = None

    @property
    def data(self) -> AvatarData | None:
        """Return the avatar bytes wrapped in an ``AvatarData`` stanza.

        The file is read on every access; ``None`` is returned when no
        avatar path has been set.
        """
        source = self._avatar_data_path
        if source is None:
            return None
        payload = AvatarData()
        payload.set_value(source.read_bytes())
        return payload

    def set_avatar_from_cache(self, cached_avatar: CachedAvatar) -> None:
        """Populate id, metadata and data path from a cached avatar.

        The metadata always advertises the image type as ``image/png``; the
        byte size is taken from the cached file on disk.
        """
        info = AvatarMetadata()
        # The id is assigned before the file is stat()ed, as in the original
        # statement order.
        self.id = cached_avatar.hash
        info.add_info(
            id=cached_avatar.hash,
            itype="image/png",
            ibytes=cached_avatar.path.stat().st_size,
            height=str(cached_avatar.height),
            width=str(cached_avatar.width),
        )
        self.metadata = info
        self._avatar_data_path = cached_avatar.path

60 

61 

class PubSubComponent(NamedLockMixin, BasePlugin):
    """slixmpp plugin answering pubsub item requests and pushing pubsub events.

    It registers IQ handlers for avatar data, avatar metadata and vCard4
    nodes, and exposes helpers to broadcast avatar, nickname, vCard and
    "space" events as headline messages.
    """

    xmpp: "AnyGateway"

    name = "pubsub"
    description = "Pubsub component"
    dependencies: ClassVar[set[str]] = {
        "xep_0030",
        "xep_0060",
        "xep_0115",
        "xep_0163",
    }
    default_config: ClassVar[dict[str, str | None]] = {"component_name": None}
    component_name: str

    def __init__(self, *a: object, **kw: object) -> None:
        super().__init__(*a, **kw)
        # slixmpp usually does this via self.xmpp["xep_0163"].register_pep(),
        # in the session_bind hook of plugins. However, for components,
        # session_bind is never called.
        register_stanza_plugin(EventItem, UserNick)
        register_stanza_plugin(EventItem, Conference)

    def plugin_init(self) -> None:
        """Register the pubsub IQ-get handlers and advertise disco features."""
        self.xmpp.register_handler(
            CoroutineCallback(
                "pubsub_get_avatar_data",
                StanzaPath(f"iq@type=get/pubsub/items@node={AvatarData.namespace}"),
                self._get_avatar_data,  # type:ignore
            )
        )
        self.xmpp.register_handler(
            CoroutineCallback(
                "pubsub_get_avatar_metadata",
                StanzaPath(f"iq@type=get/pubsub/items@node={AvatarMetadata.namespace}"),
                self._get_avatar_metadata,  # type:ignore
            )
        )
        self.xmpp.register_handler(
            CoroutineCallback(
                "pubsub_get_vcard",
                StanzaPath(f"iq@type=get/pubsub/items@node={VCARD4_NAMESPACE}"),
                self._get_vcard,  # type:ignore
            )
        )

        disco = self.xmpp.plugin["xep_0030"]
        disco.add_identity("pubsub", "pep", self.component_name)
        disco.add_identity("account", "registered", self.component_name)
        disco.add_feature("http://jabber.org/protocol/pubsub#event")
        disco.add_feature("http://jabber.org/protocol/pubsub#retrieve-items")
        disco.add_feature("http://jabber.org/protocol/pubsub#persistent-items")

    async def __get_features(self, presence: Presence) -> list[str]:
        """Return the disco features of the entity that sent ``presence``.

        When the presence carries a caps ``ver`` string, the xep_0115 plugin
        is asked first. If that yields nothing (or there is no ``ver``), a
        disco#info query is sent while holding ``self.lock(from_)``. An empty
        list is returned when that query errors or times out.
        """
        from_ = presence.get_from()
        ver_string = presence["caps"]["ver"]
        if ver_string:
            info = await self.xmpp.plugin["xep_0115"].get_caps(from_)
        else:
            info = None
        if info is None:
            async with self.lock(from_):
                try:
                    iq = await self.xmpp.plugin["xep_0030"].get_info(from_)
                except (IqError, IqTimeout):
                    log.debug("Could not get disco#info of %s, ignoring", from_)
                    return []
                info = iq["disco_info"]
        return info["features"]  # type:ignore[no-any-return]

    async def on_presence_available(
        self, p: Presence, contact: LegacyContact | None
    ) -> None:
        """Push avatar, nickname and vCard events to the sender of ``p``.

        Each event is only sent when the sender advertises the matching
        ``+notify`` feature. MUC join presences are ignored, and so are
        presences addressed to something other than the component itself
        when ``contact`` is missing or is not a friend.
        """
        if "muc_join" in p:
            log.debug("Ignoring MUC presence here")
            return

        to = p.get_to()
        # we don't want to push anything for contacts that are not in the user's roster
        if to != self.xmpp.boundjid.bare and (contact is None or not contact.is_friend):
            return

        from_ = p.get_from()
        features = await self.__get_features(p)

        if AvatarMetadata.namespace + "+notify" in features:
            try:
                pep_avatar = await self._get_authorized_avatar(p, contact)
            except XMPPError:
                pass
            else:
                if pep_avatar.metadata is not None:
                    await self.__broadcast(
                        data=pep_avatar.metadata,
                        from_=to.bare,
                        to=from_,
                        id=pep_avatar.metadata["info"]["id"],
                    )
        if UserNick.namespace + "+notify" in features:
            try:
                pep_nick = await self._get_authorized_nick(p, contact)
            except XMPPError:
                pass
            else:
                await self.__broadcast(data=pep_nick, from_=to, to=from_)

        if contact is not None and VCARD4_NAMESPACE + "+notify" in features:
            vcard = await contact.get_vcard()
            if vcard is not None:
                await self.broadcast_vcard_event(to, from_, vcard)

    async def broadcast_vcard_event(self, from_: JID, to: JID, vcard: VCard4) -> None:
        """Send a vCard4 pubsub event, with the vCard as payload, to ``to``.

        The event item id is ``"current"`` and the node is the vCard4
        namespace; the sender is the bare form of ``from_``.
        """
        # The vcard content should NOT be in this event according to the spec:
        # https://xmpp.org/extensions/xep-0292.html#sect-idm45669698174224
        # but movim expects it to be here, and I guess it does not hurt
        log.debug("Broadcast vcard4 event: %s", vcard)
        await self.__broadcast(
            data=vcard,
            from_=JID(from_).bare,
            to=to,
            id="current",
            node=VCARD4_NAMESPACE,
        )

    async def __get_contact(self, stanza: Iq | Presence) -> LegacyContact:
        """Resolve the contact addressed by ``stanza`` in the sender's session."""
        session = self.xmpp.get_session_from_stanza(stanza)
        return await session.contacts.by_jid(stanza.get_to())  # type:ignore[no-any-return]

    async def _get_authorized_avatar(
        self, stanza: Iq | Presence, contact: LegacyContact | None = None
    ) -> PepAvatar:
        """Return the avatar of the entity ``stanza`` is addressed to.

        A stanza addressed to the component's bare JID yields the component's
        own avatar. Otherwise the contact's stored avatar is used, looking the
        contact up from the stanza when ``contact`` is not supplied. The
        returned ``PepAvatar`` is empty when there is no avatar.
        """
        if stanza.get_to() == self.xmpp.boundjid.bare:
            item = PepAvatar()
            if self.xmpp.avatar is not None:
                item.set_avatar_from_cache(self.xmpp.avatar)
            return item

        if contact is None:
            contact = await self.__get_contact(stanza)

        item = PepAvatar()
        if contact.stored.avatar is not None:
            stored = avatar_cache.get(contact.stored.avatar)
            # A stored avatar reference is expected to always be in the cache.
            assert stored is not None
            item.set_avatar_from_cache(stored)
        return item

    async def _get_authorized_nick(
        self, stanza: Iq | Presence, contact: LegacyContact | None = None
    ) -> UserNick:
        """Return the nickname stanza of the entity ``stanza`` is addressed to.

        A stanza addressed to the component's bare JID yields the component
        name. For a contact without a name, an empty ``UserNick`` is returned.
        """
        if stanza.get_to() == self.xmpp.boundjid.bare:
            return get_user_nick(self.xmpp.COMPONENT_NAME)

        if contact is None:
            contact = await self.__get_contact(stanza)

        if contact.name is not None:
            return get_user_nick(contact.name)
        else:
            return UserNick()

    def __reply_with(
        self, iq: Iq, content: AvatarData | AvatarMetadata | None, item_id: str | None
    ) -> None:
        """Reply to a pubsub items request with ``content``.

        When the request names no item, the reply is sent directly. When it
        names items, the reply is sent only if one of them has ``item_id``.

        Raises:
            XMPPError: ``item-not-found`` when items were requested and none
                of them matches ``item_id``.
        """
        requested_items = iq["pubsub"]["items"]

        # Explicit length check on purpose: stanza objects are truthy even
        # when they contain no sub-items, so ``not requested_items`` would
        # not be equivalent.
        if len(requested_items) == 0:
            self._reply_with_payload(iq, content, item_id)
        else:
            for item in requested_items:
                if item["id"] == item_id:
                    self._reply_with_payload(iq, content, item_id)
                    return
            else:
                raise XMPPError("item-not-found")

    async def _get_avatar_data(self, iq: Iq) -> None:
        """IQ handler replying with the avatar data item."""
        pep_avatar = await self._get_authorized_avatar(iq)
        self.__reply_with(iq, pep_avatar.data, pep_avatar.id)

    async def _get_avatar_metadata(self, iq: Iq) -> None:
        """IQ handler replying with the avatar metadata item."""
        pep_avatar = await self._get_authorized_avatar(iq)
        self.__reply_with(iq, pep_avatar.metadata, pep_avatar.id)

    async def _get_vcard(self, iq: Iq) -> None:
        """IQ handler replying with the vCard4 of the addressed contact.

        Raises:
            XMPPError: ``item-not-found`` when the contact has no vCard.
        """
        # this is not the proper way that clients should retrieve VCards, but
        # gajim does it this way.
        # https://xmpp.org/extensions/xep-0292.html#sect-idm45669698174224
        contact = await self.__get_contact(iq)
        vcard = await contact.get_vcard()
        if vcard is None:
            raise XMPPError("item-not-found")
        self._reply_with_payload(iq, vcard, "current", VCARD4_NAMESPACE)

    @staticmethod
    def _reply_with_payload(
        iq: Iq,
        payload: AvatarMetadata | AvatarData | VCard4 | None,
        id_: str | None,
        namespace: str | None = None,
    ) -> None:
        """Send the result for ``iq``, carrying ``payload`` as one item.

        The node is ``namespace`` when given, else the payload's own
        namespace. Without a payload the bare reply is sent, with no item.
        """
        result = iq.reply()
        item = Item()
        if payload:
            item.set_payload(payload.xml)
            item["id"] = id_
            result["pubsub"]["items"]["node"] = (
                namespace if namespace else payload.namespace
            )
            result["pubsub"]["items"].append(item)
        result.send()

    async def __broadcast(
        self,
        data: ElementBase | None,
        from_: JidStr,
        to: OptJidStr = None,
        items: EventItems | None = None,
        **kwargs: object,
    ) -> None:
        """Send a pubsub event as a headline message.

        Args:
            data: Payload of the single event item; must not be ``None``
                unless ``items`` is supplied.
            from_: Sender of the message.
            to: Recipient. When ``None``, a copy of the message is sent to
                every ``GatewayUser`` in the store.
            items: Pre-built event items; when given they are sent as is.
            **kwargs: Set as keys on the event item. A ``node`` entry also
                overrides the node name, which defaults to ``data.namespace``.

        When the sender is not the component itself and ``to`` is given,
        nothing is sent if the recipient has no session; otherwise the
        session's ``ready`` awaitable is awaited first.
        """
        from_ = JID(from_)
        if from_ != self.xmpp.boundjid.bare and to is not None:
            to = JID(to)
            session = self.xmpp.get_session_from_jid(to)
            if session is None:
                return
            await session.ready

        item = EventItem()
        if data:
            item.set_payload(data.xml)
        for k, v in kwargs.items():
            item[k] = v

        if items is None:
            items = EventItems()
            items.append(item)
            assert data is not None
            items["node"] = kwargs.get("node") or data.namespace

        event = Event()
        event.append(items)

        msg = self.xmpp.Message()
        msg.set_type("headline")
        msg.set_from(from_)
        msg.append(event)

        if to is None:
            with self.xmpp.store.session() as orm:
                for u in orm.query(GatewayUser).all():
                    new_msg = copy(msg)
                    new_msg.set_to(u.jid.bare)
                    new_msg.send()
        else:
            msg.set_to(to)
            msg.send()

    async def broadcast_avatar(
        self, from_: JidStr, to: JidStr, cached_avatar: CachedAvatar | None
    ) -> None:
        """Broadcast avatar metadata for ``from_`` to ``to``.

        With ``cached_avatar`` set to ``None``, an empty metadata element is
        sent instead.
        """
        if cached_avatar is None:
            await self.__broadcast(AvatarMetadata(), from_, to)
        else:
            pep_avatar = PepAvatar()
            pep_avatar.set_avatar_from_cache(cached_avatar)
            assert pep_avatar.metadata is not None
            await self.__broadcast(
                pep_avatar.metadata, from_, to, id=pep_avatar.metadata["info"]["id"]
            )

    def broadcast_nick(
        self,
        user_jid: JID,
        jid: JidStr,
        nick: str | None = None,
    ) -> None:
        """Schedule a nickname event from ``jid`` to the bare ``user_jid``.

        The broadcast runs as a task on the component's event loop; this
        method does not wait for it.
        """
        jid = JID(jid)
        nickname = get_user_nick(nick)
        log.debug("New nickname: %s", nickname)
        self.xmpp.loop.create_task(self.__broadcast(nickname, jid, user_jid.bare))

    def broadcast_space(self, session: AnySession, space: Space) -> None:
        """Schedule an event listing the rooms of ``space`` for the user.

        The node is the string form of the space's legacy id. Each room
        becomes one item whose id is the room JID, with a ``conference``
        element that carries the room name when there is one.
        """
        items = EventItems()
        items["node"] = str(space.legacy_id)
        for room in space.rooms:
            item = EventItem()
            item["id"] = room.jid
            item.enable("conference")
            if room.name:
                item["conference"]["name"] = room.name
            items.append(item)
        session.create_task(
            self.__broadcast(
                None,
                self.xmpp.boundjid.bare,
                session.user_jid,
                items=items,
            )
        )

    def broadcast_space_metadata(
        self, session: AnySession, space: Space, node: str
    ) -> None:
        """Send a pubsub configuration event for ``node`` to the user.

        The attached form is a ``pubsub#node_config`` result whose title is
        the space name, followed by a fixed set of field values.
        """
        msg = self.xmpp.make_message(
            mtype="headline", mto=session.user_jid, mfrom=self.xmpp.boundjid.bare
        )
        msg["pubsub_event"]["configuration"]["node"] = node

        form = msg["pubsub_event"]["configuration"]["form"]
        form["type"] = "result"

        form.add_field(
            var="FORM_TYPE",
            ftype="hidden",
            value="http://jabber.org/protocol/pubsub#node_config",
        )
        form.add_field(var="pubsub#title", value=space.name)

        # MUSTs
        form.add_field(var="pubsub#type", value="urn:xmpp:spaces:0")
        form.add_field(var="pubsub#notify_retract", value="true")
        form.add_field(var="pubsub#persist_items", value="true")
        form.add_field(var="pubsub#purge_offline", value="false")
        # SHOULDs
        form.add_field(var="pubsub#notify_sub", value="true")
        form.add_field(var="pubsub#notify_config", value="true")
        form.add_field(var="pubsub#notify_delete", value="true")
        form.add_field(var="pubsub#publish_model", value="publishers")
        # MUST (private spaces)
        form.add_field(var="pubsub#access_model", value="authorize")

        msg.send()

400 

401 

def get_user_nick(nick: str | None = None) -> UserNick:
    """Build a ``UserNick`` stanza, left empty when ``nick`` is ``None``."""
    stanza = UserNick()
    if nick is None:
        return stanza
    stanza["nick"] = nick
    return stanza

407 

408 

# Module-level logger. It is defined after the class, which is fine because
# the methods above only look up ``log`` when they are called.
log = logging.getLogger(__name__)

# Register the plugin class through slixmpp's ``register_plugin``.
register_plugin(PubSubComponent)