Coverage for slidge/core/dispatcher/message/message.py: 73%

232 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-04 08:17 +0000

1import logging 

2from copy import copy 

3from xml.etree import ElementTree 

4 

5from slixmpp import JID, Message 

6from slixmpp.exceptions import XMPPError 

7 

8from ....contact.contact import LegacyContact 

9from ....group.participant import LegacyParticipant 

10from ....group.room import LegacyMUC 

11from ....util.types import LinkPreview, Recipient 

12from ....util.util import dict_to_named_tuple, remove_emoji_variation_selector_16 

13from ... import config 

14from ...session import BaseSession 

15from ..util import DispatcherMixin, exceptions_to_xmpp_errors 

16 

17 

18class MessageContentMixin(DispatcherMixin): 

19 __slots__: list[str] = [] 

20 

def __init__(self, xmpp) -> None:
    """
    Initialise the mixin and register its message-related event handlers.

    :param xmpp: Object passed on to the parent initialiser; each handler
        below is registered through its ``add_event_handler`` method.
    """
    super().__init__(xmpp)
    # (event name, callback) pairs, registered in the order listed.
    subscriptions = (
        ("legacy_message", self.on_legacy_message),
        ("message_correction", self.on_message_correction),
        ("message_retract", self.on_message_retract),
        ("groupchat_message", self.on_groupchat_message),
        ("reactions", self.on_reactions),
    )
    for event_name, callback in subscriptions:
        xmpp.add_event_handler(event_name, callback)

28 

async def on_groupchat_message(self, msg: Message) -> None:
    """
    Handle a ``groupchat_message`` event.

    Group chat messages take exactly the same path as direct messages, so
    this simply delegates to :meth:`on_legacy_message`.

    :param msg: The incoming group chat message stanza.
    """
    forwarded = self.on_legacy_message(msg)
    await forwarded

31 

@exceptions_to_xmpp_errors
async def on_legacy_message(self, msg: Message) -> None:
    """
    Relay a message written by the XMPP user to the legacy network.

    Meant to be called from :class:`BaseGateway` only.

    Stanzas carrying a correction, retraction or reaction (or the fallback
    body of one) are skipped because dedicated handlers deal with them.
    Otherwise the message is dispatched, in this order of precedence, as an
    attachment URL, a sticker (BoB content ID) or plain text. The stanza is
    then echoed (group chats) or acknowledged (other recipients) and, when
    the session returned a legacy message ID, that ID is stored against the
    XMPP message ID.

    :param msg: The incoming message stanza.
    :return: ``None``
    """
    # We MUST NOT test with item access such as `msg["replace"]["id"]`:
    # it adds the tag when it is not present, which is a problem for MUC
    # echoed messages. `get_plugin(..., check=True)` only looks.
    handled_elsewhere = (
        "replace",  # last message correction (handled by a specific method)
        "apply_to",  # message retraction (handled by a specific method)
        "reactions",  # reaction fallback; the reaction has its own handler
        "retract",  # retraction fallback; the retraction has its own handler
    )
    for plugin_name in handled_elsewhere:
        if msg.get_plugin(plugin_name, check=True) is not None:
            return

    # The sticker check runs before the session lookup, as it always has.
    cid = self.__get_sticker_cid(msg)

    session, recipient, thread = await self._get_session_recipient_thread(msg)

    url = self.__get_attachment_url(msg)

    if msg.get_plugin("reply", check=True):
        text, reply_to_msg_id, reply_to, reply_fallback = await self.__get_reply(
            msg, session, recipient
        )
    else:
        text = msg["body"]
        reply_to_msg_id = None
        reply_to = None
        reply_fallback = None

    if msg.get_plugin("link_previews", check=True):
        link_previews = [
            dict_to_named_tuple(p, LinkPreview) for p in msg["link_previews"]
        ]
    else:
        link_previews = []

    # Keyword arguments shared by every way of sending the message.
    common = {
        "reply_to_msg_id": reply_to_msg_id,
        "reply_to_fallback_text": reply_fallback,
        "reply_to": reply_to,
        "thread": thread,
    }

    if url:
        legacy_msg_id = await self.__send_url(url, session, recipient, **common)
    elif cid:
        legacy_msg_id = await self.__send_bob(
            msg.get_from(), cid, session, recipient, **common
        )
    elif text:
        if isinstance(recipient, LegacyMUC):
            mentions = {"mentions": await recipient.parse_mentions(text)}
        else:
            mentions = {}
        legacy_msg_id = await session.on_text(
            recipient,
            text,
            **common,
            link_previews=link_previews,
            **mentions,
        )
    else:
        # Nothing to send: no URL, no sticker and an empty body.
        log.debug("Ignoring %s", msg.get_id())
        return

    if isinstance(recipient, LegacyMUC):
        await recipient.echo(msg, legacy_msg_id)
    else:
        self.__ack(msg)

    if legacy_msg_id is None:
        # Without a legacy ID there is nothing to map the XMPP ID to.
        return

    with self.xmpp.store.session() as orm:
        self.xmpp.store.id_map.set_msg(
            orm,
            recipient.stored.id,
            str(legacy_msg_id),
            [msg.get_id()],
            recipient.is_group,
        )
        if session.MESSAGE_IDS_ARE_THREAD_IDS and (t := msg["thread"]):
            self.xmpp.store.id_map.set_thread(
                orm, recipient.stored.id, t, str(legacy_msg_id), recipient.is_group
            )
        orm.commit()

