Coverage for slidge/core/pubsub.py: 85%

207 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-07 05:11 +0000

1import logging 

2from copy import copy 

3from pathlib import Path 

4from typing import TYPE_CHECKING, Optional, Union 

5 

6from slixmpp import ( 

7 JID, 

8 CoroutineCallback, 

9 Iq, 

10 Presence, 

11 StanzaPath, 

12 register_stanza_plugin, 

13) 

14from slixmpp.exceptions import XMPPError 

15from slixmpp.plugins.base import BasePlugin, register_plugin 

16from slixmpp.plugins.xep_0060.stanza import Event, EventItem, EventItems, Item 

17from slixmpp.plugins.xep_0084 import Data as AvatarData 

18from slixmpp.plugins.xep_0084 import MetaData as AvatarMetadata 

19from slixmpp.plugins.xep_0172 import UserNick 

20from slixmpp.plugins.xep_0292.stanza import VCard4 

21from slixmpp.types import JidStr, OptJidStr 

22 

23from ..db.avatar import CachedAvatar, avatar_cache 

24from ..db.store import ContactStore, SlidgeStore 

25from .mixins.lock import NamedLockMixin 

26 

27if TYPE_CHECKING: 

28 from slidge.core.gateway import BaseGateway 

29 

30 from ..contact.contact import LegacyContact 

31 

# Namespace string for vCard4; this module also uses it as the pubsub node
# name when matching vCard item requests and when broadcasting vCard events.
VCARD4_NAMESPACE = "urn:xmpp:vcard4"

33 

34 

class PepItem:
    """Empty base class shared by the PEP item wrappers in this module."""

37 

38 

class PepAvatar(PepItem):
    """Avatar metadata stanza plus the on-disk location of the image bytes.

    A fresh instance is empty (``metadata``, ``id`` and the data path are all
    ``None``) until :meth:`set_avatar_from_cache` fills it in.
    """

    store: SlidgeStore

    def __init__(self):
        self.metadata: Optional[AvatarMetadata] = None
        self.id: Optional[str] = None
        self._avatar_data_path: Optional[Path] = None

    @property
    def data(self) -> Optional[AvatarData]:
        """Return an ``AvatarData`` stanza built from the avatar file.

        The file is read each time the property is accessed. Returns ``None``
        when no avatar path has been set.
        """
        path = self._avatar_data_path
        if path is not None:
            payload = AvatarData()
            payload.set_value(path.read_bytes())
            return payload
        return None

    def set_avatar_from_cache(self, cached_avatar: CachedAvatar):
        """Fill in id, metadata and data path from ``cached_avatar``.

        :param cached_avatar: Cache entry providing ``hash``, ``path``,
            ``height`` and ``width``.
        """
        info = AvatarMetadata()
        avatar_hash = cached_avatar.hash
        self.id = avatar_hash
        avatar_path = cached_avatar.path
        info.add_info(
            id=avatar_hash,
            itype="image/png",
            # Size in bytes is taken from the file on disk.
            ibytes=avatar_path.stat().st_size,
            height=str(cached_avatar.height),
            width=str(cached_avatar.width),
        )
        self.metadata = info
        self._avatar_data_path = avatar_path

67 

68 

class PepNick(PepItem):
    """Wrapper around a ``UserNick`` stanza for an optional nickname."""

    contact_store: ContactStore

    def __init__(self, nick: Optional[str] = None):
        """Build the nickname stanza.

        :param nick: Nickname to store in the stanza; when ``None`` the
            stanza is created but its ``nick`` field is left untouched.
        """
        stanza = UserNick()
        has_nick = nick is not None
        if has_nick:
            stanza["nick"] = nick
        self.nick = stanza
        # Raw string kept alongside the stanza (name-mangled, class-private).
        self.__nick_str = nick

78 

79 

