Coverage for slidge / core / dispatcher / message / message.py: 81%

252 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 05:07 +0000

1import base64 

2import logging 

3from copy import copy 

4from typing import TYPE_CHECKING, Any 

5from xml.etree import ElementTree 

6 

7from slixmpp import JID, Message 

8from slixmpp.exceptions import XMPPError 

9from slixmpp.plugins.xep_0511.stanza import LinkMetadata 

10 

11from ....contact.contact import LegacyContact 

12from ....group.participant import LegacyParticipant 

13from ....group.room import LegacyMUC 

14from ....util.types import AnyContact, AnyRecipient, AnySession, LinkPreview 

15from ....util.util import dict_to_named_tuple, remove_emoji_variation_selector_16 

16from ... import config 

17from ..util import DispatcherMixin, exceptions_to_xmpp_errors 

18 

19if TYPE_CHECKING: 

20 from slidge.core.gateway import BaseGateway 

21 

22 

23class MessageContentMixin(DispatcherMixin): 

24 __slots__: list[str] = [] 

25 

def __init__(self, xmpp: "BaseGateway") -> None:
    """
    Initialise the mixin and subscribe to the message-related events.

    :param xmpp: The gateway on which the event handlers are registered.
    """
    super().__init__(xmpp)
    # (event name, handler) pairs, registered in this exact order.
    subscriptions = (
        ("legacy_message", self.on_legacy_message),
        ("message_correction", self.on_message_correction),
        ("message_retract", self.on_message_retract),
        ("groupchat_message", self.on_groupchat_message),
        ("reactions", self.on_reactions),
    )
    for event_name, handler in subscriptions:
        xmpp.add_event_handler(event_name, handler)

33 

async def on_groupchat_message(self, msg: Message) -> None:
    """
    Handle a ``groupchat_message`` event.

    Group chat messages go through the exact same pipeline as other
    messages, so this simply delegates to :meth:`on_legacy_message`.

    :param msg: The incoming groupchat message stanza.
    """
    await self.on_legacy_message(msg)

36 

@exceptions_to_xmpp_errors
async def on_legacy_message(self, msg: Message) -> None:
    """
    Forward a message sent by the XMPP user to the legacy network.

    Meant to be called from :class:`BaseGateway` only.

    Depending on the stanza content, the message is passed on as a file
    (an attachment URL is present), as a sticker (XHTML-IM payload made of
    a single ``cid:`` image), or as plain text. Stanzas with none of these
    are ignored. After the legacy call, the stanza is echoed (group chats)
    or acknowledged (direct chats), and the XMPP ID <-> legacy ID mapping
    is persisted when the legacy call returned an ID.

    :param msg: The incoming message stanza.
    :raises XMPPError: ``bad-request`` when the stanza contains an
        ``encrypted`` element.
    """
    # we MUST not use `if m["replace"]["id"]` because it adds the tag if not
    # present. this is a problem for MUC echoed messages
    #
    # All of these are handled by dedicated methods, so they are skipped here:
    #   replace   -> last message correction
    #   apply_to  -> message retraction
    #   reactions -> message reaction fallback
    #   retract   -> message retraction fallback
    for handled_elsewhere in ("replace", "apply_to", "reactions", "retract"):
        if msg.get_plugin(handled_elsewhere, check=True) is not None:
            return

    if msg.xml.find(".//{*}encrypted") is not None:
        raise XMPPError(
            "bad-request", "You cannot send encrypted messages through this gateway"
        )

    cid = self.__sticker_cid(msg)

    session, recipient, thread = await self._get_session_recipient_thread(msg)

    url = self.__attachment_url(msg)

    if msg.get_plugin("reply", check=True):
        text, reply_to_msg_id, reply_to, reply_fallback = await self.__get_reply(
            msg, session, recipient
        )
    else:
        text = msg["body"]
        reply_to_msg_id = None
        reply_to = None
        reply_fallback = None

    if "link_metadata" in msg:
        link_previews = parse_link_previews(msg["link_metadatas"])
    else:
        link_previews = []

    # Keyword arguments shared by every kind of outgoing legacy message.
    common_kwargs: dict[str, Any] = {
        "reply_to_msg_id": reply_to_msg_id,
        "reply_to_fallback_text": reply_fallback,
        "reply_to": reply_to,
        "thread": thread,
    }

    # Precedence: attachment URL, then sticker, then plain text.
    if url:
        legacy_msg_id = await self.__send_url(
            url, session, recipient, **common_kwargs
        )
    elif cid:
        legacy_msg_id = await self.__send_bob(
            msg.get_from(), cid, session, recipient, **common_kwargs
        )
    elif text:
        if isinstance(recipient, LegacyMUC):
            mentions = {"mentions": await recipient.parse_mentions(text)}
        else:
            mentions = {}
        legacy_msg_id = await session.on_text(
            recipient,
            text,
            link_previews=link_previews,
            **common_kwargs,
            **mentions,
        )
    else:
        log.debug("Ignoring %s", msg.get_id())
        return

    if isinstance(recipient, LegacyMUC):
        stanza_id = await recipient.echo(msg, legacy_msg_id)
    else:
        stanza_id = None
        self.__ack(msg)

    if legacy_msg_id is None:
        # Nothing to map the XMPP message ID to.
        return

    legacy_id_str = str(legacy_msg_id)
    id_map = self.xmpp.store.id_map
    with self.xmpp.store.session() as orm:
        if recipient.is_group:
            # For groups, the ID chosen by the client is stored as the
            # "origin" ID and the ID returned by echo() as the message ID.
            id_map.set_origin(orm, recipient.stored.id, legacy_id_str, msg.get_id())
            assert stanza_id is not None
            id_map.set_msg(orm, recipient.stored.id, legacy_id_str, [stanza_id], True)
        else:
            id_map.set_msg(
                orm, recipient.stored.id, legacy_id_str, [msg.get_id()], False
            )
        if session.MESSAGE_IDS_ARE_THREAD_IDS and (t := msg["thread"]):
            id_map.set_thread(
                orm, recipient.stored.id, t, legacy_id_str, recipient.is_group
            )
        orm.commit()