@staticmethod
def __get_sticker_cid(msg: Message) -> str | None:
    """
    Return the ``cid:`` content ID when the HTML body is a lone sticker.

    A sticker is recognised as: exactly one ``<p>`` directly under the
    body, no (non-blank) leading text in it, exactly one ``<img>`` directly
    inside it, and a ``src`` starting with ``cid:``. Other cases should be
    interpreted as "custom emojis" in text, so they yield ``None``.

    :param msg: The incoming message stanza.
    :return: The content ID without its ``cid:`` prefix, or ``None``.
    """
    if msg.get_plugin("html", check=True) is None:
        return None
    # NOTE(review): this parses client-supplied markup with the stdlib XML
    # parser; markup that is not well-formed XML makes `fromstring` raise.
    body = ElementTree.fromstring("<body>" + msg["html"].get_body() + "</body>")
    paragraphs = body.findall("p")
    if len(paragraphs) != 1:
        return None
    paragraph = paragraphs[0]
    if paragraph.text is not None and paragraph.text.strip():
        return None
    images = paragraph.findall("img")
    if len(images) != 1:
        return None
    # no text, single img ⇒ this is a sticker
    src = images[0].get("src")
    if src is not None and src.startswith("cid:"):
        return src.removeprefix("cid:")
    return None

@staticmethod
def __get_attachment_url(msg: Message) -> str | None:
    """
    Return the URL of the attachment referenced by *msg*, if any.

    The ``oob`` plugin wins when present; otherwise the first source of
    ``msg["reference"]["sims"]["sources"]`` whose URI starts with ``http``
    is used.

    :param msg: The incoming message stanza.
    :return: The URL, or ``None`` when no usable source is found.
    """
    if msg.get_plugin("oob", check=True) is not None:
        return msg["oob"]["url"]
    # NOTE(review): these membership tests are kept exactly as they were;
    # confirm that `in` on a stanza detects plugins the way this expects.
    if (
        "reference" in msg
        and "sims" in msg["reference"]
        and "sources" in msg["reference"]["sims"]
    ):
        for source in msg["reference"]["sims"]["sources"]["substanzas"]:
            if source["uri"].startswith("http"):
                return source["uri"]
    return None

166 

