Coverage for slidge / core / dispatcher / message / message.py: 85%

270 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-13 04:38 +0000

1import base64 

2import contextlib 

3import hashlib 

4import logging 

5from copy import copy 

6from dataclasses import dataclass 

7from typing import TYPE_CHECKING 

8from xml.etree import ElementTree 

9 

10from slixmpp import JID, Message 

11from slixmpp.exceptions import XMPPError 

12from slixmpp.plugins.xep_0511.stanza import LinkMetadata 

13 

14from ....contact.contact import LegacyContact 

15from ....group.room import LegacyMUC 

16from ....util.types import ( 

17 AnyRecipient, 

18 LinkPreview, 

19 RecipientType, 

20 Reply, 

21 XMPPAttachment, 

22 XMPPMessage, 

23) 

24from ....util.util import dict_to_named_tuple, remove_emoji_variation_selector_16 

25from ... import config 

26from ..util import DispatcherMixin, exceptions_to_xmpp_errors 

27 

28if TYPE_CHECKING: 

29 from slidge.util.types import AnyGateway 

30 

31 

@dataclass()
class _IncomingAttachment:
    """
    An attachment extracted from an incoming XMPP message.

    ``cid`` is a content-ID built from the attached file's hash when the
    stanza provides one; it is used to look up an already-stored sticker.
    """

    # the attachment itself, as forwarded to the legacy recipient
    attachment: XMPPAttachment
    # whether the incoming message carried a ``sticker`` element
    is_sticker: bool
    # hash-based content-ID, when the stanza provides a hash
    cid: str | None = None

37 

38 

