Coverage for slidge/core/dispatcher/message/message.py: 81%

233 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-26 19:34 +0000

1import logging 

2from copy import copy 

3from xml.etree import ElementTree 

4 

5from slixmpp import JID, Message 

6from slixmpp.exceptions import XMPPError 

7 

8from ....contact.contact import LegacyContact 

9from ....group.participant import LegacyParticipant 

10from ....group.room import LegacyMUC 

11from ....util.types import LinkPreview, Recipient 

12from ....util.util import dict_to_named_tuple, remove_emoji_variation_selector_16 

13from ... import config 

14from ...session import BaseSession 

15from ..util import DispatcherMixin, exceptions_to_xmpp_errors 

16 

17 

18class MessageContentMixin(DispatcherMixin): 

19 __slots__: list[str] = [] 

20 

21 def __init__(self, xmpp) -> None: 

22 super().__init__(xmpp) 

23 xmpp.add_event_handler("legacy_message", self.on_legacy_message) 

24 xmpp.add_event_handler("message_correction", self.on_message_correction) 

25 xmpp.add_event_handler("message_retract", self.on_message_retract) 

26 xmpp.add_event_handler("groupchat_message", self.on_groupchat_message) 

27 xmpp.add_event_handler("reactions", self.on_reactions) 

28 

29 async def on_groupchat_message(self, msg: Message) -> None: 

30 await self.on_legacy_message(msg) 

31 

32 @exceptions_to_xmpp_errors 

33 async def on_legacy_message(self, msg: Message) -> None: 

34 """ 

35 Meant to be called from :class:`BaseGateway` only. 

36 

37 :param msg: 

38 :return: 

39 """ 

40 # we MUST not use `if m["replace"]["id"]` because it adds the tag if not 

41 # present. this is a problem for MUC echoed messages 

42 if msg.get_plugin("replace", check=True) is not None: 

43 # ignore last message correction (handled by a specific method) 

44 return 

45 if msg.get_plugin("apply_to", check=True) is not None: 

46 # ignore message retraction (handled by a specific method) 

47 return 

48 if msg.get_plugin("reactions", check=True) is not None: 

49 # ignore message reaction fallback. 

50 # the reaction itself is handled by self.react_from_msg(). 

51 return 

52 if msg.get_plugin("retract", check=True) is not None: 

53 # ignore message retraction fallback. 

54 # the retraction itself is handled by self.on_retract 

55 return 

56 cid = None 

57 if msg.get_plugin("html", check=True) is not None: 

58 body = ElementTree.fromstring("<body>" + msg["html"].get_body() + "</body>") 

59 p = body.findall("p") 

60 if p is not None and len(p) == 1: 

61 if p[0].text is None or not p[0].text.strip(): 

62 images = p[0].findall("img") 

63 if len(images) == 1: 

64 # no text, single img ⇒ this is a sticker 

65 # other cases should be interpreted as "custom emojis" in text 

66 src = images[0].get("src") 

67 if src is not None and src.startswith("cid:"): 

68 cid = src.removeprefix("cid:") 

69 

70 session, recipient, thread = await self._get_session_recipient_thread(msg) 

71 

72 if msg.get_plugin("oob", check=True) is not None: 

73 url = msg["oob"]["url"] 

74 elif ( 

75 "reference" in msg 

76 and "sims" in msg["reference"] 

77 and "sources" in msg["reference"]["sims"] 

78 ): 

79 for source in msg["reference"]["sims"]["sources"]["substanzas"]: 

80 if source["uri"].startswith("http"): 

81 url = source["uri"] 

82 break 

83 else: 

84 url = None 

85 else: 

86 url = None 

87 

88 if msg.get_plugin("reply", check=True): 

89 text, reply_to_msg_id, reply_to, reply_fallback = await self.__get_reply( 

90 msg, session, recipient 

91 ) 

92 else: 

93 text = msg["body"] 

94 reply_to_msg_id = None 

95 reply_to = None 

96 reply_fallback = None 

97 

98 if msg.get_plugin("link_previews", check=True): 

99 link_previews = [ 

100 dict_to_named_tuple(p, LinkPreview) for p in msg["link_previews"] 

101 ] 

102 else: 

103 link_previews = [] 

