Coverage for slidge / core / dispatcher / message / message.py: 81%

238 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-02-15 09:02 +0000

1import logging 

2from copy import copy 

3from xml.etree import ElementTree 

4 

5from slixmpp import JID, Message 

6from slixmpp.exceptions import XMPPError 

7 

8from ....contact.contact import LegacyContact 

9from ....group.participant import LegacyParticipant 

10from ....group.room import LegacyMUC 

11from ....util.types import LinkPreview, Recipient 

12from ....util.util import dict_to_named_tuple, remove_emoji_variation_selector_16 

13from ... import config 

14from ...session import BaseSession 

15from ..util import DispatcherMixin, exceptions_to_xmpp_errors 

16 

17 

class MessageContentMixin(DispatcherMixin):
    """
    Dispatch message *content* events sent by XMPP users to their session.

    Handles regular messages (text, attachments, stickers), last message
    corrections, retractions and reactions: each handler resolves the
    session/recipient/thread of the stanza, calls the matching ``on_*``
    callback of the session, echoes (groups) or acks the stanza, and records
    the legacy ID ↔ XMPP ID mapping in the store where relevant.
    """

    __slots__: list[str] = []

    def __init__(self, xmpp) -> None:
        """
        Register the event handlers of this mixin on ``xmpp``.

        :param xmpp: The object on which ``add_event_handler`` is called; it
            is also forwarded to the parent ``__init__``.
        """
        super().__init__(xmpp)
        xmpp.add_event_handler("legacy_message", self.on_legacy_message)
        xmpp.add_event_handler("message_correction", self.on_message_correction)
        xmpp.add_event_handler("message_retract", self.on_message_retract)
        xmpp.add_event_handler("groupchat_message", self.on_groupchat_message)
        xmpp.add_event_handler("reactions", self.on_reactions)

    async def on_groupchat_message(self, msg: Message) -> None:
        """
        Handle a groupchat message exactly like any other legacy message.

        :param msg: The incoming message stanza.
        """
        await self.on_legacy_message(msg)

    @exceptions_to_xmpp_errors
    async def on_legacy_message(self, msg: Message) -> None:
        """
        Meant to be called from :class:`BaseGateway` only.

        Forwards the content of ``msg`` to the session as a file (when an
        attachment URL is found), a sticker (when the XHTML body is a single
        ``cid:`` image) or text, in that order of priority. Messages with none
        of these are ignored. Corrections, retractions and reactions are
        skipped here because dedicated handlers take care of them.

        :param msg: The incoming message stanza.
        :raises XMPPError: ``bad-request`` if the stanza contains an
            ``encrypted`` element.
        """
        # we MUST not use `if m["replace"]["id"]` because it adds the tag if not
        # present. this is a problem for MUC echoed messages
        if msg.get_plugin("replace", check=True) is not None:
            # ignore last message correction (handled by a specific method)
            return
        if msg.get_plugin("apply_to", check=True) is not None:
            # ignore message retraction (handled by a specific method)
            return
        if msg.get_plugin("reactions", check=True) is not None:
            # ignore message reaction fallback.
            # the reaction itself is handled by self.react_from_msg().
            return
        if msg.get_plugin("retract", check=True) is not None:
            # ignore message retraction fallback.
            # the retraction itself is handled by self.on_retract
            return
        if msg.xml.find(".//{*}encrypted") is not None:
            raise XMPPError(
                "bad-request", "You cannot send encrypted messages through this gateway"
            )

        cid = self.__get_sticker_cid(msg)

        session, recipient, thread = await self._get_session_recipient_thread(msg)

        url = self.__get_attachment_url(msg)

        if msg.get_plugin("reply", check=True):
            text, reply_to_msg_id, reply_to, reply_fallback = await self.__get_reply(
                msg, session, recipient
            )
        else:
            text = msg["body"]
            reply_to_msg_id = None
            reply_to = None
            reply_fallback = None

        if msg.get_plugin("link_previews", check=True):
            link_previews = [
                dict_to_named_tuple(p, LinkPreview) for p in msg["link_previews"]
            ]
        else:
            link_previews = []

        if url:
            legacy_msg_id = await self.__send_url(
                url,
                session,
                recipient,
                reply_to_msg_id=reply_to_msg_id,
                reply_to_fallback_text=reply_fallback,
                reply_to=reply_to,
                thread=thread,
            )
        elif cid:
            legacy_msg_id = await self.__send_bob(
                msg.get_from(),
                cid,
                session,
                recipient,
                reply_to_msg_id=reply_to_msg_id,
                reply_to_fallback_text=reply_fallback,
                reply_to=reply_to,
                thread=thread,
            )
        elif text:
            if isinstance(recipient, LegacyMUC):
                mentions = {"mentions": await recipient.parse_mentions(text)}
            else:
                mentions = {}
            legacy_msg_id = await session.on_text(
                recipient,
                text,
                reply_to_msg_id=reply_to_msg_id,
                reply_to_fallback_text=reply_fallback,
                reply_to=reply_to,
                thread=thread,
                link_previews=link_previews,
                **mentions,
            )
        else:
            log.debug("Ignoring %s", msg.get_id())
            return

        # Groups get an echo of the stanza; other recipients get an ack.
        if isinstance(recipient, LegacyMUC):
            stanza_id = await recipient.echo(msg, legacy_msg_id)
        else:
            stanza_id = None
            self.__ack(msg)

        if legacy_msg_id is None:
            # Nothing to map when the session callback returned no ID.
            return

        with self.xmpp.store.session() as orm:
            if recipient.is_group:
                self.xmpp.store.id_map.set_origin(
                    orm, recipient.stored.id, str(legacy_msg_id), msg.get_id()
                )
                assert stanza_id is not None
                self.xmpp.store.id_map.set_msg(
                    orm,
                    recipient.stored.id,
                    str(legacy_msg_id),
                    [stanza_id],
                    True,
                )
            else:
                self.xmpp.store.id_map.set_msg(
                    orm,
                    recipient.stored.id,
                    str(legacy_msg_id),
                    [msg.get_id()],
                    False,
                )
            if session.MESSAGE_IDS_ARE_THREAD_IDS and (t := msg["thread"]):
                self.xmpp.store.id_map.set_thread(
                    orm, recipient.stored.id, t, str(legacy_msg_id), recipient.is_group
                )
            orm.commit()

    @staticmethod
    def __get_sticker_cid(msg: Message) -> str | None:
        """
        Return the ``cid:`` reference of ``msg`` if its XHTML body is a sticker.

        A sticker is an XHTML body made of exactly one ``<p>`` with no text
        and exactly one ``<img>`` whose ``src`` starts with ``cid:``.

        :param msg: The incoming message stanza.
        :return: The content ID without its ``cid:`` prefix, or ``None``.
        """
        if msg.get_plugin("html", check=True) is None:
            return None
        body = ElementTree.fromstring("<body>" + msg["html"].get_body() + "</body>")
        paragraphs = body.findall("p")
        if len(paragraphs) != 1:
            return None
        paragraph = paragraphs[0]
        if paragraph.text is not None and paragraph.text.strip():
            return None
        images = paragraph.findall("img")
        if len(images) != 1:
            # no text, single img ⇒ this is a sticker
            # other cases should be interpreted as "custom emojis" in text
            return None
        src = images[0].get("src")
        if src is not None and src.startswith("cid:"):
            return src.removeprefix("cid:")
        return None

    @staticmethod
    def __get_attachment_url(msg: Message) -> str | None:
        """
        Return the URL of the attachment carried by ``msg``, if any.

        The ``oob`` plugin takes precedence; otherwise the first ``sims``
        source whose URI starts with ``http`` is used.

        :param msg: The incoming message stanza.
        :return: The attachment URL, or ``None`` when there is none.
        """
        if msg.get_plugin("oob", check=True) is not None:
            return msg["oob"]["url"]
        if (
            "reference" in msg
            and "sims" in msg["reference"]
            and "sources" in msg["reference"]["sims"]
        ):
            for source in msg["reference"]["sims"]["sources"]["substanzas"]:
                if source["uri"].startswith("http"):
                    return source["uri"]
        return None

    @exceptions_to_xmpp_errors
    async def on_message_correction(self, msg: Message) -> None:
        """
        Handle a last message correction sent by the user.

        Depending on what is known and on what the recipient supports, the
        correction is forwarded as a correction, as a retraction (empty body
        and ``recipient.RETRACTION``), or as a new text message prefixed with
        ``"Correction: "``.

        :param msg: The incoming message stanza holding the ``replace`` element.
        """
        if msg.get_plugin("retract", check=True) is not None:
            # ignore message retraction fallback (fallback=last msg correction)
            return
        session, recipient, thread = await self._get_session_recipient_thread(msg)
        legacy_id = self._xmpp_msg_id_to_legacy(
            session, msg["replace"]["id"], recipient, True
        )

        if isinstance(recipient, LegacyMUC):
            mentions = await recipient.parse_mentions(msg["body"])
        else:
            mentions = None

        if previews := msg["link_previews"]:
            link_previews = [dict_to_named_tuple(p, LinkPreview) for p in previews]
        else:
            link_previews = []

        if legacy_id is None:
            log.debug("Did not find legacy ID to correct")
            new_legacy_msg_id = await session.on_text(
                recipient,
                # same prefix (with the space) as the unsupported-correction
                # branch below
                "Correction: " + msg["body"],
                thread=thread,
                mentions=mentions,
                link_previews=link_previews,
            )
        elif not msg["body"].strip() and recipient.RETRACTION:
            # An empty correction is treated as a retraction.
            await session.on_retract(recipient, legacy_id, thread=thread)
            new_legacy_msg_id = None
        elif recipient.CORRECTION:
            new_legacy_msg_id = await session.on_correct(
                recipient,
                msg["body"],
                legacy_id,
                thread=thread,
                mentions=mentions,
                link_previews=link_previews,
            )
        else:
            session.send_gateway_message(
                "Last message correction is not supported by this legacy service. "
                "Slidge will send your correction as new message."
            )
            # legacy_id cannot be None here: that case is the first branch.
            if recipient.RETRACTION:
                session.send_gateway_message(
                    "Slidge will attempt to retract the original message you wanted"
                    " to edit."
                )
                await session.on_retract(recipient, legacy_id, thread=thread)

            new_legacy_msg_id = await session.on_text(
                recipient,
                "Correction: " + msg["body"],
                thread=thread,
                mentions=mentions,
                link_previews=link_previews,
            )

        if isinstance(recipient, LegacyMUC):
            await recipient.echo(msg, new_legacy_msg_id)
        else:
            self.__ack(msg)
        if new_legacy_msg_id is None:
            return
        with self.xmpp.store.session() as orm:
            self.xmpp.store.id_map.set_msg(
                orm,
                recipient.stored.id,
                # stored as a string, like on_legacy_message() does
                str(new_legacy_msg_id),
                [msg.get_id()],
                recipient.is_group,
            )
            orm.commit()

    @exceptions_to_xmpp_errors
    async def on_message_retract(self, msg: Message):
        """
        Handle a message retraction sent by the user.

        :param msg: The incoming message stanza holding the ``retract`` element.
        :raises XMPPError: ``bad-request`` if ``recipient.RETRACTION`` is falsy.
        """
        session, recipient, thread = await self._get_session_recipient_thread(msg)
        if not recipient.RETRACTION:
            raise XMPPError(
                "bad-request",
                "This legacy service does not support message retraction.",
            )
        xmpp_id: str = msg["retract"]["id"]
        legacy_id = self._xmpp_msg_id_to_legacy(
            session, xmpp_id, recipient, origin=True
        )
        await session.on_retract(recipient, legacy_id, thread=thread)
        if isinstance(recipient, LegacyMUC):
            await recipient.echo(msg, None)
        self.__ack(msg)

    @exceptions_to_xmpp_errors
    async def on_reactions(self, msg: Message):
        """
        Handle emoji reactions sent by the user.

        Reactions that break the recipient's restrictions (single emoji,
        restricted emoji set) are not forwarded: the user is notified, the
        reactions are cleared and a ``policy-violation`` error is raised.
        Otherwise the reactions are forwarded and then propagated to the other
        XMPP IDs mapped to the same legacy message, if any.

        :param msg: The incoming message stanza holding the ``reactions`` element.
        :raises XMPPError: ``internal-server-error`` if no legacy ID is found
            for the message reacted to; ``policy-violation`` if the reactions
            are not allowed.
        """
        session, recipient, thread = await self._get_session_recipient_thread(msg)
        react_to: str = msg["reactions"]["id"]

        special_msg = session.SPECIAL_MSG_ID_PREFIX and react_to.startswith(
            session.SPECIAL_MSG_ID_PREFIX
        )

        if special_msg:
            # IDs carrying the special prefix are passed through untranslated.
            legacy_id = react_to
        else:
            legacy_id = self._xmpp_msg_id_to_legacy(session, react_to, recipient)

        if not legacy_id:
            log.debug("Ignored reaction from user")
            raise XMPPError(
                "internal-server-error",
                "Could not convert the XMPP msg ID to a legacy ID",
            )

        emojis = [
            remove_emoji_variation_selector_16(r["value"]) for r in msg["reactions"]
        ]
        error_msg = None

        if not special_msg:
            if recipient.REACTIONS_SINGLE_EMOJI and len(emojis) > 1:
                error_msg = "Maximum 1 emoji/message"

            if not error_msg and (
                subset := await recipient.available_emojis(legacy_id)
            ):
                if not set(emojis).issubset(subset):
                    error_msg = f"You can only react with the following emojis: {''.join(subset)}"

            if error_msg:
                session.send_gateway_message(error_msg)
                if not isinstance(recipient, LegacyMUC):
                    # no need to carbon for groups, we just don't echo the stanza
                    recipient.react(legacy_id, carbon=True)
                await session.on_react(recipient, legacy_id, [], thread=thread)
                raise XMPPError(
                    "policy-violation",
                    text=error_msg,
                    clear=False,
                )

        await session.on_react(recipient, legacy_id, emojis, thread=thread)
        if isinstance(recipient, LegacyMUC):
            await recipient.echo(msg, None)
        else:
            self.__ack(msg)

        with self.xmpp.store.session() as orm:
            multi = self.xmpp.store.id_map.get_xmpp(
                orm, recipient.stored.id, legacy_id, recipient.is_group
            )
        if not multi:
            return
        # The ID the user reacted to has already been handled above.
        multi = [m for m in multi if react_to != m]

        if isinstance(recipient, LegacyMUC):
            for xmpp_id in multi:
                mc = copy(msg)
                mc["reactions"]["id"] = xmpp_id
                await recipient.echo(mc)
        elif isinstance(recipient, LegacyContact):
            for xmpp_id in multi:
                recipient.react(legacy_id, emojis, xmpp_id=xmpp_id, carbon=True)

    def __ack(self, msg: Message) -> None:
        """
        Send a delivery receipt for ``msg`` unless ``xmpp.PROPER_RECEIPTS`` is set.

        :param msg: The stanza to acknowledge.
        """
        if not self.xmpp.PROPER_RECEIPTS:
            self.xmpp.delivery_receipt.ack(msg)

    async def __get_reply(
        self, msg: Message, session: BaseSession, recipient: Recipient
    ) -> tuple[
        str, str | int | None, LegacyContact | LegacyParticipant | None, str | None
    ]:
        """
        Extract the reply information carried by ``msg``.

        :param msg: A message stanza holding a ``reply`` element.
        :param session: The session of the user who sent ``msg``.
        :param recipient: The recipient of ``msg``.
        :return: A ``(text, reply_to_msg_id, reply_to, reply_fallback)`` tuple.
            When the replied-to message ID cannot be converted to a legacy
            ID, the whole body (quote included) is returned as ``text`` with
            upload URLs redacted, and the three other items are ``None``.
        """
        try:
            reply_to_msg_id = self._xmpp_msg_id_to_legacy(
                session, msg["reply"]["id"], recipient
            )
        except XMPPError:
            session.log.debug(
                "Could not determine reply-to legacy msg ID, sending quote instead."
            )
            return redact_url(msg["body"]), None, None, None

        reply_to_jid = JID(msg["reply"]["to"])
        reply_to = None
        if msg["type"] == "chat":
            # reply_to stays None when the user replies to their own message.
            if reply_to_jid.bare != session.user_jid.bare:
                try:
                    reply_to = await session.contacts.by_jid(reply_to_jid)
                except XMPPError:
                    pass
        elif msg["type"] == "groupchat":
            nick = reply_to_jid.resource
            try:
                muc = await session.bookmarks.by_jid(reply_to_jid)
            except XMPPError:
                pass
            else:
                if nick == muc.user_nick:
                    reply_to = await muc.get_user_participant()
                elif not nick:
                    # no resource part: the reply targets the room itself
                    reply_to = muc.get_system_participant()
                else:
                    reply_to = await muc.get_participant(nick, store=False)

        if msg.get_plugin("fallback", check=True) and (
            isinstance(recipient, LegacyMUC) or recipient.REPLIES
        ):
            text = msg["fallback"].get_stripped_body(self.xmpp["xep_0461"].namespace)
            try:
                reply_fallback = redact_url(msg["reply"].get_fallback_body())
            except AttributeError:
                reply_fallback = None
        else:
            text = msg["body"]
            reply_fallback = None

        return text, reply_to_msg_id, reply_to, reply_fallback

    async def __send_url(
        self, url: str, session: BaseSession, recipient: Recipient, **kwargs
    ) -> int | str | None:
        """
        Fetch ``url`` and forward it to the session as a file.

        If the HTTP status is 400 or above, the URL itself is sent as text
        instead.

        :param url: The URL of the attachment.
        :param session: The session of the user who sent the attachment.
        :param recipient: The recipient of the attachment.
        :param kwargs: Forwarded to ``session.on_file`` / ``session.on_text``.
        :return: Whatever the session callback returned.
        """
        async with self.xmpp.http.get(url) as response:
            if response.status >= 400:
                session.log.warning(
                    "OOB url cannot be downloaded: %s, sending the URL as text"
                    " instead.",
                    response,
                )
                return await session.on_text(recipient, url, **kwargs)

            return await session.on_file(
                recipient, url, http_response=response, **kwargs
            )

    async def __send_bob(
        self, from_: JID, cid: str, session: BaseSession, recipient: Recipient, **kwargs
    ) -> int | str | None:
        """
        Forward the sticker identified by ``cid`` to the session.

        The sticker is looked up in the store; when it is missing, it is
        requested from ``from_`` and looked up again.

        :param from_: The JID the sticker is requested from when missing.
        :param cid: The content ID of the sticker.
        :param session: The session of the user who sent the sticker.
        :param recipient: The recipient of the sticker.
        :param kwargs: Forwarded to ``session.on_sticker``.
        :return: Whatever ``session.on_sticker`` returned.
        :raises XMPPError: ``item-not-found`` if the sticker is still missing
            from the store after requesting it.
        """
        with self.xmpp.store.session() as orm:
            sticker = self.xmpp.store.bob.get_sticker(orm, cid)
        if sticker is None:
            await self.xmpp.plugin["xep_0231"].get_bob(
                from_, cid, ifrom=self.xmpp.boundjid
            )
            with self.xmpp.store.session() as orm:
                sticker = self.xmpp.store.bob.get_sticker(orm, cid)
        if sticker is None:
            # An explicit error rather than an assert, which is stripped
            # when python runs with -O.
            raise XMPPError("item-not-found", f"Could not retrieve the sticker '{cid}'")
        return await session.on_sticker(recipient, sticker, **kwargs)

436 

437 

def redact_url(text: str) -> str:
    """
    Remove the configured upload URL prefix from ``text``.

    The prefix is ``config.NO_UPLOAD_URL_PREFIX`` when it is truthy, else
    ``config.UPLOAD_URL_PREFIX``. Every occurrence of the prefix is removed;
    ``text`` is returned untouched when neither setting is truthy.

    :param text: The text to redact.
    :return: ``text`` without the upload URL prefix.
    """
    prefix = config.NO_UPLOAD_URL_PREFIX or config.UPLOAD_URL_PREFIX
    return text.replace(prefix, "") if prefix else text

443 

444 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)