class MessageContentMixin(DispatcherMixin):
    """
    Forward message-like stanzas sent by the user to the legacy network.

    Registers handlers for the ``legacy_message``, ``message_retract``,
    ``groupchat_message`` and ``reactions`` events, resolves the legacy
    recipient of each stanza and calls the recipient's ``on_*`` hooks.
    """

    __slots__: list[str] = []

    def __init__(self, xmpp: "AnyGateway") -> None:
        super().__init__(xmpp)
        xmpp.add_event_handler("legacy_message", self.on_legacy_message)
        xmpp.add_event_handler("message_retract", self.on_message_retract)
        xmpp.add_event_handler("groupchat_message", self.on_groupchat_message)
        xmpp.add_event_handler("reactions", self.on_reactions)

    async def on_groupchat_message(self, msg: Message) -> None:
        """Handle a groupchat message through the same path as other messages."""
        await self.on_legacy_message(msg)

    @exceptions_to_xmpp_errors
    async def on_legacy_message(self, msg: Message) -> None:
        """
        Forward a message sent by the user to its legacy recipient.

        Meant to be called from :class:`BaseGateway` only.

        Retraction and reaction fallback messages are ignored here (they have
        dedicated handlers). The message is dispatched as a BoB sticker, a
        non-BoB sticker or a regular message. It is then echoed if the
        recipient is a group, or passed to ``__ack`` otherwise. Finally, the
        resulting legacy message ID, if any, is mapped to the XMPP ID(s) in
        the store.

        :param msg: the incoming message stanza.
        :raises XMPPError: ``bad-request`` if the message contains an
            ``<encrypted>`` element.
        """
        # we MUST not use `if m["replace"]["id"]` because it adds the tag if not
        # present. this is a problem for MUC echoed messages
        if "apply_to" in msg:
            # ignore message retraction (handled by a specific method)
            # this is an old version of the protocol that depends on the
            # message fastening deprecated xep.
            return
        if "reactions" in msg:
            # ignore message reaction fallback.
            # the reaction itself is handled by self.on_reactions().
            return
        if "retract" in msg:
            # ignore message retraction fallback.
            # the retraction itself is handled by self.on_message_retract().
            return
        recipient, thread = await self._get_recipient_and_thread(msg)
        replace = await self.__get_replace(msg, recipient)
        if msg.xml.find(".//{*}encrypted") is not None:
            raise XMPPError(
                "bad-request", "You cannot send encrypted messages through this gateway"
            )
        body, reply = await self.__get_reply(msg, recipient)
        cid = self.__get_xhtml_sticker_cid(msg)

        if cid:
            # XHTML-IM sticker whose image is referenced by content-ID
            legacy_msg_id = await self.__dispatch_bob(
                msg.get_from(),
                cid,
                recipient,
                reply=reply,
                thread=thread,
            )
        else:
            attachments = self.__get_attachments(msg)
            if len(attachments) == 1 and attachments[0].is_sticker:
                legacy_msg_id = await self.__dispatch_nonbob_sticker(
                    attachments[0],
                    recipient,
                    msg["body"],
                    reply=reply,
                    thread=thread,
                )
            else:
                legacy_msg_id = await self.__dispatch_msg(
                    replace=replace,
                    body=body,
                    attachments=tuple(a.attachment for a in attachments),
                    recipient=recipient,
                    thread=thread,
                    reply=reply,
                    msg=msg,
                )

        if isinstance(recipient, LegacyMUC):
            # the echo's stanza ID is used below when mapping group message IDs
            stanza_id = await recipient.echo(msg, legacy_msg_id)
        else:
            stanza_id = None
            self.__ack(msg)

        if not legacy_msg_id:
            return

        with self.xmpp.store.session() as orm:
            if recipient.is_group:
                self.xmpp.store.id_map.set_origin(
                    orm, recipient.stored.id, legacy_msg_id, msg.get_id()
                )
                assert stanza_id is not None
                self.xmpp.store.id_map.set_msg(
                    orm,
                    recipient.stored.id,
                    legacy_msg_id,
                    [stanza_id],
                    True,
                )
            else:
                self.xmpp.store.id_map.set_msg(
                    orm,
                    recipient.stored.id,
                    legacy_msg_id,
                    [msg.get_id()],
                    False,
                )
            if recipient.session.MESSAGE_IDS_ARE_THREAD_IDS and thread:
                self.xmpp.store.id_map.set_thread(
                    orm, recipient.stored.id, thread, legacy_msg_id, recipient.is_group
                )
            orm.commit()

    def __get_xhtml_sticker_cid(self, msg: Message) -> str | None:
        """
        Return the content-ID of an XHTML-IM sticker, if *msg* is one.

        A sticker is an XHTML-IM body made of exactly one ``<p>`` containing
        exactly one ``<img>`` whose ``src`` is a ``cid:`` URI, and no text at
        all. Images mixed with text are "custom emojis" and are not handled
        here.

        :param msg: the incoming message stanza.
        :return: the content-ID without its ``cid:`` prefix, or ``None``.
        """
        if "html" not in msg:
            return None
        try:
            body = ElementTree.fromstring(
                "<body>" + msg["html"].get_body() + "</body>"
            )
        except ElementTree.ParseError:
            # unparsable XHTML-IM payload: not a sticker. Fall back to the
            # plain body instead of failing the whole message.
            return None
        paragraphs = body.findall("p")
        if len(paragraphs) != 1:
            return None
        # no text, single img ⇒ this is a sticker
        # other cases should be interpreted as "custom emojis" in text.
        # itertext() also covers text after the image (element tails) and
        # text inside nested elements, not only the text before the image.
        if "".join(body.itertext()).strip():
            return None
        images = paragraphs[0].findall("img")
        if len(images) != 1:
            return None
        src = images[0].get("src")
        if src is None or not src.startswith("cid:"):
            return None
        return src.removeprefix("cid:")

    async def __get_replace(
        self,
        msg: Message,
        recipient: RecipientType,
    ) -> str | None:
        """
        Return the legacy ID of the message corrected by *msg*, if any.

        :return: ``None`` if *msg* has no ``replace`` element with an ``id``,
            or if the ID could not be mapped to a (truthy) legacy ID.
        """
        if "replace" not in msg or "id" not in msg["replace"]:
            return None
        return (
            self._xmpp_msg_id_to_legacy(msg["replace"]["id"], recipient, True) or None
        )

    def __get_attachments(self, msg: Message) -> list[_IncomingAttachment]:
        """
        Extract the attachment of *msg*, if any.

        Sources are tried in this order: ``sfs`` (URL source only), ``oob``,
        then ``reference``/``sims`` (first source whose URI starts with
        ``http``). At most one attachment is returned.

        :param msg: the incoming message stanza.
        :return: a list with zero or one attachment, flagged as a sticker if
            *msg* has a ``sticker`` element.
        """
        is_sticker = "sticker" in msg

        if (
            "sfs" in msg
            and "sources" in msg["sfs"]
            and "url-data" in msg["sfs"]["sources"]
            and "target" in msg["sfs"]["sources"]["url-data"]
        ):
            # TODO: support "attach source in later message", cf https://xmpp.org/extensions/xep-0447.html#example-5
            # TODO: support for other sources than URL
            # TODO: support for multiattachments in single message.
            # What do we do if is_sticker and multiple files?
            sfs = msg["sfs"]
            attachment = _IncomingAttachment(
                XMPPAttachment(url=sfs["sources"]["url-data"]["target"]),
                is_sticker=is_sticker,
            )
            if "file" in sfs:
                attachment.attachment.content_type = sfs["file"]["media-type"] or None
                if "hash" in sfs["file"]:
                    algo = sfs["file"]["hash"]["algo"]
                    h = sfs["file"]["hash"]["value"]
                    # NOTE(review): __dispatch_nonbob_sticker stores stickers
                    # under "sha256+<hexdigest>"; confirm this "<algo>+<value>"
                    # format can actually match those stored content-IDs.
                    attachment.cid = f"{algo}+{h}" if algo and h else None
            return [attachment]

        if "oob" in msg:
            return [
                _IncomingAttachment(
                    XMPPAttachment(url=msg["oob"]["url"]), is_sticker=is_sticker
                )
            ]

        if (
            "reference" in msg
            and "sims" in msg["reference"]
            and "sources" in msg["reference"]["sims"]
        ):
            sims = msg["reference"]["sims"]
            for source in sims["sources"]["substanzas"]:
                if source["uri"].startswith("http"):
                    attachment = _IncomingAttachment(
                        XMPPAttachment(url=source["uri"]), is_sticker=is_sticker
                    )
                    break
            else:
                # no usable (http) source
                return []
            if "file" in sims:
                # read the media type from the SIMS <file> element checked
                # just above (like the sfs branch does), not from the message
                attachment.attachment.content_type = (
                    sims["file"]["media-type"] or None
                )
            return [attachment]

        return []

    async def __dispatch_msg(
        self,
        *,
        replace: str | None,
        reply: Reply | None,
        body: str,
        attachments: tuple[XMPPAttachment, ...],
        recipient: AnyRecipient,
        thread: str | None,
        msg: Message,
    ) -> str | None:
        """
        Forward a regular (non-sticker) message to *recipient*.

        Corrections are adapted to the recipient's capabilities. An empty
        correction becomes a retraction when retractions are supported. If
        corrections are not supported, the corrected message is retracted
        and the new content sent as a fresh message when retractions are
        supported; otherwise the body is prefixed with ``Correction:``.

        :return: the value returned by ``recipient.on_message``, or ``None``
            when nothing was sent.
        """
        if replace:
            body = body.strip()
            if not body and not attachments and recipient.RETRACTION:
                # empty correction ⇒ retraction
                await recipient.on_retract(replace, thread=thread)
                return None
            if not recipient.CORRECTION:
                if recipient.RETRACTION:
                    # emulate the correction: retract, then send a new message
                    await recipient.on_retract(replace, thread=thread)
                    replace = None
                elif body:
                    body = "Correction:\n" + body

        if not any([attachments, body]):
            log.debug(
                "Ignoring msg with id '%s' because no body or attachments found",
                msg.get_id(),
            )
            return None
        link_previews = (
            parse_link_previews(msg["link_metadatas"]) if "link_metadata" in msg else ()
        )
        mentions = (
            await recipient.parse_mentions(body)
            if isinstance(recipient, LegacyMUC)
            else ()
        )
        return await recipient.on_message(
            XMPPMessage(
                body=body,
                attachments=attachments,
                reply=reply,
                thread=thread,
                link_previews=link_previews,
                mentions=mentions,
                replace=replace,
            )
        )

    @exceptions_to_xmpp_errors
    async def on_message_retract(self, msg: Message) -> None:
        """
        Forward a message retraction to the legacy recipient.

        :param msg: the stanza carrying the ``retract`` element.
        :raises XMPPError: ``bad-request`` if the recipient does not support
            retractions.
        """
        recipient, thread = await self._get_recipient_and_thread(msg)
        if not recipient.RETRACTION:
            raise XMPPError(
                "bad-request",
                "This legacy service does not support message retraction.",
            )
        xmpp_id: str = msg["retract"]["id"]
        legacy_id = self._xmpp_msg_id_to_legacy(xmpp_id, recipient, origin=True)
        await recipient.on_retract(legacy_id, thread=thread)
        if isinstance(recipient, LegacyMUC):
            await recipient.echo(msg, None)
        self.__ack(msg)

    @exceptions_to_xmpp_errors
    async def on_reactions(self, msg: Message) -> None:
        """
        Forward a reactions update to the legacy recipient.

        Reactions are validated against the recipient's constraints (single
        emoji, available emoji subset), unless they target a "special"
        message whose ID starts with ``session.SPECIAL_MSG_ID_PREFIX``.
        On validation failure, the user is notified, an empty reaction list
        is sent to the legacy side and an error is raised. On success, the
        reaction is also reflected on the other XMPP IDs mapped to the same
        legacy message.

        :raises XMPPError: ``internal-server-error`` if the reacted-to message
            ID cannot be mapped to a legacy ID; ``policy-violation`` if the
            reactions are rejected.
        """
        recipient, thread = await self._get_recipient_and_thread(msg)
        react_to: str = msg["reactions"]["id"]

        special_msg = recipient.session.SPECIAL_MSG_ID_PREFIX and react_to.startswith(
            recipient.session.SPECIAL_MSG_ID_PREFIX
        )

        if special_msg:
            legacy_id = react_to
        else:
            legacy_id = self._xmpp_msg_id_to_legacy(react_to, recipient)

        if not legacy_id:
            log.debug("Ignored reaction from user")
            raise XMPPError(
                "internal-server-error",
                "Could not convert the XMPP msg ID to a legacy ID",
            )

        emojis = [
            remove_emoji_variation_selector_16(r["value"]) for r in msg["reactions"]
        ]
        error_msg = None

        if not special_msg:
            if recipient.REACTIONS_SINGLE_EMOJI and len(emojis) > 1:
                error_msg = "Maximum 1 emoji/message"

            if (
                not error_msg
                and (subset := await recipient.available_emojis(legacy_id))
                and not set(emojis).issubset(subset)
            ):
                error_msg = (
                    f"You can only react with the following emojis: {''.join(subset)}"
                )

        if error_msg:
            recipient.session.send_gateway_message(error_msg)
            if not isinstance(recipient, LegacyMUC):
                # no need to carbon for groups, we just don't echo the stanza
                recipient.react(legacy_id, carbon=True)
            await recipient.on_react(legacy_id, [], thread=thread)
            raise XMPPError(
                "policy-violation",
                text=error_msg,
                clear=False,
            )

        await recipient.on_react(legacy_id, emojis, thread=thread)
        if isinstance(recipient, LegacyMUC):
            await recipient.echo(msg, None)
        else:
            self.__ack(msg)

        with self.xmpp.store.session() as orm:
            multi = self.xmpp.store.id_map.get_xmpp(
                orm, recipient.stored.id, legacy_id, recipient.is_group
            )
        if not multi:
            return
        # drop the XMPP ID the user reacted to; the remaining IDs map to the
        # same legacy message and get the reaction reflected on them below
        multi = [m for m in multi if react_to != m]

        if isinstance(recipient, LegacyMUC):
            for xmpp_id in multi:
                mc = copy(msg)
                mc["reactions"]["id"] = xmpp_id
                await recipient.echo(mc, None)
        elif isinstance(recipient, LegacyContact):
            for xmpp_id in multi:
                recipient.react(legacy_id, emojis, xmpp_id=xmpp_id, carbon=True)

    def __ack(self, msg: Message) -> None:
        """Acknowledge *msg* via ``xmpp.delivery_receipt`` unless ``PROPER_RECEIPTS`` is set."""
        if not self.xmpp.PROPER_RECEIPTS:
            self.xmpp.delivery_receipt.ack(msg)

    async def __get_reply(
        self, msg: Message, recipient: AnyRecipient
    ) -> tuple[str, Reply | None]:
        """
        Extract the reply information of *msg*.

        :return: ``(body, None)`` if *msg* is not a reply. If the replied-to
            message cannot be mapped to a legacy ID, the body with upload URL
            prefixes redacted and ``None`` (the quote is sent instead).
            Otherwise the text to forward and a :class:`Reply`.
        """
        if "reply" not in msg:
            return msg["body"], None

        session = recipient.session

        try:
            reply_to_msg_id = self._xmpp_msg_id_to_legacy(msg["reply"]["id"], recipient)
        except XMPPError:
            session.log.debug(
                "Could not determine reply-to legacy msg ID, sending quote instead."
            )
            return redact_url(msg["body"]), None

        reply_to_jid = JID(msg["reply"]["to"])
        reply_to = None
        if msg["type"] == "chat":
            if reply_to_jid.bare != session.user_jid.bare:
                with contextlib.suppress(XMPPError):
                    reply_to = await session.contacts.by_jid(reply_to_jid)
        elif msg["type"] == "groupchat":
            nick = reply_to_jid.resource
            try:
                muc = await session.bookmarks.by_jid(reply_to_jid)
            except XMPPError:
                pass
            else:
                if nick == muc.user_nick:
                    reply_to = await muc.get_user_participant()
                elif not nick:
                    reply_to = muc.get_system_participant()
                else:
                    reply_to = await muc.get_participant(nick, store=False)

        # strip the quoted fallback only when the recipient can render replies
        if "fallback" in msg and (
            isinstance(recipient, LegacyMUC) or recipient.REPLIES
        ):
            text = msg["fallback"].get_stripped_body(self.xmpp["xep_0461"].namespace)
            try:
                reply_fallback = redact_url(msg["reply"].get_fallback_body())
            except AttributeError:
                reply_fallback = None
        else:
            text = msg["body"]
            reply_fallback = None

        return text, Reply(reply_to_msg_id, reply_to, reply_fallback)

    async def __dispatch_nonbob_sticker(
        self,
        attachment: _IncomingAttachment,
        recipient: AnyRecipient,
        fallback: str,
        reply: Reply | None = None,
        thread: str | None = None,
    ) -> str | None:
        """
        Forward a sticker sent as a regular attachment (not via BoB).

        The sticker is looked up in the BoB store by ``attachment.cid``; if
        missing, it is downloaded and stored under a SHA-256 content-ID. If
        the store still returns no sticker, the attachment URL and *fallback*
        text are sent as a regular message instead.

        :return: the value returned by the recipient's ``on_sticker`` or
            ``on_message``.
        """
        if attachment.cid:
            with self.xmpp.store.session() as orm:
                sticker = self.xmpp.store.bob.get_sticker(orm, attachment.cid)
        else:
            sticker = None
        if sticker is None:
            async with attachment.attachment.get() as response:
                response.raise_for_status()
                data = await response.read()
            cid = "sha256+" + hashlib.sha256(data).hexdigest()
            with self.xmpp.store.session() as orm:
                sticker = self.xmpp.store.bob.set_sticker(
                    orm, cid, data, attachment.attachment.content_type
                )
                orm.commit()
        if sticker is None:
            return await recipient.on_message(
                XMPPMessage(
                    body="\n".join([attachment.attachment.url, fallback]),
                    thread=thread,
                    reply=reply,
                )
            )
        sticker.reply = reply
        sticker.thread = thread
        return await recipient.on_sticker(sticker)

    async def __dispatch_bob(
        self,
        from_: JID,
        cid: str,
        recipient: AnyRecipient,
        reply: Reply | None = None,
        thread: str | None = None,
    ) -> str | None:
        """
        Forward a sticker referenced by content-ID *cid*.

        If the sticker is not in the BoB store yet, it is requested from
        *from_* via the ``xep_0231`` plugin and read back from the store.

        :return: the value returned by ``recipient.on_sticker``.
        """
        with self.xmpp.store.session() as orm:
            sticker = self.xmpp.store.bob.get_sticker(orm, cid)
        if sticker is None:
            await self.xmpp.plugin["xep_0231"].get_bob(
                from_, cid, ifrom=self.xmpp.boundjid
            )
            with self.xmpp.store.session() as orm:
                sticker = self.xmpp.store.bob.get_sticker(orm, cid)
        assert sticker is not None
        sticker.reply = reply
        sticker.thread = thread
        return await recipient.on_sticker(sticker)