@exceptions_to_xmpp_errors
async def on_message_correction(self, msg: Message) -> None:
    """
    Relay a last-message-correction from the XMPP user to the legacy network.

    Depending on what is known and what the recipient supports, the
    correction is sent as:

    - a new ``"Correction: ..."`` text when the corrected message's legacy
      ID cannot be found;
    - a retraction when the new body is blank and ``recipient.RETRACTION``
      is set;
    - a real correction when ``recipient.CORRECTION`` is set;
    - otherwise a new ``"Correction: ..."`` text, preceded by a retraction
      of the original when ``recipient.RETRACTION`` is set.

    The stanza is then echoed (group chats) or acknowledged (other
    recipients), and any new legacy message ID is stored against the XMPP
    message ID.

    :param msg: The incoming correction stanza.
    :return: ``None``
    """
    if msg.get_plugin("retract", check=True) is not None:
        # ignore message retraction fallback (fallback=last msg correction)
        return
    session, recipient, thread = await self._get_session_recipient_thread(msg)
    xmpp_id = msg["replace"]["id"]
    body = msg["body"]

    if isinstance(recipient, LegacyMUC):
        # Group chats: try the stored ID mapping first, then fall back to
        # the generic conversion.
        with self.xmpp.store.session() as orm:
            legacy_id_str = self.xmpp.store.id_map.get_legacy(
                orm, recipient.stored.id, xmpp_id, True
            )
        if legacy_id_str is None:
            legacy_id = self._xmpp_msg_id_to_legacy(session, xmpp_id, recipient)
        else:
            legacy_id = self.xmpp.LEGACY_MSG_ID_TYPE(legacy_id_str)
        mentions = await recipient.parse_mentions(body)
    else:
        legacy_id = self._xmpp_msg_id_to_legacy(session, xmpp_id, recipient)
        mentions = None

    if previews := msg["link_previews"]:
        link_previews = [dict_to_named_tuple(p, LinkPreview) for p in previews]
    else:
        link_previews = []

    if legacy_id is None:
        log.debug("Did not find legacy ID to correct")
        new_legacy_msg_id = await session.on_text(
            recipient,
            # Same prefix (with its trailing space) as the fallback below.
            "Correction: " + body,
            thread=thread,
            mentions=mentions,
            link_previews=link_previews,
        )
    elif not body.strip() and recipient.RETRACTION:
        # A blank correction is treated as a retraction.
        await session.on_retract(recipient, legacy_id, thread=thread)
        new_legacy_msg_id = None
    elif recipient.CORRECTION:
        new_legacy_msg_id = await session.on_correct(
            recipient,
            body,
            legacy_id,
            thread=thread,
            mentions=mentions,
            link_previews=link_previews,
        )
    else:
        session.send_gateway_message(
            "Last message correction is not supported by this legacy service. "
            "Slidge will send your correction as new message."
        )
        # `legacy_id` cannot be None here: that case is the first branch.
        if recipient.RETRACTION:
            session.send_gateway_message(
                "Slidge will attempt to retract the original message you wanted"
                " to edit."
            )
            await session.on_retract(recipient, legacy_id, thread=thread)

        new_legacy_msg_id = await session.on_text(
            recipient,
            "Correction: " + body,
            thread=thread,
            mentions=mentions,
            link_previews=link_previews,
        )

    if isinstance(recipient, LegacyMUC):
        await recipient.echo(msg, new_legacy_msg_id)
    else:
        self.__ack(msg)
    if new_legacy_msg_id is None:
        return
    with self.xmpp.store.session() as orm:
        # Same argument shapes as the mapping stored for regular messages:
        # the legacy ID as a string and the XMPP ID wrapped in a list.
        self.xmpp.store.id_map.set_msg(
            orm,
            recipient.stored.id,
            str(new_legacy_msg_id),
            [msg.get_id()],
            recipient.is_group,
        )
        orm.commit()

253 

@exceptions_to_xmpp_errors
async def on_message_retract(self, msg: Message):
    """
    Relay a message retraction from the XMPP user to the legacy network.

    When the retracted message maps to a legacy ID, the session is asked to
    retract it and, for group chats, the stanza is echoed. When no legacy
    ID is found the retraction is only logged. The stanza is acknowledged
    in both cases.

    :param msg: The incoming retraction stanza.
    :raises XMPPError: ``bad-request`` when ``recipient.RETRACTION`` is
        falsy.
    """
    session, recipient, thread = await self._get_session_recipient_thread(msg)
    if not recipient.RETRACTION:
        raise XMPPError(
            "bad-request",
            "This legacy service does not support message retraction.",
        )

    retracted_xmpp_id: str = msg["retract"]["id"]
    legacy_id = self._xmpp_msg_id_to_legacy(session, retracted_xmpp_id, recipient)

    if not legacy_id:
        log.debug("Ignored retraction from user")
    else:
        await session.on_retract(recipient, legacy_id, thread=thread)
        if isinstance(recipient, LegacyMUC):
            await recipient.echo(msg, None)

    self.__ack(msg)