@staticmethod
def __sticker_cid(msg: Message) -> str | None:
    """
    Return the ``cid:`` reference of a sticker carried in the XHTML-IM payload.

    A sticker is an HTML body made of exactly one ``<p>`` with no text and
    exactly one ``<img>`` whose ``src`` starts with ``cid:``. Other cases
    should be interpreted as "custom emojis" in text and yield ``None``.

    :param msg: The incoming message stanza.
    :return: The ``src`` value without its ``cid:`` prefix, or ``None``.
    """
    if msg.get_plugin("html", check=True) is None:
        return None
    try:
        body = ElementTree.fromstring("<body>" + msg["html"].get_body() + "</body>")
    except ElementTree.ParseError as e:
        # An unparsable HTML payload cannot be a sticker; do not let it
        # prevent the rest of the message from being processed.
        log.debug("Could not parse the XHTML-IM payload: %s", e)
        return None

    paragraphs = body.findall("p")
    if len(paragraphs) != 1:
        return None
    paragraph = paragraphs[0]
    if paragraph.text is not None and paragraph.text.strip():
        return None
    images = paragraph.findall("img")
    if len(images) != 1:
        return None
    # no text, single img => this is a sticker
    src = images[0].get("src")
    if src is not None and src.startswith("cid:"):
        return src.removeprefix("cid:")
    return None

@staticmethod
def __attachment_url(msg: Message) -> str | None:
    """
    Return the URL of the attachment referenced by the stanza, if any.

    The ``oob`` URL is used when the ``oob`` plugin is present; otherwise
    the first ``sims`` source whose URI starts with ``http`` is used.

    :param msg: The incoming message stanza.
    :return: The attachment URL, or ``None`` when there is none.
    """
    if msg.get_plugin("oob", check=True) is not None:
        return msg["oob"]["url"]
    if (
        "reference" in msg
        and "sims" in msg["reference"]
        and "sources" in msg["reference"]["sims"]
    ):
        for source in msg["reference"]["sims"]["sources"]["substanzas"]:
            if source["uri"].startswith("http"):
                return source["uri"]
    return None

187 