485 

486 

def redact_url(text: str) -> str:
    """
    Remove the configured upload URL prefix from *text*.

    ``config.NO_UPLOAD_URL_PREFIX`` is used if set, otherwise
    ``config.UPLOAD_URL_PREFIX``. If neither is set, *text* is returned
    unchanged.
    """
    prefix = config.NO_UPLOAD_URL_PREFIX or config.UPLOAD_URL_PREFIX
    return text.replace(prefix, "") if prefix else text

def parse_link_previews(link_metadatas: list[LinkMetadata]) -> tuple[LinkPreview, ...]:
    """
    Convert link metadata elements into :class:`LinkPreview` tuples.

    An ``image`` given as a base64 JPEG ``data:`` URI is decoded to bytes.
    If decoding fails, a warning is logged and the image is kept as-is.

    :param link_metadatas: the link metadata elements of a message.
    :return: one preview per element, in the same order.
    """
    jpeg_prefix = "data:image/jpeg;base64,"
    result = []
    for link_metadata in link_metadatas:
        preview: LinkPreview = dict_to_named_tuple(link_metadata, LinkPreview)  # type:ignore[arg-type]
        image = preview.image
        if isinstance(image, str) and image.startswith(jpeg_prefix):
            try:
                decoded = base64.b64decode(image.removeprefix(jpeg_prefix))
            except ValueError as e:
                # binascii.Error (bad padding) is a ValueError subclass, and
                # non-ASCII str input raises ValueError as well
                log.warning(
                    "Could not decode base64-encoded image: %s '%s'", e, preview.image
                )
            else:
                preview = preview._replace(image=decoded)
        result.append(preview)
    return tuple(result)

515 

516 

# module-level logger, referenced at call time by the functions above
log = logging.getLogger(name=__name__)