Coverage for slidge/core/pubsub.py: 83%

207 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-04 08:17 +0000

1import logging 

2from copy import copy 

3from pathlib import Path 

4from typing import TYPE_CHECKING, Optional, Union 

5 

6from slixmpp import ( 

7 JID, 

8 CoroutineCallback, 

9 Iq, 

10 Presence, 

11 StanzaPath, 

12 register_stanza_plugin, 

13) 

14from slixmpp.exceptions import IqError, IqTimeout, XMPPError 

15from slixmpp.plugins.base import BasePlugin, register_plugin 

16from slixmpp.plugins.xep_0060.stanza import Event, EventItem, EventItems, Item 

17from slixmpp.plugins.xep_0084 import Data as AvatarData 

18from slixmpp.plugins.xep_0084 import MetaData as AvatarMetadata 

19from slixmpp.plugins.xep_0172 import UserNick 

20from slixmpp.plugins.xep_0292.stanza import VCard4 

21from slixmpp.types import JidStr, OptJidStr 

22 

23from ..db.avatar import CachedAvatar, avatar_cache 

24from ..db.models import GatewayUser 

25from ..util.lock import NamedLockMixin 

26 

27if TYPE_CHECKING: 

28 from slidge.core.gateway import BaseGateway 

29 

30 from ..contact.contact import LegacyContact 

31 

32VCARD4_NAMESPACE = "urn:xmpp:vcard4" 

33 

34 

35class PepAvatar: 

36 def __init__(self) -> None: 

37 self.metadata: Optional[AvatarMetadata] = None 

38 self.id: Optional[str] = None 

39 self._avatar_data_path: Optional[Path] = None 

40 

41 @property 

42 def data(self) -> Optional[AvatarData]: 

43 if self._avatar_data_path is None: 

44 return None 

45 data = AvatarData() 

46 data.set_value(self._avatar_data_path.read_bytes()) 

47 return data 

48 

49 def set_avatar_from_cache(self, cached_avatar: CachedAvatar) -> None: 

50 metadata = AvatarMetadata() 

51 self.id = cached_avatar.hash 

52 metadata.add_info( 

53 id=cached_avatar.hash, 

54 itype="image/png", 

55 ibytes=cached_avatar.path.stat().st_size, 

56 height=str(cached_avatar.height), 

57 width=str(cached_avatar.width), 

58 ) 

59 self.metadata = metadata 

60 self._avatar_data_path = cached_avatar.path 

61 

62 

63class PubSubComponent(NamedLockMixin, BasePlugin): 

64 xmpp: "BaseGateway" 

65 

66 name = "pubsub" 

67 description = "Pubsub component" 

68 dependencies = { 

69 "xep_0030", 

70 "xep_0060", 

71 "xep_0115", 

72 "xep_0163", 

73 } 

74 default_config = {"component_name": None} 

75 component_name: str 

76 

77 def __init__(self, *a, **kw) -> None: 

78 super(PubSubComponent, self).__init__(*a, **kw) 

79 register_stanza_plugin(EventItem, UserNick) 

80 

81 def plugin_init(self) -> None: 

82 self.xmpp.register_handler( 

83 CoroutineCallback( 

84 "pubsub_get_avatar_data", 

85 StanzaPath(f"iq@type=get/pubsub/items@node={AvatarData.namespace}"), 

86 self._get_avatar_data, # type:ignore 

87 ) 

88 ) 

89 self.xmpp.register_handler( 

90 CoroutineCallback( 

91 "pubsub_get_avatar_metadata", 

92 StanzaPath(f"iq@type=get/pubsub/items@node={AvatarMetadata.namespace}"), 

93 self._get_avatar_metadata, # type:ignore 

94 ) 

95 ) 

96 self.xmpp.register_handler( 

97 CoroutineCallback( 

98 "pubsub_get_vcard", 

99 StanzaPath(f"iq@type=get/pubsub/items@node={VCARD4_NAMESPACE}"), 

100 self._get_vcard, # type:ignore 

101 ) 

102 ) 

103 

104 disco = self.xmpp.plugin["xep_0030"] 

105 disco.add_identity("pubsub", "pep", self.component_name) 

106 disco.add_identity("account", "registered", self.component_name) 

107 disco.add_feature("http://jabber.org/protocol/pubsub#event") 

108 disco.add_feature("http://jabber.org/protocol/pubsub#retrieve-items") 

109 disco.add_feature("http://jabber.org/protocol/pubsub#persistent-items") 

110 

111 async def __get_features(self, presence: Presence) -> list[str]: 

112 from_ = presence.get_from() 