104 

105 if url: 

106 legacy_msg_id = await self.__send_url( 

107 url, 

108 session, 

109 recipient, 

110 reply_to_msg_id=reply_to_msg_id, 

111 reply_to_fallback_text=reply_fallback, 

112 reply_to=reply_to, 

113 thread=thread, 

114 ) 

115 elif cid: 

116 legacy_msg_id = await self.__send_bob( 

117 msg.get_from(), 

118 cid, 

119 session, 

120 recipient, 

121 reply_to_msg_id=reply_to_msg_id, 

122 reply_to_fallback_text=reply_fallback, 

123 reply_to=reply_to, 

124 thread=thread, 

125 ) 

126 elif text: 

127 if isinstance(recipient, LegacyMUC): 

128 mentions = {"mentions": await recipient.parse_mentions(text)} 

129 else: 

130 mentions = {} 

131 legacy_msg_id = await session.on_text( 

132 recipient, 

133 text, 

134 reply_to_msg_id=reply_to_msg_id, 

135 reply_to_fallback_text=reply_fallback, 

136 reply_to=reply_to, 

137 thread=thread, 

138 link_previews=link_previews, 

139 **mentions, 

140 ) 

141 else: 

142 log.debug("Ignoring %s", msg.get_id()) 

143 return 

144 

145 if isinstance(recipient, LegacyMUC): 

146 stanza_id = await recipient.echo(msg, legacy_msg_id) 

147 else: 

148 stanza_id = None 

149 self.__ack(msg) 

150 

151 if legacy_msg_id is None: 

152 return 

153 

154 with self.xmpp.store.session() as orm: 

155 if recipient.is_group: 

156 self.xmpp.store.id_map.set_origin( 

157 orm, recipient.stored.id, str(legacy_msg_id), msg.get_id() 

158 ) 

159 assert stanza_id is not None 

160 self.xmpp.store.id_map.set_msg( 

161 orm, 

162 recipient.stored.id, 

163 str(legacy_msg_id), 

164 [stanza_id], 

165 True, 

166 ) 

167 else: 

168 self.xmpp.store.id_map.set_msg( 

169 orm, 

170 recipient.stored.id, 

171 str(legacy_msg_id), 

172 [msg.get_id()], 

173 False, 

174 ) 

175 if session.MESSAGE_IDS_ARE_THREAD_IDS and (t := msg["thread"]): 

176 self.xmpp.store.id_map.set_thread( 

177 orm, recipient.stored.id, t, str(legacy_msg_id), recipient.is_group 

178 ) 

179 orm.commit() 

180 

181 @exceptions_to_xmpp_errors 

182 async def on_message_correction(self, msg: Message) -> None: 

183 if msg.get_plugin("retract", check=True) is not None: 

184 # ignore message retraction fallback (fallback=last msg correction) 

185 return 

186 session, recipient, thread = await self._get_session_recipient_thread(msg) 

187 legacy_id = self._xmpp_msg_id_to_legacy( 

188 session, msg["replace"]["id"], recipient, True 

189 ) 

190 

191 if isinstance(recipient, LegacyMUC): 

192 mentions = await recipient.parse_mentions(msg["body"]) 

193 else: 

194 mentions = None 

195 

196 if previews := msg["link_previews"]: 

197 link_previews = [dict_to_named_tuple(p, LinkPreview) for p in previews] 

198 else: 

199 link_previews = [] 

200 

201 if legacy_id is None: 

202 log.debug("Did not find legacy ID to correct") 

203 new_legacy_msg_id = await session.on_text( 

204 recipient, 

205 "Correction:" + msg["body"], 

206 thread=thread, 

207 mentions=mentions, 

208 link_previews=link_previews, 

209 ) 

210 elif not msg["body"].strip() and recipient.RETRACTION: 

211 await session.on_retract(recipient, legacy_id, thread=thread) 

212 new_legacy_msg_id = None 

213 elif recipient.CORRECTION: 

214 new_legacy_msg_id = await session.on_correct( 

215 recipient, 

216 msg["body"], 

217 legacy_id, 

218 thread=thread, 

219 mentions=mentions, 

220 link_previews=link_previews, 

221 ) 

222 else: 