271 

@exceptions_to_xmpp_errors
async def on_reactions(self, msg: Message):
    """
    Relay the XMPP user's emoji reactions on a message to the legacy network.

    The target message ID is used as-is when it starts with
    ``session.SPECIAL_MSG_ID_PREFIX``; otherwise it is converted to a legacy
    ID and the reactions are validated against the recipient's rules
    (single emoji only, restricted emoji set). Accepted reactions are
    passed to ``session.on_react`` and then echoed (group chats) or
    acknowledged (other recipients). Finally the reaction is repeated for
    every other XMPP message ID stored for the same legacy ID.

    :param msg: The incoming reactions stanza.
    :raises XMPPError: ``internal-server-error`` when no legacy ID could be
        determined; ``policy-violation`` when the reactions are rejected.
    """
    session, recipient, thread = await self._get_session_recipient_thread(msg)
    react_to: str = msg["reactions"]["id"]

    special_msg = session.SPECIAL_MSG_ID_PREFIX and react_to.startswith(
        session.SPECIAL_MSG_ID_PREFIX
    )

    if special_msg:
        legacy_id = react_to
    else:
        legacy_id = self._xmpp_msg_id_to_legacy(session, react_to, recipient)

    if not legacy_id:
        log.debug("Ignored reaction from user")
        raise XMPPError(
            "internal-server-error",
            "Could not convert the XMPP msg ID to a legacy ID",
        )

    emojis = [
        remove_emoji_variation_selector_16(r["value"]) for r in msg["reactions"]
    ]
    error_msg = None

    # Reactions on special messages skip the recipient's emoji rules.
    if not special_msg:
        if recipient.REACTIONS_SINGLE_EMOJI and len(emojis) > 1:
            error_msg = "Maximum 1 emoji/message"

        if not error_msg and (
            subset := await recipient.available_emojis(legacy_id)
        ):
            if not set(emojis).issubset(subset):
                error_msg = f"You can only react with the following emojis: {''.join(subset)}"

    if error_msg:
        session.send_gateway_message(error_msg)
        if not isinstance(recipient, LegacyMUC):
            # no need to carbon for groups, we just don't echo the stanza
            recipient.react(legacy_id, carbon=True)
        # An empty emoji list is sent to the session for this message
        # (presumably to clear the user's reactions there; confirm).
        await session.on_react(recipient, legacy_id, [], thread=thread)
        raise XMPPError(
            "policy-violation",
            text=error_msg,
        )

    await session.on_react(recipient, legacy_id, emojis, thread=thread)
    if isinstance(recipient, LegacyMUC):
        await recipient.echo(msg, None)
    else:
        self.__ack(msg)

    with self.xmpp.store.session() as orm:
        multi = self.xmpp.store.id_map.get_xmpp(
            orm, recipient.stored.id, legacy_id, recipient.is_group
        )
    if not multi:
        return
    # Every other XMPP ID stored for this legacy ID gets the reaction too.
    multi = [m for m in multi if react_to != m]

    if isinstance(recipient, LegacyMUC):
        for xmpp_id in multi:
            mc = copy(msg)
            mc["reactions"]["id"] = xmpp_id
            await recipient.echo(mc)
    elif isinstance(recipient, LegacyContact):
        for xmpp_id in multi:
            recipient.react(legacy_id, emojis, xmpp_id=xmpp_id, carbon=True)

342 

def __ack(self, msg: Message) -> None:
    """
    Acknowledge *msg* through ``xmpp.delivery_receipt``.

    Nothing is sent when ``xmpp.PROPER_RECEIPTS`` is truthy.

    :param msg: The stanza to acknowledge.
    """
    if self.xmpp.PROPER_RECEIPTS:
        return
    self.xmpp.delivery_receipt.ack(msg)

346 

