Coverage for slidge / core / dispatcher / message / message.py: 84%

278 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-20 19:56 +0000

1import base64 

2import contextlib 

3import hashlib 

4import logging 

5from copy import copy 

6from typing import TYPE_CHECKING, Any 

7from xml.etree import ElementTree 

8 

9from aiohttp import ClientResponse 

10from slixmpp import JID, Message 

11from slixmpp.exceptions import XMPPError 

12from slixmpp.plugins.xep_0511.stanza import LinkMetadata 

13 

14from ....contact.contact import LegacyContact 

15from ....group.participant import LegacyParticipant 

16from ....group.room import LegacyMUC 

17from ....util.types import AnyContact, AnyRecipient, AnySession, LinkPreview 

18from ....util.util import dict_to_named_tuple, remove_emoji_variation_selector_16 

19from ... import config 

20from ..util import DispatcherMixin, exceptions_to_xmpp_errors 

21 

22if TYPE_CHECKING: 

23 from slidge.util.types import AnyGateway 

24 

25 

class MessageContentMixin(DispatcherMixin):
    """
    Dispatcher mixin handling content-bearing stanzas sent by XMPP users.

    It registers handlers for regular messages, groupchat messages, last
    message corrections, retractions and reactions, and forwards each of them
    to the matching ``session.on_*`` callback of the user's legacy session.
    """

    __slots__: list[str] = []

    def __init__(self, xmpp: "AnyGateway") -> None:
        """
        Register the message-related event handlers on the gateway.

        :param xmpp: The gateway component the handlers are attached to.
        """
        super().__init__(xmpp)
        xmpp.add_event_handler("legacy_message", self.on_legacy_message)
        xmpp.add_event_handler("message_correction", self.on_message_correction)
        xmpp.add_event_handler("message_retract", self.on_message_retract)
        xmpp.add_event_handler("groupchat_message", self.on_groupchat_message)
        xmpp.add_event_handler("reactions", self.on_reactions)

    async def on_groupchat_message(self, msg: Message) -> None:
        """
        Handle a groupchat message exactly like any other legacy message.

        :param msg: The incoming groupchat stanza.
        """
        await self.on_legacy_message(msg)

    @exceptions_to_xmpp_errors
    async def on_legacy_message(self, msg: Message) -> None:
        """
        Forward a message sent by an XMPP user to the legacy network.

        Meant to be called from :class:`BaseGateway` only.

        Depending on the stanza content, the message is dispatched as a file
        (when a URL is attached), as a sticker (when only a bits-of-binary
        content ID is found), or as plain text. Stanzas carrying none of
        these are ignored. When the legacy session returns a message ID, the
        mapping between this ID and the XMPP ID(s) is persisted in the store.

        :param msg: The incoming message stanza.
        :raises XMPPError: ``bad-request`` if the message contains an
            ``<encrypted/>`` element.
        """
        # we MUST not use `if m["replace"]["id"]` because it adds the tag if not
        # present. this is a problem for MUC echoed messages
        if "replace" in msg:
            # ignore last message correction (handled by a specific method)
            return
        if "apply_to" in msg:
            # ignore message retraction (handled by a specific method)
            return
        if "reactions" in msg:
            # ignore message reaction fallback.
            # the reaction itself is handled by self.react_from_msg().
            return
        if "retract" in msg:
            # ignore message retraction fallback.
            # the retraction itself is handled by self.on_retract
            return
        if msg.xml.find(".//{*}encrypted") is not None:
            raise XMPPError(
                "bad-request", "You cannot send encrypted messages through this gateway"
            )

        cid = self.__html_sticker_cid(msg)

        session, recipient, thread = await self._get_session_recipient_thread(msg)

        url, content_type, cid = self.__parse_attachment(msg, cid)

        if "reply" in msg:
            text, reply_to_msg_id, reply_to, reply_fallback = await self.__get_reply(
                msg, session, recipient
            )
        else:
            text = msg["body"]
            reply_to_msg_id = None
            reply_to = None
            reply_fallback = None

        if "link_metadata" in msg:
            link_previews = parse_link_previews(msg["link_metadatas"])
        else:
            link_previews = []

        if url:
            legacy_msg_id = await self.__send_url(
                url,
                session,
                recipient,
                fallback=msg["body"],
                cid=cid,
                is_sticker="sticker" in msg,
                content_type=content_type,
                reply_to_msg_id=reply_to_msg_id,
                reply_to_fallback_text=reply_fallback,
                reply_to=reply_to,
                thread=thread,
            )
        elif cid:
            legacy_msg_id = await self.__send_bob(
                msg.get_from(),
                cid,
                session,
                recipient,
                reply_to_msg_id=reply_to_msg_id,
                reply_to_fallback_text=reply_fallback,
                reply_to=reply_to,
                thread=thread,
            )
        elif text:
            if isinstance(recipient, LegacyMUC):
                mentions = {"mentions": await recipient.parse_mentions(text)}
            else:
                mentions = {}
            legacy_msg_id = await session.on_text(
                recipient,
                text,
                reply_to_msg_id=reply_to_msg_id,
                reply_to_fallback_text=reply_fallback,
                reply_to=reply_to,
                thread=thread,
                link_previews=link_previews,
                **mentions,
            )
        else:
            log.debug("Ignoring %s", msg.get_id())
            return

        if isinstance(recipient, LegacyMUC):
            stanza_id = await recipient.echo(msg, legacy_msg_id)
        else:
            stanza_id = None
            self.__ack(msg)

        if legacy_msg_id is None:
            return

        with self.xmpp.store.session() as orm:
            if recipient.is_group:
                # for groups, both the origin ID chosen by the sender and the
                # stanza ID returned by the echo are recorded
                self.xmpp.store.id_map.set_origin(
                    orm, recipient.stored.id, str(legacy_msg_id), msg.get_id()
                )
                assert stanza_id is not None
                self.xmpp.store.id_map.set_msg(
                    orm,
                    recipient.stored.id,
                    str(legacy_msg_id),
                    [stanza_id],
                    True,
                )
            else:
                self.xmpp.store.id_map.set_msg(
                    orm,
                    recipient.stored.id,
                    str(legacy_msg_id),
                    [msg.get_id()],
                    False,
                )
            if session.MESSAGE_IDS_ARE_THREAD_IDS and (t := msg["thread"]):
                self.xmpp.store.id_map.set_thread(
                    orm, recipient.stored.id, t, str(legacy_msg_id), recipient.is_group
                )
            orm.commit()

    @staticmethod
    def __html_sticker_cid(msg: Message) -> str | None:
        """
        Extract a sticker content ID from the XHTML payload of a message.

        A message is treated as a sticker when its XHTML body consists of a
        single ``<p>`` without leading text that holds a single ``<img>``
        whose ``src`` uses the ``cid:`` scheme.

        :param msg: The incoming message stanza.
        :return: The content ID without its ``cid:`` prefix, or ``None`` if
            the message does not match the sticker shape.
        """
        if "html" not in msg:
            return None
        body = ElementTree.fromstring("<body>" + msg["html"].get_body() + "</body>")
        paragraphs = body.findall("p")
        if len(paragraphs) != 1:
            return None
        paragraph = paragraphs[0]
        if paragraph.text is not None and paragraph.text.strip():
            return None
        images = paragraph.findall("img")
        if len(images) != 1:
            # other cases should be interpreted as "custom emojis" in text
            return None
        # no text, single img ⇒ this is a sticker
        src = images[0].get("src")
        if src is not None and src.startswith("cid:"):
            return src.removeprefix("cid:")
        return None

    @staticmethod
    def __parse_attachment(
        msg: Message, cid: str | None
    ) -> tuple[str | None, str | None, str | None]:
        """
        Find the attachment URL, media type and content ID of a message.

        The ``sfs``, ``oob`` and ``reference``/``sims`` payloads are tried in
        that order; the first one present wins.

        :param msg: The incoming message stanza.
        :param cid: Content ID found so far (from the XHTML body), if any. It
            is replaced when the ``sfs`` file element carries a hash.
        :return: A ``(url, content_type, cid)`` tuple. ``url`` and
            ``content_type`` are ``None`` when not available.
        """
        # always bound, so that payloads without a <file/> child do not leave
        # the media type undefined
        content_type = None
        if (
            "sfs" in msg
            and "sources" in msg["sfs"]
            and "url-data" in msg["sfs"]["sources"]
            and "target" in msg["sfs"]["sources"]["url-data"]
        ):
            # TODO: support "attach source in later message", cf https://xmpp.org/extensions/xep-0447.html#example-5
            # TODO: support for other sources than URL
            url = msg["sfs"]["sources"]["url-data"]["target"]
            if "file" in msg["sfs"]:
                content_type = msg["sfs"]["file"]["media-type"] or None
                if "hash" in msg["sfs"]["file"]:
                    algo = msg["sfs"]["file"]["hash"]["algo"]
                    h = msg["sfs"]["file"]["hash"]["value"]
                    cid = f"{algo}+{h}" if algo and h else None
        elif "oob" in msg:
            url = msg["oob"]["url"]
        elif (
            "reference" in msg
            and "sims" in msg["reference"]
            and "sources" in msg["reference"]["sims"]
        ):
            # use the first HTTP(S) source, if there is one
            url = next(
                (
                    source["uri"]
                    for source in msg["reference"]["sims"]["sources"]["substanzas"]
                    if source["uri"].startswith("http")
                ),
                None,
            )
            if "file" in msg["reference"]["sims"]:
                # the media type lives on the SIMS file element, not on the
                # message itself
                content_type = msg["reference"]["sims"]["file"]["media-type"] or None
        else:
            url = None
        return url, content_type, cid

    @exceptions_to_xmpp_errors
    async def on_message_correction(self, msg: Message) -> None:
        """
        Handle a last message correction sent by an XMPP user.

        If the corrected message cannot be mapped to a legacy ID, or if the
        recipient does not support corrections, the new body is sent as a new
        text message prefixed with ``"Correction: "``. An empty body is
        treated as a retraction when the recipient supports it.

        :param msg: The incoming stanza carrying a ``replace`` element.
        """
        if "retract" in msg:
            # ignore message retraction fallback (fallback=last msg correction)
            return
        session, recipient, thread = await self._get_session_recipient_thread(msg)
        legacy_id = self._xmpp_msg_id_to_legacy(
            session, msg["replace"]["id"], recipient, True
        )

        if isinstance(recipient, LegacyMUC):
            mentions = await recipient.parse_mentions(msg["body"])
        else:
            mentions = None

        if "link_metadata" in msg:
            link_previews = parse_link_previews(msg["link_metadatas"])
        else:
            link_previews = []

        if legacy_id is None:
            log.debug("Did not find legacy ID to correct")
            new_legacy_msg_id = await session.on_text(
                recipient,
                "Correction: " + msg["body"],
                thread=thread,
                mentions=mentions,
                link_previews=link_previews,
            )
        elif not msg["body"].strip() and recipient.RETRACTION:
            await session.on_retract(recipient, legacy_id, thread=thread)
            new_legacy_msg_id = None
        elif recipient.CORRECTION:
            new_legacy_msg_id = await session.on_correct(
                recipient,
                msg["body"],
                legacy_id,
                thread=thread,
                mentions=mentions,
                link_previews=link_previews,
            )
        else:
            session.send_gateway_message(
                "Last message correction is not supported by this legacy service. "
                "Slidge will send your correction as new message."
            )
            if recipient.RETRACTION and legacy_id is not None:
                session.send_gateway_message(
                    "Slidge will attempt to retract the original message you wanted"
                    " to edit."
                )
                await session.on_retract(recipient, legacy_id, thread=thread)

            new_legacy_msg_id = await session.on_text(
                recipient,
                "Correction: " + msg["body"],
                thread=thread,
                mentions=mentions,
                link_previews=link_previews,
            )

        if isinstance(recipient, LegacyMUC):
            await recipient.echo(msg, new_legacy_msg_id)
        else:
            self.__ack(msg)
        if new_legacy_msg_id is None:
            return
        with self.xmpp.store.session() as orm:
            self.xmpp.store.id_map.set_msg(
                orm,
                recipient.stored.id,
                # stored as a string, as for regular messages
                str(new_legacy_msg_id),
                [msg.get_id()],
                recipient.is_group,
            )
            orm.commit()

    @exceptions_to_xmpp_errors
    async def on_message_retract(self, msg: Message) -> None:
        """
        Handle a message retraction sent by an XMPP user.

        :param msg: The incoming stanza carrying a ``retract`` element.
        :raises XMPPError: ``bad-request`` if the recipient does not support
            retractions.
        """
        session, recipient, thread = await self._get_session_recipient_thread(msg)
        if not recipient.RETRACTION:
            raise XMPPError(
                "bad-request",
                "This legacy service does not support message retraction.",
            )
        xmpp_id: str = msg["retract"]["id"]
        legacy_id = self._xmpp_msg_id_to_legacy(
            session, xmpp_id, recipient, origin=True
        )
        await session.on_retract(recipient, legacy_id, thread=thread)
        if isinstance(recipient, LegacyMUC):
            await recipient.echo(msg, None)
        self.__ack(msg)

    @exceptions_to_xmpp_errors
    async def on_reactions(self, msg: Message) -> None:
        """
        Handle emoji reactions sent by an XMPP user.

        Reactions that violate the recipient's restrictions (single emoji
        only, or a restricted emoji set) are rejected: the user is notified,
        the reaction is cleared, and a ``policy-violation`` error is raised.
        Accepted reactions are forwarded to the legacy session and then
        propagated to the other XMPP IDs mapped to the same legacy message.

        :param msg: The incoming stanza carrying a ``reactions`` element.
        :raises XMPPError: ``internal-server-error`` if the target message
            cannot be mapped to a legacy ID; ``policy-violation`` if the
            reaction is not allowed.
        """
        session, recipient, thread = await self._get_session_recipient_thread(msg)
        react_to: str = msg["reactions"]["id"]

        special_msg = session.SPECIAL_MSG_ID_PREFIX and react_to.startswith(
            session.SPECIAL_MSG_ID_PREFIX
        )

        if special_msg:
            legacy_id = react_to
        else:
            legacy_id = self._xmpp_msg_id_to_legacy(session, react_to, recipient)

        if not legacy_id:
            log.debug("Ignored reaction from user")
            raise XMPPError(
                "internal-server-error",
                "Could not convert the XMPP msg ID to a legacy ID",
            )

        emojis = [
            remove_emoji_variation_selector_16(r["value"]) for r in msg["reactions"]
        ]
        error_msg = None

        # restrictions are only enforced for reactions to regular messages
        if not special_msg:
            if recipient.REACTIONS_SINGLE_EMOJI and len(emojis) > 1:
                error_msg = "Maximum 1 emoji/message"

            if (
                not error_msg
                and (subset := await recipient.available_emojis(legacy_id))
                and not set(emojis).issubset(subset)
            ):
                error_msg = (
                    f"You can only react with the following emojis: {''.join(subset)}"
                )

        if error_msg:
            session.send_gateway_message(error_msg)
            if not isinstance(recipient, LegacyMUC):
                # no need to carbon for groups, we just don't echo the stanza
                recipient.react(legacy_id, carbon=True)
            await session.on_react(recipient, legacy_id, [], thread=thread)
            raise XMPPError(
                "policy-violation",
                text=error_msg,
                clear=False,
            )

        await session.on_react(recipient, legacy_id, emojis, thread=thread)
        if isinstance(recipient, LegacyMUC):
            await recipient.echo(msg, None)
        else:
            self.__ack(msg)

        with self.xmpp.store.session() as orm:
            multi = self.xmpp.store.id_map.get_xmpp(
                orm, recipient.stored.id, legacy_id, recipient.is_group
            )
        if not multi:
            return
        # the ID the user reacted to has already been handled above
        multi = [m for m in multi if react_to != m]

        if isinstance(recipient, LegacyMUC):
            for xmpp_id in multi:
                mc = copy(msg)
                mc["reactions"]["id"] = xmpp_id
                await recipient.echo(mc)
        elif isinstance(recipient, LegacyContact):
            for xmpp_id in multi:
                recipient.react(legacy_id, emojis, xmpp_id=xmpp_id, carbon=True)

    def __ack(self, msg: Message) -> None:
        """
        Send a delivery receipt for ``msg`` unless the gateway emits proper ones.

        :param msg: The stanza to acknowledge.
        """
        if not self.xmpp.PROPER_RECEIPTS:
            self.xmpp.delivery_receipt.ack(msg)

    async def __get_reply(
        self, msg: Message, session: AnySession, recipient: AnyRecipient
    ) -> tuple[
        str, str | int | None, AnyContact | LegacyParticipant | None, str | None
    ]:
        """
        Resolve the reply metadata of a message carrying a ``reply`` element.

        :param msg: The incoming message stanza.
        :param session: The session of the user who sent the message.
        :param recipient: The contact or group the message is addressed to.
        :return: A ``(text, reply_to_msg_id, reply_to, reply_fallback)``
            tuple. When the replied-to message cannot be mapped to a legacy
            ID, the redacted body is returned as ``text`` and the three other
            items are ``None``.
        """
        try:
            reply_to_msg_id = self._xmpp_msg_id_to_legacy(
                session, msg["reply"]["id"], recipient
            )
        except XMPPError:
            session.log.debug(
                "Could not determine reply-to legacy msg ID, sending quote instead."
            )
            return redact_url(msg["body"]), None, None, None

        reply_to_jid = JID(msg["reply"]["to"])
        reply_to = None
        if msg["type"] == "chat":
            # replies to the user's own messages keep reply_to unset
            if reply_to_jid.bare != session.user_jid.bare:
                with contextlib.suppress(XMPPError):
                    reply_to = await session.contacts.by_jid(reply_to_jid)
        elif msg["type"] == "groupchat":
            nick = reply_to_jid.resource
            try:
                muc = await session.bookmarks.by_jid(reply_to_jid)
            except XMPPError:
                pass
            else:
                if nick == muc.user_nick:
                    reply_to = await muc.get_user_participant()
                elif not nick:
                    reply_to = muc.get_system_participant()
                else:
                    reply_to = await muc.get_participant(nick, store=False)

        if "fallback" in msg and (
            isinstance(recipient, LegacyMUC) or recipient.REPLIES
        ):
            text = msg["fallback"].get_stripped_body(self.xmpp["xep_0461"].namespace)
            try:
                reply_fallback = redact_url(msg["reply"].get_fallback_body())
            except AttributeError:
                reply_fallback = None
        else:
            text = msg["body"]
            reply_fallback = None

        return text, reply_to_msg_id, reply_to, reply_fallback

    async def __send_url(
        self,
        url: str,
        session: AnySession,
        recipient: AnyRecipient,
        is_sticker: bool,
        cid: str | None,
        content_type: str | None,
        fallback: str,
        **kwargs: Any,  # noqa:ANN401
    ) -> int | str | None:
        """
        Fetch ``url`` and forward it to the legacy session.

        :param url: The URL attached to the message.
        :param session: The session of the user who sent the message.
        :param recipient: The contact or group the message is addressed to.
        :param is_sticker: Whether the message was flagged as a sticker.
        :param cid: Content ID of the attachment, if known.
        :param content_type: Media type of the attachment, if known.
        :param fallback: Message body, sent along with the URL as text when
            the download fails.
        :param kwargs: Extra keyword arguments passed to the session callback.
        :return: Whatever the session callback returns.
        """
        async with self.xmpp.http.get(url) as response:
            if response.status >= 400:
                session.log.warning(
                    "OOB url cannot be downloaded: %s, sending the URL as text"
                    " instead.",
                    response,
                )
                return await session.on_text(
                    recipient, "\n".join([url, fallback]), **kwargs
                )
            if is_sticker:
                return await self.__send_nonbob_sticker(
                    response,
                    url,
                    session,
                    recipient,
                    cid,
                    content_type,
                    fallback,
                    **kwargs,
                )
            return await session.on_file(
                recipient, url, http_response=response, **kwargs
            )

    async def __send_nonbob_sticker(
        self,
        response: ClientResponse,
        url: str,
        session: AnySession,
        recipient: AnyRecipient,
        cid: str | None,
        content_type: str | None,
        fallback: str,
        **kwargs: Any,  # noqa:ANN401
    ) -> int | str | None:
        """
        Send a sticker whose data comes from an HTTP response.

        The sticker is looked up in the store by ``cid`` first. If it is not
        found, the response body is read, stored under a ``sha256+<hexdigest>``
        content ID, and that stored sticker is used. If storing yields nothing,
        the URL and the fallback body are sent as text instead.

        :param response: The HTTP response for ``url``.
        :param url: The URL attached to the message.
        :param session: The session of the user who sent the message.
        :param recipient: The contact or group the message is addressed to.
        :param cid: Content ID of the sticker, if known.
        :param content_type: Media type of the sticker, if known.
        :param fallback: Message body used for the text fallback.
        :param kwargs: Extra keyword arguments passed to the session callback.
        :return: Whatever the session callback returns.
        """
        if cid:
            with self.xmpp.store.session() as orm:
                sticker = self.xmpp.store.bob.get_sticker(orm, cid)
        else:
            sticker = None
        if sticker is None:
            data = await response.read()
            cid = "sha256+" + hashlib.sha256(data).hexdigest()
            with self.xmpp.store.session() as orm:
                sticker = self.xmpp.store.bob.set_sticker(orm, cid, data, content_type)
                orm.commit()
        if sticker is None:
            return await session.on_text(
                recipient, "\n".join([url, fallback]), **kwargs
            )
        return await session.on_sticker(recipient, sticker, **kwargs)

    async def __send_bob(
        self,
        from_: JID,
        cid: str,
        session: AnySession,
        recipient: AnyRecipient,
        **kwargs: Any,  # noqa:ANN401
    ) -> int | str | None:
        """
        Send a sticker referenced by a bits-of-binary content ID.

        If the sticker is not in the store, it is requested from the sender
        through the ``xep_0231`` plugin and then looked up again.

        :param from_: JID of the message sender, asked for the data if needed.
        :param cid: Content ID of the sticker.
        :param session: The session of the user who sent the message.
        :param recipient: The contact or group the message is addressed to.
        :param kwargs: Extra keyword arguments passed to ``session.on_sticker``.
        :return: Whatever ``session.on_sticker`` returns.
        :raises XMPPError: ``item-not-found`` if the sticker is still missing
            from the store after requesting it.
        """
        with self.xmpp.store.session() as orm:
            sticker = self.xmpp.store.bob.get_sticker(orm, cid)
        if sticker is None:
            await self.xmpp.plugin["xep_0231"].get_bob(
                from_, cid, ifrom=self.xmpp.boundjid
            )
            with self.xmpp.store.session() as orm:
                sticker = self.xmpp.store.bob.get_sticker(orm, cid)
        if sticker is None:
            # explicit check instead of an assert, which is stripped under -O
            raise XMPPError(
                "item-not-found", f"Could not retrieve the sticker data for {cid}"
            )
        return await session.on_sticker(recipient, sticker, **kwargs)