223 session.send_gateway_message( 

224 "Last message correction is not supported by this legacy service. " 

225 "Slidge will send your correction as new message." 

226 ) 

227 if recipient.RETRACTION and legacy_id is not None: 

228 if legacy_id is not None: 

229 session.send_gateway_message( 

230 "Slidge will attempt to retract the original message you wanted" 

231 " to edit." 

232 ) 

233 await session.on_retract(recipient, legacy_id, thread=thread) 

234 

235 new_legacy_msg_id = await session.on_text( 

236 recipient, 

237 "Correction: " + msg["body"], 

238 thread=thread, 

239 mentions=mentions, 

240 link_previews=link_previews, 

241 ) 

242 

243 if isinstance(recipient, LegacyMUC): 

244 await recipient.echo(msg, new_legacy_msg_id) 

245 else: 

246 self.__ack(msg) 

247 if new_legacy_msg_id is None: 

248 return 

249 with self.xmpp.store.session() as orm: 

250 self.xmpp.store.id_map.set_msg( 

251 orm, 

252 recipient.stored.id, 

253 new_legacy_msg_id, 

254 msg.get_id(), 

255 recipient.is_group, 

256 ) 

257 orm.commit() 

258 

259 @exceptions_to_xmpp_errors 

260 async def on_message_retract(self, msg: Message): 

261 session, recipient, thread = await self._get_session_recipient_thread(msg) 

262 if not recipient.RETRACTION: 

263 raise XMPPError( 

264 "bad-request", 

265 "This legacy service does not support message retraction.", 

266 ) 

267 xmpp_id: str = msg["retract"]["id"] 

268 legacy_id = self._xmpp_msg_id_to_legacy( 

269 session, xmpp_id, recipient, origin=True 

270 ) 

271 await session.on_retract(recipient, legacy_id, thread=thread) 

272 if isinstance(recipient, LegacyMUC): 

273 await recipient.echo(msg, None) 

274 self.__ack(msg) 

275 

276 @exceptions_to_xmpp_errors 

277 async def on_reactions(self, msg: Message): 

278 session, recipient, thread = await self._get_session_recipient_thread(msg) 

279 react_to: str = msg["reactions"]["id"] 

280 

281 special_msg = session.SPECIAL_MSG_ID_PREFIX and react_to.startswith( 

282 session.SPECIAL_MSG_ID_PREFIX 

283 ) 

284 

285 if special_msg: 

286 legacy_id = react_to 

287 else: 

288 legacy_id = self._xmpp_msg_id_to_legacy(session, react_to, recipient) 

289 

290 if not legacy_id: 

291 log.debug("Ignored reaction from user") 

292 raise XMPPError( 

293 "internal-server-error", 

294 "Could not convert the XMPP msg ID to a legacy ID", 

295 ) 

296 

297 emojis = [ 

298 remove_emoji_variation_selector_16(r["value"]) for r in msg["reactions"] 

299 ] 

300 error_msg = None 

301 recipient = recipient 

302 

303 if not special_msg: 

304 if recipient.REACTIONS_SINGLE_EMOJI and len(emojis) > 1: 

305 error_msg = "Maximum 1 emoji/message" 

306 

307 if not error_msg and ( 

308 subset := await recipient.available_emojis(legacy_id) 

309 ): 

310 if not set(emojis).issubset(subset): 

311 error_msg = f"You can only react with the following emojis: {''.join(subset)}" 

312 

313 if error_msg: 

314 session.send_gateway_message(error_msg) 

315 if not isinstance(recipient, LegacyMUC): 

316 # no need to carbon for groups, we just don't echo the stanza 

317 recipient.react(legacy_id, carbon=True) 

318 await session.on_react(recipient, legacy_id, [], thread=thread) 

319 raise XMPPError( 

320 "policy-violation", 

321 text=error_msg, 

322 clear=False, 

323 ) 

324 

325 await session.on_react(recipient, legacy_id, emojis, thread=thread) 

326 if isinstance(recipient, LegacyMUC): 

327 await recipient.echo(msg, None) 

328 else: 

329 self.__ack(msg) 

330 

331 with self.xmpp.store.session() as orm: 

332 multi = self.xmpp.store.id_map.get_xmpp( 

333 orm, recipient.stored.id, legacy_id, recipient.is_group 

334 ) 