@exceptions_to_xmpp_errors
async def on_message_correction(self, msg: Message) -> None:
    """
    Handle a last message correction sent by the XMPP user.

    Four outcomes are possible:

    - the corrected message is unknown: the new body is sent as a new
      message prefixed with ``Correction: ``;
    - the new body is blank and the recipient supports retraction: the
      original message is retracted;
    - the recipient supports correction: the original message is edited;
    - otherwise: the user is warned, the original message is retracted
      if the recipient supports it, and the new body is sent as a new
      message prefixed with ``Correction: ``.

    :param msg: The incoming message stanza, carrying a ``replace`` element.
    """
    if msg.get_plugin("retract", check=True) is not None:
        # ignore message retraction fallback (fallback=last msg correction)
        return
    session, recipient, thread = await self._get_session_recipient_thread(msg)
    legacy_id = self._xmpp_msg_id_to_legacy(
        session, msg["replace"]["id"], recipient, True
    )
    body = msg["body"]

    if isinstance(recipient, LegacyMUC):
        mentions = await recipient.parse_mentions(body)
    else:
        mentions = None

    if "link_metadata" in msg:
        link_previews = parse_link_previews(msg["link_metadatas"])
    else:
        link_previews = []

    if legacy_id is None:
        log.debug("Did not find legacy ID to correct")
        new_legacy_msg_id = await session.on_text(
            recipient,
            "Correction: " + body,
            thread=thread,
            mentions=mentions,
            link_previews=link_previews,
        )
    elif not body.strip() and recipient.RETRACTION:
        await session.on_retract(recipient, legacy_id, thread=thread)
        new_legacy_msg_id = None
    elif recipient.CORRECTION:
        new_legacy_msg_id = await session.on_correct(
            recipient,
            body,
            legacy_id,
            thread=thread,
            mentions=mentions,
            link_previews=link_previews,
        )
    else:
        session.send_gateway_message(
            "Last message correction is not supported by this legacy service. "
            "Slidge will send your correction as new message."
        )
        # legacy_id cannot be None here: that case is the first branch.
        if recipient.RETRACTION:
            session.send_gateway_message(
                "Slidge will attempt to retract the original message you wanted"
                " to edit."
            )
            await session.on_retract(recipient, legacy_id, thread=thread)

        new_legacy_msg_id = await session.on_text(
            recipient,
            "Correction: " + body,
            thread=thread,
            mentions=mentions,
            link_previews=link_previews,
        )

    if isinstance(recipient, LegacyMUC):
        await recipient.echo(msg, new_legacy_msg_id)
    else:
        self.__ack(msg)
    if new_legacy_msg_id is None:
        return
    with self.xmpp.store.session() as orm:
        # Stored as a string, like on_legacy_message() does.
        self.xmpp.store.id_map.set_msg(
            orm,
            recipient.stored.id,
            str(new_legacy_msg_id),
            [msg.get_id()],
            recipient.is_group,
        )
        orm.commit()

265 

@exceptions_to_xmpp_errors
async def on_message_retract(self, msg: Message) -> None:
    """
    Handle a message retraction sent by the XMPP user.

    :param msg: The incoming message stanza, carrying a ``retract`` element.
    :raises XMPPError: ``bad-request`` when the recipient does not support
        retraction.
    """
    session, recipient, thread = await self._get_session_recipient_thread(msg)
    supports_retraction = recipient.RETRACTION
    if not supports_retraction:
        raise XMPPError(
            "bad-request",
            "This legacy service does not support message retraction.",
        )

    retracted_xmpp_id: str = msg["retract"]["id"]
    retracted_legacy_id = self._xmpp_msg_id_to_legacy(
        session, retracted_xmpp_id, recipient, origin=True
    )
    await session.on_retract(recipient, retracted_legacy_id, thread=thread)

    # Group chats get an echo of the stanza; the ack is sent in all cases.
    if isinstance(recipient, LegacyMUC):
        await recipient.echo(msg, None)
    self.__ack(msg)

282 