521 

522 

def redact_url(text: str) -> str:
    """
    Remove every occurrence of the configured upload URL prefix from ``text``.

    ``config.NO_UPLOAD_URL_PREFIX`` is used when set, otherwise
    ``config.UPLOAD_URL_PREFIX``. When neither is set, ``text`` is returned
    unchanged.

    :param text: The text to redact.
    :return: The text without the configured prefix.
    """
    prefix = config.NO_UPLOAD_URL_PREFIX
    if not prefix:
        prefix = config.UPLOAD_URL_PREFIX
    return text.replace(prefix, "") if prefix else text

528 

529 

def parse_link_previews(link_metadatas: list[LinkMetadata]) -> list[LinkPreview]:
    """
    Convert link metadata stanzas into :class:`LinkPreview` tuples.

    When a preview image is a ``data:image/jpeg;base64,`` URI, it is replaced
    by the decoded bytes. If decoding fails, a warning is logged and the
    preview is kept with its original image value.

    :param link_metadatas: The link metadata elements of a message.
    :return: One preview per element, in the same order.
    """
    jpeg_prefix = "data:image/jpeg;base64,"
    result = []
    for link_metadata in link_metadatas:
        preview: LinkPreview = dict_to_named_tuple(link_metadata, LinkPreview)  # type:ignore[arg-type]
        image = preview.image
        if isinstance(image, str) and image.startswith(jpeg_prefix):
            try:
                decoded = base64.b64decode(image.removeprefix(jpeg_prefix))
            except ValueError as e:
                # binascii.Error (invalid base64) is a ValueError subclass
                log.warning(
                    "Could not decode base64-encoded image: %s '%s'", e, image
                )
            else:
                preview = preview._replace(image=decoded)
        result.append(preview)
    return result

551 

552 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)