async def __get_reply(
    self, msg: Message, session: BaseSession, recipient: Recipient
) -> tuple[
    str, str | int | None, LegacyContact | LegacyParticipant | None, str | None
]:
    """
    Work out what a reply stanza refers to.

    :param msg: The incoming message; it carries a ``reply`` element.
    :param session: Session of the user who sent the message.
    :param recipient: Chat the message is addressed to.
    :return: A 4-tuple ``(text, reply_to_msg_id, reply_to, reply_fallback)``:
        the text to send, the legacy ID of the message replied to, the
        author of that message when it could be resolved (and is not the
        user), and the quoted fallback text. When the legacy ID cannot be
        determined, ``(msg["body"], None, None, None)`` is returned.
    """
    try:
        quoted_msg_id = self._xmpp_msg_id_to_legacy(
            session, msg["reply"]["id"], recipient
        )
    except XMPPError:
        session.log.debug(
            "Could not determine reply-to legacy msg ID, sending quote instead."
        )
        return msg["body"], None, None, None

    author_jid = JID(msg["reply"]["to"])
    author = None
    msg_type = msg["type"]
    if msg_type == "chat":
        # In direct chats the author is only resolved when it is not the user.
        if author_jid.bare != session.user_jid.bare:
            try:
                author = await session.contacts.by_jid(author_jid)
            except XMPPError:
                pass
    elif msg_type == "groupchat":
        nick = author_jid.resource
        try:
            muc = await session.bookmarks.by_jid(author_jid)
        except XMPPError:
            pass
        else:
            # Same rule in rooms: skip the user's own nickname.
            if nick != muc.user_nick:
                author = await muc.get_participant(nick, store=False)

    use_stripped_body = msg.get_plugin("fallback", check=True) and (
        isinstance(recipient, LegacyMUC) or recipient.REPLIES
    )
    if not use_stripped_body:
        return msg["body"], quoted_msg_id, author, None

    # Send the body without the part marked as fallback for replies.
    stripped = msg["fallback"].get_stripped_body(self.xmpp["xep_0461"].namespace)
    try:
        quoted_text = msg["reply"].get_fallback_body()
    except AttributeError:
        quoted_text = None

    return stripped, quoted_msg_id, author, quoted_text

395 

async def __send_url(
    self, url: str, session: BaseSession, recipient: Recipient, **kwargs
) -> int | str | None:
    """
    Send the attachment found at *url* to the legacy network.

    The URL is requested through ``xmpp.http``. On a status below 400 the
    open response is handed to ``session.on_file``; otherwise a warning is
    logged and the URL itself is sent with ``session.on_text``.

    :param url: URL of the attachment.
    :param session: Session of the user who sent the message.
    :param recipient: Chat the attachment is addressed to.
    :param kwargs: Extra keyword arguments passed through to the session
        callback.
    :return: Whatever the session callback returns.
    """
    async with self.xmpp.http.get(url) as response:
        if response.status < 400:
            return await session.on_file(
                recipient, url, http_response=response, **kwargs
            )

        session.log.warning(
            "OOB url cannot be downloaded: %s, sending the URL as text instead.",
            response,
        )
        return await session.on_text(recipient, url, **kwargs)

411 

async def __send_bob(
    self, from_: JID, cid: str, session: BaseSession, recipient: Recipient, **kwargs
) -> int | str | None:
    """
    Send the sticker identified by the BoB content ID *cid*.

    The sticker is looked up in the store; when it is missing, it is
    requested from *from_* via the ``xep_0231`` plugin and looked up again.

    :param from_: JID the sticker data is requested from when not stored.
    :param cid: Content ID of the sticker.
    :param session: Session of the user who sent the message.
    :param recipient: Chat the sticker is addressed to.
    :param kwargs: Extra keyword arguments passed to ``session.on_sticker``.
    :return: Whatever ``session.on_sticker`` returns.
    """

    def stored_sticker():
        # One short-lived store session per lookup.
        with self.xmpp.store.session() as orm:
            return self.xmpp.store.bob.get_sticker(orm, cid)

    sticker = stored_sticker()
    if sticker is None:
        await self.xmpp.plugin["xep_0231"].get_bob(from_, cid)
        sticker = stored_sticker()
    assert sticker is not None
    return await session.on_sticker(recipient, sticker, **kwargs)

423 

424 

# Module-level logger, named after this module; used by the handlers above.
log = logging.getLogger(__name__)