@exceptions_to_xmpp_errors
async def on_reactions(self, msg: Message) -> None:
    """
    Handle emoji reactions sent by the XMPP user.

    The reacted-to XMPP message ID is converted to a legacy ID, unless it
    starts with the session's ``SPECIAL_MSG_ID_PREFIX``, in which case it is
    used as-is and the emoji restrictions of the recipient are not checked.

    When the reaction violates the recipient's restrictions, the user is
    notified, the reactions are reset to an empty list and a
    ``policy-violation`` error is raised. Otherwise the reaction is passed
    to the legacy network and mirrored to the other XMPP IDs mapped to the
    same legacy message.

    :param msg: The incoming message stanza, carrying a ``reactions`` element.
    :raises XMPPError: ``internal-server-error`` when no legacy ID could be
        determined, ``policy-violation`` when the emojis are not allowed.
    """
    session, recipient, thread = await self._get_session_recipient_thread(msg)
    react_to: str = msg["reactions"]["id"]

    special_msg = session.SPECIAL_MSG_ID_PREFIX and react_to.startswith(
        session.SPECIAL_MSG_ID_PREFIX
    )

    if special_msg:
        legacy_id = react_to
    else:
        legacy_id = self._xmpp_msg_id_to_legacy(session, react_to, recipient)

    if not legacy_id:
        log.debug("Ignored reaction from user")
        raise XMPPError(
            "internal-server-error",
            "Could not convert the XMPP msg ID to a legacy ID",
        )

    emojis = [
        remove_emoji_variation_selector_16(r["value"]) for r in msg["reactions"]
    ]
    error_msg = None

    if not special_msg:
        if recipient.REACTIONS_SINGLE_EMOJI and len(emojis) > 1:
            error_msg = "Maximum 1 emoji/message"

        # A falsy result from available_emojis() means "no restriction".
        if not error_msg and (
            subset := await recipient.available_emojis(legacy_id)
        ):
            if not set(emojis).issubset(subset):
                error_msg = f"You can only react with the following emojis: {''.join(subset)}"

    if error_msg:
        session.send_gateway_message(error_msg)
        if not isinstance(recipient, LegacyMUC):
            # no need to carbon for groups, we just don't echo the stanza
            recipient.react(legacy_id, carbon=True)
        await session.on_react(recipient, legacy_id, [], thread=thread)
        raise XMPPError(
            "policy-violation",
            text=error_msg,
            clear=False,
        )

    await session.on_react(recipient, legacy_id, emojis, thread=thread)
    if isinstance(recipient, LegacyMUC):
        await recipient.echo(msg, None)
    else:
        self.__ack(msg)

    # Only the lookup needs the store session; release it before awaiting.
    with self.xmpp.store.session() as orm:
        multi = self.xmpp.store.id_map.get_xmpp(
            orm, recipient.stored.id, legacy_id, recipient.is_group
        )
    if not multi:
        return
    # The ID the user reacted to has already been dealt with above.
    multi = [m for m in multi if react_to != m]

    if isinstance(recipient, LegacyMUC):
        for xmpp_id in multi:
            mc = copy(msg)
            mc["reactions"]["id"] = xmpp_id
            await recipient.echo(mc)
    elif isinstance(recipient, LegacyContact):
        for xmpp_id in multi:
            recipient.react(legacy_id, emojis, xmpp_id=xmpp_id, carbon=True)

354 

def __ack(self, msg: Message) -> None:
    """
    Acknowledge ``msg`` through the gateway's delivery receipt helper.

    Nothing is done when the gateway's ``PROPER_RECEIPTS`` flag is truthy.

    :param msg: The stanza to acknowledge.
    """
    if self.xmpp.PROPER_RECEIPTS:
        return
    self.xmpp.delivery_receipt.ack(msg)

358 

async def __get_reply(
    self, msg: Message, session: AnySession, recipient: AnyRecipient
) -> tuple[
    str, str | int | None, AnyContact | LegacyParticipant | None, str | None
]:
    """
    Extract the reply-related information from a message with a ``reply`` element.

    :param msg: The incoming message stanza.
    :param session: The session of the user who sent the message.
    :param recipient: The contact or group chat the message is addressed to.
    :return: A ``(text, reply_to_msg_id, reply_to, reply_fallback)`` tuple.
        When the replied-to message ID cannot be converted to a legacy ID,
        the URL-redacted body is returned with the three other fields set
        to ``None``.
    """
    try:
        reply_to_msg_id = self._xmpp_msg_id_to_legacy(
            session, msg["reply"]["id"], recipient
        )
    except XMPPError:
        session.log.debug(
            "Could not determine reply-to legacy msg ID, sending quote instead."
        )
        return redact_url(msg["body"]), None, None, None

    # Work out who wrote the message being replied to.
    reply_to_jid = JID(msg["reply"]["to"])
    reply_to = None
    msg_type = msg["type"]
    if msg_type == "chat" and reply_to_jid.bare != session.user_jid.bare:
        try:
            reply_to = await session.contacts.by_jid(reply_to_jid)
        except XMPPError:
            pass
    elif msg_type == "groupchat":
        nick = reply_to_jid.resource
        try:
            muc = await session.bookmarks.by_jid(reply_to_jid)
        except XMPPError:
            pass
        else:
            if nick == muc.user_nick:
                reply_to = await muc.get_user_participant()
            elif nick:
                reply_to = await muc.get_participant(nick, store=False)
            else:
                # No resource part: use the room's system participant.
                reply_to = muc.get_system_participant()

    use_fallback = msg.get_plugin("fallback", check=True) and (
        isinstance(recipient, LegacyMUC) or recipient.REPLIES
    )
    if not use_fallback:
        return msg["body"], reply_to_msg_id, reply_to, None

    text = msg["fallback"].get_stripped_body(self.xmpp["xep_0461"].namespace)
    try:
        reply_fallback = redact_url(msg["reply"].get_fallback_body())
    except AttributeError:
        reply_fallback = None
    return text, reply_to_msg_id, reply_to, reply_fallback