113 ver_string = presence["caps"]["ver"] 

114 if ver_string: 

115 info = await self.xmpp.plugin["xep_0115"].get_caps(from_) 

116 else: 

117 info = None 

118 if info is None: 

119 async with self.lock(from_): 

120 try: 

121 iq = await self.xmpp.plugin["xep_0030"].get_info(from_) 

122 except (IqError, IqTimeout): 

123 log.debug("Could get disco#info of %s, ignoring", from_) 

124 return [] 

125 info = iq["disco_info"] 

126 return info["features"] 

127 

128 async def on_presence_available( 

129 self, p: Presence, contact: Optional["LegacyContact"] 

130 ) -> None: 

131 if p.get_plugin("muc_join", check=True) is not None: 

132 log.debug("Ignoring MUC presence here") 

133 return 

134 

135 to = p.get_to() 

136 if to != self.xmpp.boundjid.bare: 

137 # we don't want to push anything for contacts that are not in the user's roster 

138 if contact is None or not contact.is_friend: 

139 return 

140 

141 from_ = p.get_from() 

142 features = await self.__get_features(p) 

143 

144 if AvatarMetadata.namespace + "+notify" in features: 

145 try: 

146 pep_avatar = await self._get_authorized_avatar(p, contact) 

147 except XMPPError: 

148 pass 

149 else: 

150 if pep_avatar.metadata is not None: 

151 await self.__broadcast( 

152 data=pep_avatar.metadata, 

153 from_=p.get_to().bare, 

154 to=from_, 

155 id=pep_avatar.metadata["info"]["id"], 

156 ) 

157 if UserNick.namespace + "+notify" in features: 

158 try: 

159 pep_nick = await self._get_authorized_nick(p, contact) 

160 except XMPPError: 

161 pass 

162 else: 

163 await self.__broadcast(data=pep_nick, from_=p.get_to(), to=from_) 

164 

165 if contact is not None and VCARD4_NAMESPACE + "+notify" in features: 

166 await self.broadcast_vcard_event( 

167 p.get_to(), from_, await contact.get_vcard() 

168 ) 

169 

170 async def broadcast_vcard_event( 

171 self, from_: JID, to: JID, vcard: VCard4 | None 

172 ) -> None: 

173 item = Item() 

174 item.namespace = VCARD4_NAMESPACE 

175 item["id"] = "current" 

176 # vcard: VCard4 = await self.xmpp["xep_0292_provider"].get_vcard(from_, to) 

177 # The vcard content should NOT be in this event according to the spec: 

178 # https://xmpp.org/extensions/xep-0292.html#sect-idm45669698174224 

179 # but movim expects it to be here, and I guess it does not hurt 

180 

181 log.debug("Broadcast vcard4 event: %s", vcard) 

182 await self.__broadcast( 

183 data=vcard, 

184 from_=JID(from_).bare, 

185 to=to, 

186 id="current", 

187 node=VCARD4_NAMESPACE, 

188 ) 

189 

190 async def __get_contact(self, stanza: Union[Iq, Presence]): 

191 session = self.xmpp.get_session_from_stanza(stanza) 

192 return await session.contacts.by_jid(stanza.get_to()) 

193 

194 async def _get_authorized_avatar( 

195 self, stanza: Union[Iq, Presence], contact: Optional["LegacyContact"] = None 

196 ) -> PepAvatar: 

197 if stanza.get_to() == self.xmpp.boundjid.bare: 

198 item = PepAvatar() 

199 if self.xmpp.avatar is not None: 

200 item.set_avatar_from_cache(self.xmpp.avatar) 

201 return item 

202 

203 if contact is None: 

204 contact = await self.__get_contact(stanza) 

205 

206 item = PepAvatar() 

207 if contact.stored.avatar is not None: 

208 stored = avatar_cache.get(contact.stored.avatar) 

209 assert stored is not None 

210 item.set_avatar_from_cache(stored) 

211 return item 

212 

213 async def _get_authorized_nick( 

214 self, stanza: Union[Iq, Presence], contact: Optional["LegacyContact"] = None 

215 ) -> UserNick: 

216 if stanza.get_to() == self.xmpp.boundjid.bare: 

217 return get_user_nick(self.xmpp.COMPONENT_NAME) 

218 

219 if contact is None: 

220 contact = await self.__get_contact(stanza) 

221 

222 if contact.name is not None: 

223 return get_user_nick(contact.name) 

224 else: 

225 return UserNick() 

226 

227 def __reply_with( 

228 self, iq: Iq, content: AvatarData | AvatarMetadata | None, item_id: str | None 

229 ) -> None: 