335 if not multi: 

336 return 

337 multi = [m for m in multi if react_to != m] 

338 

339 if isinstance(recipient, LegacyMUC): 

340 for xmpp_id in multi: 

341 mc = copy(msg) 

342 mc["reactions"]["id"] = xmpp_id 

343 await recipient.echo(mc) 

344 elif isinstance(recipient, LegacyContact): 

345 for xmpp_id in multi: 

346 recipient.react(legacy_id, emojis, xmpp_id=xmpp_id, carbon=True) 

347 

348 def __ack(self, msg: Message) -> None: 

349 if not self.xmpp.PROPER_RECEIPTS: 

350 self.xmpp.delivery_receipt.ack(msg) 

351 

352 async def __get_reply( 

353 self, msg: Message, session: BaseSession, recipient: Recipient 

354 ) -> tuple[ 

355 str, str | int | None, LegacyContact | LegacyParticipant | None, str | None 

356 ]: 

357 try: 

358 reply_to_msg_id = self._xmpp_msg_id_to_legacy( 

359 session, msg["reply"]["id"], recipient 

360 ) 

361 except XMPPError: 

362 session.log.debug( 

363 "Could not determine reply-to legacy msg ID, sending quote instead." 

364 ) 

365 return redact_url(msg["body"]), None, None, None 

366 

367 reply_to_jid = JID(msg["reply"]["to"]) 

368 reply_to = None 

369 if msg["type"] == "chat": 

370 if reply_to_jid.bare != session.user_jid.bare: 

371 try: 

372 reply_to = await session.contacts.by_jid(reply_to_jid) 

373 except XMPPError: 

374 pass 

375 elif msg["type"] == "groupchat": 

376 nick = reply_to_jid.resource 

377 try: 

378 muc = await session.bookmarks.by_jid(reply_to_jid) 

379 except XMPPError: 

380 pass 

381 else: 

382 if nick != muc.user_nick: 

383 reply_to = await muc.get_participant( 

384 reply_to_jid.resource, store=False 

385 ) 

386 

387 if msg.get_plugin("fallback", check=True) and ( 

388 isinstance(recipient, LegacyMUC) or recipient.REPLIES 

389 ): 

390 text = msg["fallback"].get_stripped_body(self.xmpp["xep_0461"].namespace) 

391 try: 

392 reply_fallback = redact_url(msg["reply"].get_fallback_body()) 

393 except AttributeError: 

394 reply_fallback = None 

395 else: 

396 text = msg["body"] 

397 reply_fallback = None 

398 

399 return text, reply_to_msg_id, reply_to, reply_fallback 

400 

401 async def __send_url( 

402 self, url: str, session: BaseSession, recipient: Recipient, **kwargs 

403 ) -> int | str | None: 

404 async with self.xmpp.http.get(url) as response: 

405 if response.status >= 400: 

406 session.log.warning( 

407 "OOB url cannot be downloaded: %s, sending the URL as text" 

408 " instead.", 

409 response, 

410 ) 

411 return await session.on_text(recipient, url, **kwargs) 

412 

413 return await session.on_file( 

414 recipient, url, http_response=response, **kwargs 

415 ) 

416 

417 async def __send_bob( 

418 self, from_: JID, cid: str, session: BaseSession, recipient: Recipient, **kwargs 

419 ) -> int | str | None: 

420 with self.xmpp.store.session() as orm: 

421 sticker = self.xmpp.store.bob.get_sticker(orm, cid) 

422 if sticker is None: 

423 await self.xmpp.plugin["xep_0231"].get_bob( 

424 from_, cid, ifrom=self.xmpp.boundjid 

425 ) 

426 with self.xmpp.store.session() as orm: 

427 sticker = self.xmpp.store.bob.get_sticker(orm, cid) 

428 assert sticker is not None 

429 return await session.on_sticker(recipient, sticker, **kwargs) 

430 

431 

432def redact_url(text: str) -> str: 

433 needle = config.NO_UPLOAD_URL_PREFIX or config.UPLOAD_URL_PREFIX 

434 if not needle: 

435 return text 

436 return text.replace(needle, "") 

437 

438 

439log = logging.getLogger(__name__)