class PubSubComponent(NamedLockMixin, BasePlugin):
    """Plugin serving PEP-style pubsub items and events for the gateway.

    It answers ``iq`` ``get`` requests for the avatar data, avatar metadata
    and vCard4 nodes, and sends event notifications (avatar metadata,
    nickname, vCard4) as ``headline`` messages.
    """

    xmpp: "BaseGateway"

    name = "pubsub"
    description = "Pubsub component"
    dependencies = {
        "xep_0030",
        "xep_0060",
        "xep_0115",
        "xep_0163",
    }
    default_config = {"component_name": None}
    component_name: str

    def __init__(self, *a, **kw):
        super().__init__(*a, **kw)
        # Allow a UserNick payload to be attached to pubsub event items.
        register_stanza_plugin(EventItem, UserNick)

    def plugin_init(self):
        """Register the item-request handlers and advertise disco info."""
        handlers = (
            ("pubsub_get_avatar_data", AvatarData.namespace, self._get_avatar_data),
            (
                "pubsub_get_avatar_metadata",
                AvatarMetadata.namespace,
                self._get_avatar_metadata,
            ),
            ("pubsub_get_vcard", VCARD4_NAMESPACE, self._get_vcard),
        )
        for handler_name, node, callback in handlers:
            self.xmpp.register_handler(
                CoroutineCallback(
                    handler_name,
                    StanzaPath(f"iq@type=get/pubsub/items@node={node}"),
                    callback,  # type:ignore
                )
            )

        disco = self.xmpp.plugin["xep_0030"]
        disco.add_identity("pubsub", "pep", self.component_name)
        disco.add_identity("account", "registered", self.component_name)
        disco.add_feature("http://jabber.org/protocol/pubsub#event")
        disco.add_feature("http://jabber.org/protocol/pubsub#retrieve-items")
        disco.add_feature("http://jabber.org/protocol/pubsub#persistent-items")

    async def __get_features(self, presence: Presence) -> list[str]:
        """Return the disco features of the entity that sent ``presence``.

        When the presence carries a caps ``ver`` string, the ``xep_0115``
        plugin is asked first. If that yields nothing (or there is no ``ver``
        string), a disco info query is made while holding the lock keyed on
        the sender JID.
        """
        from_ = presence.get_from()
        info = None
        if presence["caps"]["ver"]:
            info = await self.xmpp.plugin["xep_0115"].get_caps(from_)
        if info is None:
            async with self.lock(from_):
                iq = await self.xmpp.plugin["xep_0030"].get_info(from_)
                info = iq["disco_info"]
        return info["features"]

    async def on_presence_available(
        self, p: Presence, contact: Optional["LegacyContact"]
    ):
        """Push PEP events to the sender of an available presence.

        Which events are sent depends on the ``+notify`` features advertised
        by the sender. MUC join presences are ignored here.

        :param p: The received presence.
        :param contact: The contact the presence is addressed to, if any.
        """
        if p.get_plugin("muc_join", check=True) is not None:
            log.debug("Ignoring MUC presence here")
            return

        to = p.get_to()
        if to != self.xmpp.boundjid.bare:
            # we don't want to push anything for contacts that are not in the user's roster
            if contact is None or not contact.is_friend:
                return

        from_ = p.get_from()
        features = await self.__get_features(p)

        if AvatarMetadata.namespace + "+notify" in features:
            try:
                pep_avatar = await self._get_authorized_avatar(p, contact)
            except XMPPError:
                pass
            else:
                # An empty PepAvatar (no avatar known) is not broadcast.
                if pep_avatar.metadata is not None:
                    await self.__broadcast(
                        data=pep_avatar.metadata,
                        from_=to.bare,
                        to=from_,
                        id=pep_avatar.metadata["info"]["id"],
                    )

        if UserNick.namespace + "+notify" in features:
            try:
                pep_nick = await self._get_authorized_nick(p, contact)
            except XMPPError:
                pass
            else:
                await self.__broadcast(data=pep_nick.nick, from_=to, to=from_)

        if contact is not None and VCARD4_NAMESPACE + "+notify" in features:
            await self.broadcast_vcard_event(to, from_, await contact.get_vcard())

    async def broadcast_vcard_event(self, from_: JID, to: JID, vcard: VCard4 | None):
        """Send a vCard4 event (item id ``current``) from ``from_`` to ``to``.

        :param from_: Sender of the event; only its bare JID is used.
        :param to: Recipient of the event.
        :param vcard: vCard to embed as payload, or ``None`` for no payload.
        """
        # The vcard content should NOT be in this event according to the spec:
        # https://xmpp.org/extensions/xep-0292.html#sect-idm45669698174224
        # but movim expects it to be here, and I guess it does not hurt
        log.debug("Broadcast vcard4 event: %s", vcard)
        await self.__broadcast(
            data=vcard,
            from_=JID(from_).bare,
            to=to,
            id="current",
            node=VCARD4_NAMESPACE,
        )

    async def __get_contact(self, stanza: Union[Iq, Presence]):
        """Look up the contact addressed by ``stanza`` in the sender's session."""
        session = self.xmpp.get_session_from_stanza(stanza)
        return await session.contacts.by_jid(stanza.get_to())

    async def _get_authorized_avatar(
        self, stanza: Union[Iq, Presence], contact: Optional["LegacyContact"] = None
    ) -> PepAvatar:
        """Return the avatar item for the entity ``stanza`` is addressed to.

        :param stanza: Request or presence whose ``to`` selects the entity.
        :param contact: Already-resolved contact; looked up when ``None``.
        :return: A ``PepAvatar``, left empty when no avatar is available.
        """
        item = PepAvatar()

        if stanza.get_to() == self.xmpp.boundjid.bare:
            # Addressed to the component itself: use the component's avatar.
            # The cache lookup can come back empty (the contact branch below
            # asserts on the same call), so only populate the item on a hit
            # instead of dereferencing None.
            component_avatar = avatar_cache.get_by_pk(self.xmpp.avatar_pk)
            if component_avatar is not None:
                item.set_avatar_from_cache(component_avatar)
            return item

        if contact is None:
            contact = await self.__get_contact(stanza)

        if contact.avatar_pk is not None:
            stored = avatar_cache.get_by_pk(contact.avatar_pk)
            assert stored is not None
            item.set_avatar_from_cache(stored)
        return item

    async def _get_authorized_nick(
        self, stanza: Union[Iq, Presence], contact: Optional["LegacyContact"] = None
    ) -> PepNick:
        """Return the nickname item for the entity ``stanza`` is addressed to.

        :param stanza: Request or presence whose ``to`` selects the entity.
        :param contact: Already-resolved contact; looked up when ``None``.
        """
        if stanza.get_to() == self.xmpp.boundjid.bare:
            return PepNick(self.xmpp.COMPONENT_NAME)

        if contact is None:
            contact = await self.__get_contact(stanza)

        # PepNick(None) is the same as PepNick(): no nick is set on the stanza.
        return PepNick(contact.name)

    def __reply_with(
        self, iq: Iq, content: AvatarData | AvatarMetadata | None, item_id: str | None
    ) -> None:
        """Answer ``iq`` with ``content`` if it asks for ``item_id`` (or for nothing specific).

        :raises XMPPError: ``item-not-found`` when specific items were
            requested and none of them has id ``item_id``.
        """
        requested_items = iq["pubsub"]["items"]

        # Explicit length check kept on purpose: this is a stanza object, so
        # its truthiness may not reflect whether it has children
        # (NOTE(review): confirm against slixmpp ElementBase).
        if len(requested_items) == 0:
            self._reply_with_payload(iq, content, item_id)
            return

        for item in requested_items:
            if item["id"] == item_id:
                self._reply_with_payload(iq, content, item_id)
                return
        raise XMPPError("item-not-found")

    async def _get_avatar_data(self, iq: Iq):
        """Handle a request for the avatar data node."""
        pep_avatar = await self._get_authorized_avatar(iq)
        self.__reply_with(iq, pep_avatar.data, pep_avatar.id)

    async def _get_avatar_metadata(self, iq: Iq):
        """Handle a request for the avatar metadata node."""
        pep_avatar = await self._get_authorized_avatar(iq)
        self.__reply_with(iq, pep_avatar.metadata, pep_avatar.id)

    async def _get_vcard(self, iq: Iq):
        """Handle a request for the vCard4 node.

        :raises XMPPError: ``item-not-found`` when the contact has no vCard.
        """
        # this is not the proper way that clients should retrieve VCards, but
        # gajim does it this way.
        # https://xmpp.org/extensions/xep-0292.html#sect-idm45669698174224
        contact = await self.__get_contact(iq)
        vcard = await contact.get_vcard()
        if vcard is None:
            raise XMPPError("item-not-found")
        self._reply_with_payload(iq, vcard, "current", VCARD4_NAMESPACE)

    @staticmethod
    def _reply_with_payload(
        iq: Iq,
        payload: Optional[Union[AvatarMetadata, AvatarData, VCard4]],
        id_: Optional[str],
        namespace: Optional[str] = None,
    ):
        """Send the result for ``iq``, carrying ``payload`` as a single item.

        Without a payload, the reply is sent with no item and no node set.

        :param iq: The request being answered.
        :param payload: Stanza to embed in the item, or ``None``.
        :param id_: Item id to set on the returned item.
        :param namespace: Node name to report; defaults to the payload's
            own namespace.
        """
        result = iq.reply()
        if payload:
            item = Item()
            item.set_payload(payload.xml)
            item["id"] = id_
            result["pubsub"]["items"]["node"] = (
                namespace if namespace else payload.namespace
            )
            result["pubsub"]["items"].append(item)
        result.send()

    async def __broadcast(self, data, from_: JidStr, to: OptJidStr = None, **kwargs):
        """Send a pubsub event as a ``headline`` message.

        :param data: Payload stanza for the event item; may be falsy, in
            which case ``node`` must be given in ``kwargs``.
        :param from_: Sender of the event.
        :param to: Recipient; when ``None`` the event goes to every user
            returned by the user store.
        :param kwargs: Extra fields set on the event item (e.g. ``id``,
            ``node``); ``node`` also overrides the node of the items element.
        """
        from_ = JID(from_)
        if from_ != self.xmpp.boundjid.bare and to is not None:
            # Events not coming from the component itself are only sent to a
            # recipient with a session, and only once that session is ready.
            to = JID(to)
            session = self.xmpp.get_session_from_jid(to)
            if session is None:
                return
            await session.ready

        item = EventItem()
        if data:
            item.set_payload(data.xml)
        for k, v in kwargs.items():
            item[k] = v

        items = EventItems()
        items.append(item)
        items["node"] = kwargs.get("node") or data.namespace

        event = Event()
        event.append(items)

        msg = self.xmpp.Message()
        msg.set_type("headline")
        msg.set_from(from_)
        msg.append(event)

        if to is None:
            for u in self.xmpp.store.users.get_all():
                new_msg = copy(msg)
                new_msg.set_to(u.jid.bare)
                new_msg.send()
        else:
            msg.set_to(to)
            msg.send()

    async def broadcast_avatar(
        self, from_: JidStr, to: JidStr, cached_avatar: Optional[CachedAvatar]
    ) -> None:
        """Broadcast an avatar metadata event.

        :param from_: Sender of the event.
        :param to: Recipient of the event.
        :param cached_avatar: Avatar to announce; ``None`` sends an empty
            metadata element.
        """
        if cached_avatar is None:
            await self.__broadcast(AvatarMetadata(), from_, to)
            return

        pep_avatar = PepAvatar()
        pep_avatar.set_avatar_from_cache(cached_avatar)
        assert pep_avatar.metadata is not None
        await self.__broadcast(
            pep_avatar.metadata, from_, to, id=pep_avatar.metadata["info"]["id"]
        )

    def broadcast_nick(
        self,
        user_jid: JID,
        jid: JidStr,
        nick: Optional[str] = None,
    ):
        """Schedule a nickname event from ``jid`` to the bare ``user_jid``.

        The broadcast runs as a task on the component's event loop; this
        method returns without waiting for it.
        """
        jid = JID(jid)
        nickname = PepNick(nick)
        log.debug("New nickname: %s", nickname.nick)
        self.xmpp.loop.create_task(self.__broadcast(nickname.nick, jid, user_jid.bare))

348 

349 

# Module-level logger; defined after the classes above, which only look the
# name up when their methods run.
log = logging.getLogger(__name__)

# Hand the plugin class to register_plugin (from slixmpp.plugins.base).
register_plugin(PubSubComponent)