230 requested_items = iq["pubsub"]["items"] 

231 

232 if len(requested_items) == 0: 

233 self._reply_with_payload(iq, content, item_id) 

234 else: 

235 for item in requested_items: 

236 if item["id"] == item_id: 

237 self._reply_with_payload(iq, content, item_id) 

238 return 

239 else: 

240 raise XMPPError("item-not-found") 

241 

242 async def _get_avatar_data(self, iq: Iq) -> None: 

243 pep_avatar = await self._get_authorized_avatar(iq) 

244 self.__reply_with(iq, pep_avatar.data, pep_avatar.id) 

245 

246 async def _get_avatar_metadata(self, iq: Iq) -> None: 

247 pep_avatar = await self._get_authorized_avatar(iq) 

248 self.__reply_with(iq, pep_avatar.metadata, pep_avatar.id) 

249 

250 async def _get_vcard(self, iq: Iq): 

251 # this is not the proper way that clients should retrieve VCards, but 

252 # gajim does it this way. 

253 # https://xmpp.org/extensions/xep-0292.html#sect-idm45669698174224 

254 session = self.xmpp.get_session_from_stanza(iq) 

255 contact = await session.contacts.by_jid(iq.get_to()) 

256 vcard = await contact.get_vcard() 

257 if vcard is None: 

258 raise XMPPError("item-not-found") 

259 self._reply_with_payload(iq, vcard, "current", VCARD4_NAMESPACE) 

260 

261 @staticmethod 

262 def _reply_with_payload( 

263 iq: Iq, 

264 payload: Optional[Union[AvatarMetadata, AvatarData, VCard4]], 

265 id_: Optional[str], 

266 namespace: Optional[str] = None, 

267 ) -> None: 

268 result = iq.reply() 

269 item = Item() 

270 if payload: 

271 item.set_payload(payload.xml) 

272 item["id"] = id_ 

273 result["pubsub"]["items"]["node"] = ( 

274 namespace if namespace else payload.namespace 

275 ) 

276 result["pubsub"]["items"].append(item) 

277 result.send() 

278 

279 async def __broadcast( 

280 self, data, from_: JidStr, to: OptJidStr = None, **kwargs 

281 ) -> None: 

282 from_ = JID(from_) 

283 if from_ != self.xmpp.boundjid.bare and to is not None: 

284 to = JID(to) 

285 session = self.xmpp.get_session_from_jid(to) 

286 if session is None: 

287 return 

288 await session.ready 

289 

290 item = EventItem() 

291 if data: 

292 item.set_payload(data.xml) 

293 for k, v in kwargs.items(): 

294 item[k] = v 

295 

296 items = EventItems() 

297 items.append(item) 

298 items["node"] = kwargs.get("node") or data.namespace 

299 

300 event = Event() 

301 event.append(items) 

302 

303 msg = self.xmpp.Message() 

304 msg.set_type("headline") 

305 msg.set_from(from_) 

306 msg.append(event) 

307 

308 if to is None: 

309 with self.xmpp.store.session() as orm: 

310 for u in orm.query(GatewayUser).all(): 

311 new_msg = copy(msg) 

312 new_msg.set_to(u.jid.bare) 

313 new_msg.send() 

314 else: 

315 msg.set_to(to) 

316 msg.send() 

317 

318 async def broadcast_avatar( 

319 self, from_: JidStr, to: JidStr, cached_avatar: Optional[CachedAvatar] 

320 ) -> None: 

321 if cached_avatar is None: 

322 await self.__broadcast(AvatarMetadata(), from_, to) 

323 else: 

324 pep_avatar = PepAvatar() 

325 pep_avatar.set_avatar_from_cache(cached_avatar) 

326 assert pep_avatar.metadata is not None 

327 await self.__broadcast( 

328 pep_avatar.metadata, from_, to, id=pep_avatar.metadata["info"]["id"] 

329 ) 

330 

331 def broadcast_nick( 

332 self, 

333 user_jid: JID, 

334 jid: JidStr, 

335 nick: Optional[str] = None, 

336 ) -> None: 

337 jid = JID(jid) 

338 nickname = get_user_nick(nick) 

339 log.debug("New nickname: %s", nickname) 

340 self.xmpp.loop.create_task(self.__broadcast(nickname, jid, user_jid.bare)) 

341 

342 

343def get_user_nick(nick: Optional[str] = None) -> UserNick: 

344 user_nick = UserNick() 

345 if nick is not None: 

346 user_nick["nick"] = nick 

347 return user_nick 

348 

349 

350log = logging.getLogger(__name__) 

351register_plugin(PubSubComponent)