409 

async def __send_url(
    self,
    url: str,
    session: AnySession,
    recipient: AnyRecipient,
    **kwargs: Any,  # noqa:ANN401
) -> int | str | None:
    """
    Send the resource behind ``url`` as a file, or the URL itself as text.

    :param url: The URL attached to the message.
    :param session: The session of the user who sent the message.
    :param recipient: The contact or group chat the message is addressed to.
    :param kwargs: Passed through to ``session.on_file`` / ``session.on_text``.
    :return: Whatever ``session.on_file`` (HTTP status below 400) or
        ``session.on_text`` (HTTP status 400 and above) returns.
    """
    async with self.xmpp.http.get(url) as response:
        if response.status < 400:
            # The response is handed over while the request is still open.
            return await session.on_file(
                recipient, url, http_response=response, **kwargs
            )

        session.log.warning(
            "OOB url cannot be downloaded: %s, sending the URL as text"
            " instead.",
            response,
        )
        return await session.on_text(recipient, url, **kwargs)

429 

async def __send_bob(
    self,
    from_: JID,
    cid: str,
    session: AnySession,
    recipient: AnyRecipient,
    **kwargs: Any,  # noqa:ANN401
) -> int | str | None:
    """
    Send the sticker identified by ``cid`` to the legacy network.

    The sticker is looked up in the store first; when it is missing, it is
    requested from ``from_`` through the ``xep_0231`` plugin and the store
    is queried again.

    :param from_: The JID the sticker data is requested from when needed.
    :param cid: The content ID of the sticker.
    :param session: The session of the user who sent the message.
    :param recipient: The contact or group chat the message is addressed to.
    :param kwargs: Passed through to ``session.on_sticker``.
    :return: Whatever ``session.on_sticker`` returns.
    """
    store = self.xmpp.store
    with store.session() as orm:
        sticker = store.bob.get_sticker(orm, cid)

    if sticker is None:
        bob_plugin = self.xmpp.plugin["xep_0231"]
        await bob_plugin.get_bob(from_, cid, ifrom=self.xmpp.boundjid)
        with store.session() as orm:
            sticker = store.bob.get_sticker(orm, cid)

    assert sticker is not None
    return await session.on_sticker(recipient, sticker, **kwargs)

448 

449 

def redact_url(text: str) -> str:
    """
    Remove every occurrence of the configured upload URL prefix from ``text``.

    ``config.NO_UPLOAD_URL_PREFIX`` takes precedence over
    ``config.UPLOAD_URL_PREFIX``; when neither is set, ``text`` is returned
    unchanged.

    :param text: The text to redact.
    :return: The text without the upload URL prefix.
    """
    prefix = config.NO_UPLOAD_URL_PREFIX or config.UPLOAD_URL_PREFIX
    return text.replace(prefix, "") if prefix else text

455 

456 

def parse_link_previews(link_metadatas: list[LinkMetadata]) -> list[LinkPreview]:
    """
    Convert link metadata stanzas into :class:`LinkPreview` tuples.

    When the image of a preview is a ``data:image/jpeg;base64,`` URI, it is
    replaced by the decoded bytes. If decoding fails, a warning is logged
    and the preview is kept with its original image value.

    :param link_metadatas: The link metadata elements of a message.
    :return: One preview per element, in the same order.
    """
    jpeg_data_uri_prefix = "data:image/jpeg;base64,"
    result = []
    for link_metadata in link_metadatas:
        preview: LinkPreview = dict_to_named_tuple(link_metadata, LinkPreview)  # type:ignore[arg-type]
        image = preview.image
        if image and isinstance(image, str) and image.startswith(jpeg_data_uri_prefix):
            try:
                decoded = base64.b64decode(image.removeprefix(jpeg_data_uri_prefix))
            except ValueError as e:
                # binascii.Error is a ValueError subclass, so this covers
                # every decoding failure for a str input. Only the start of
                # the data URI is logged: the payload can be very large.
                log.warning(
                    "Could not decode base64-encoded image: %s '%s'", e, image[:64]
                )
            else:
                preview = preview._replace(image=decoded)
        result.append(preview)
    return result

478 

479 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)