Coverage for slidge/core/dispatcher/message/message.py: 60%

219 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-07 05:11 +0000

1import logging 

2from copy import copy 

3from xml.etree import ElementTree 

4 

5from slixmpp import JID, Message 

6from slixmpp.exceptions import XMPPError 

7 

8from ....contact.contact import LegacyContact 

9from ....group.participant import LegacyParticipant 

10from ....group.room import LegacyMUC 

11from ....util.types import LinkPreview, Recipient 

12from ....util.util import dict_to_named_tuple, remove_emoji_variation_selector_16 

13from ... import config 

14from ...session import BaseSession 

15from ..util import DispatcherMixin, exceptions_to_xmpp_errors 

16 

17 

18class MessageContentMixin(DispatcherMixin): 

19 def __init__(self, xmpp): 

20 super().__init__(xmpp) 

21 xmpp.add_event_handler("legacy_message", self.on_legacy_message) 

22 xmpp.add_event_handler("message_correction", self.on_message_correction) 

23 xmpp.add_event_handler("message_retract", self.on_message_retract) 

24 xmpp.add_event_handler("groupchat_message", self.on_groupchat_message) 

25 xmpp.add_event_handler("reactions", self.on_reactions) 

26 

27 async def on_groupchat_message(self, msg: Message) -> None: 

28 await self.on_legacy_message(msg) 

29 

30 @exceptions_to_xmpp_errors 

31 async def on_legacy_message(self, msg: Message): 

32 """ 

33 Meant to be called from :class:`BaseGateway` only. 

34 

35 :param msg: 

36 :return: 

37 """ 

38 # we MUST not use `if m["replace"]["id"]` because it adds the tag if not 

39 # present. this is a problem for MUC echoed messages 

40 if msg.get_plugin("replace", check=True) is not None: 

41 # ignore last message correction (handled by a specific method) 

42 return 

43 if msg.get_plugin("apply_to", check=True) is not None: 

44 # ignore message retraction (handled by a specific method) 

45 return 

46 if msg.get_plugin("reactions", check=True) is not None: 

47 # ignore message reaction fallback. 

48 # the reaction itself is handled by self.react_from_msg(). 

49 return 

50 if msg.get_plugin("retract", check=True) is not None: 

51 # ignore message retraction fallback. 

52 # the retraction itself is handled by self.on_retract 

53 return 

54 cid = None 

55 if msg.get_plugin("html", check=True) is not None: 

56 body = ElementTree.fromstring("<body>" + msg["html"].get_body() + "</body>") 

57 p = body.findall("p") 

58 if p is not None and len(p) == 1: 

59 if p[0].text is None or not p[0].text.strip(): 

60 images = p[0].findall("img") 

61 if len(images) == 1: 

62 # no text, single img ⇒ this is a sticker 

63 # other cases should be interpreted as "custom emojis" in text 

64 src = images[0].get("src") 

65 if src is not None and src.startswith("cid:"): 

66 cid = src.removeprefix("cid:") 

67 

68 session, entity, thread = await self._get_session_entity_thread(msg) 

69 

70 if msg.get_plugin("oob", check=True) is not None: 

71 url = msg["oob"]["url"] 

72 else: 

73 url = None 

74 

75 if msg.get_plugin("reply", check=True): 

76 text, reply_to_msg_id, reply_to, reply_fallback = await self.__get_reply( 

77 msg, session, entity 

78 ) 

79 else: 

80 text = msg["body"] 

81 reply_to_msg_id = None 

82 reply_to = None 

83 reply_fallback = None 

84 

85 if msg.get_plugin("link_previews", check=True): 

86 link_previews = [ 

87 dict_to_named_tuple(p, LinkPreview) for p in msg["link_previews"] 

88 ] 

89 else: 

90 link_previews = [] 

91 

92 if url: 

93 legacy_msg_id = await self.__send_url( 

94 url, 

95 session, 

96 entity, 

97 reply_to_msg_id=reply_to_msg_id, 

98 reply_to_fallback_text=reply_fallback, 

99 reply_to=reply_to, 

100 thread=thread, 

101 ) 

102 elif cid: 

103 legacy_msg_id = await self.__send_bob( 

104 msg.get_from(), 

105 cid, 

106 session, 

107 entity, 

108 reply_to_msg_id=reply_to_msg_id, 

109 reply_to_fallback_text=reply_fallback, 

110 reply_to=reply_to, 

111 thread=thread, 

112 ) 

113 elif text: 

114 if isinstance(entity, LegacyMUC): 

115 mentions = {"mentions": await entity.parse_mentions(text)} 

116 else: 

117 mentions = {} 

118 legacy_msg_id = await session.on_text( 

119 entity, 

120 text, 

121 reply_to_msg_id=reply_to_msg_id, 

122 reply_to_fallback_text=reply_fallback, 

123 reply_to=reply_to, 

124 thread=thread, 

125 link_previews=link_previews, 

126 **mentions, 

127 ) 

128 else: 

129 log.debug("Ignoring %s", msg.get_id()) 

130 return 

131 

132 if isinstance(entity, LegacyMUC): 

133 await entity.echo(msg, legacy_msg_id) 

134 if legacy_msg_id is not None: 

135 self.xmpp.store.sent.set_group_message( 

136 session.user_pk, str(legacy_msg_id), msg.get_id() 

137 ) 

138 else: 

139 self.__ack(msg) 

140 if legacy_msg_id is not None: 

141 self.xmpp.store.sent.set_message( 

142 session.user_pk, str(legacy_msg_id), msg.get_id() 

143 ) 

144 if session.MESSAGE_IDS_ARE_THREAD_IDS and (t := msg["thread"]): 

145 self.xmpp.store.sent.set_thread( 

146 session.user_pk, t, str(legacy_msg_id) 

147 ) 

148 

149 @exceptions_to_xmpp_errors 

150 async def on_message_correction(self, msg: Message): 

151 if msg.get_plugin("retract", check=True) is not None: 

152 # ignore message retraction fallback (fallback=last msg correction) 

153 return 

154 session, entity, thread = await self._get_session_entity_thread(msg) 

155 xmpp_id = msg["replace"]["id"] 

156 if isinstance(entity, LegacyMUC): 

157 legacy_id_str = self.xmpp.store.sent.get_group_legacy_id( 

158 session.user_pk, xmpp_id 

159 ) 

160 if legacy_id_str is None: 

161 legacy_id = self._xmpp_msg_id_to_legacy(session, xmpp_id) 

162 else: 

163 legacy_id = self.xmpp.LEGACY_MSG_ID_TYPE(legacy_id_str) 

164 else: 

165 legacy_id = self._xmpp_msg_id_to_legacy(session, xmpp_id) 

166 

167 if isinstance(entity, LegacyMUC): 

168 mentions = await entity.parse_mentions(msg["body"]) 

169 else: 

170 mentions = None 

171 

172 if previews := msg["link_previews"]: 

173 link_previews = [dict_to_named_tuple(p, LinkPreview) for p in previews] 

174 else: 

175 link_previews = [] 

176 

177 if legacy_id is None: 

178 log.debug("Did not find legacy ID to correct") 

179 new_legacy_msg_id = await session.on_text( 

180 entity, 

181 "Correction:" + msg["body"], 

182 thread=thread, 

183 mentions=mentions, 

184 link_previews=link_previews, 

185 ) 

186 elif ( 

187 not msg["body"].strip() 

188 and config.CORRECTION_EMPTY_BODY_AS_RETRACTION 

189 and entity.RETRACTION 

190 ): 

191 await session.on_retract(entity, legacy_id, thread=thread) 

192 new_legacy_msg_id = None 

193 elif entity.CORRECTION: 

194 new_legacy_msg_id = await session.on_correct( 

195 entity, 

196 msg["body"], 

197 legacy_id, 

198 thread=thread, 

199 mentions=mentions, 

200 link_previews=link_previews, 

201 ) 

202 else: 

203 session.send_gateway_message( 

204 "Last message correction is not supported by this legacy service. " 

205 "Slidge will send your correction as new message." 

206 ) 

207 if ( 

208 config.LAST_MESSAGE_CORRECTION_RETRACTION_WORKAROUND 

209 and entity.RETRACTION 

210 and legacy_id is not None 

211 ): 

212 if legacy_id is not None: 

213 session.send_gateway_message( 

214 "Slidge will attempt to retract the original message you wanted" 

215 " to edit." 

216 ) 

217 await session.on_retract(entity, legacy_id, thread=thread) 

218 

219 new_legacy_msg_id = await session.on_text( 

220 entity, 

221 "Correction: " + msg["body"], 

222 thread=thread, 

223 mentions=mentions, 

224 link_previews=link_previews, 

225 ) 

226 

227 if isinstance(entity, LegacyMUC): 

228 if new_legacy_msg_id is not None: 

229 self.xmpp.store.sent.set_group_message( 

230 session.user_pk, new_legacy_msg_id, msg.get_id() 

231 ) 

232 await entity.echo(msg, new_legacy_msg_id) 

233 else: 

234 self.__ack(msg) 

235 if new_legacy_msg_id is not None: 

236 self.xmpp.store.sent.set_message( 

237 session.user_pk, new_legacy_msg_id, msg.get_id() 

238 ) 

239 

240 @exceptions_to_xmpp_errors 

241 async def on_message_retract(self, msg: Message): 

242 session, entity, thread = await self._get_session_entity_thread(msg) 

243 if not entity.RETRACTION: 

244 raise XMPPError( 

245 "bad-request", 

246 "This legacy service does not support message retraction.", 

247 ) 

248 xmpp_id: str = msg["retract"]["id"] 

249 legacy_id = self._xmpp_msg_id_to_legacy(session, xmpp_id) 

250 if legacy_id: 

251 await session.on_retract(entity, legacy_id, thread=thread) 

252 if isinstance(entity, LegacyMUC): 

253 await entity.echo(msg, None) 

254 else: 

255 log.debug("Ignored retraction from user") 

256 self.__ack(msg) 

257 

258 @exceptions_to_xmpp_errors 

259 async def on_reactions(self, msg: Message): 

260 session, entity, thread = await self._get_session_entity_thread(msg) 

261 react_to: str = msg["reactions"]["id"] 

262 

263 special_msg = session.SPECIAL_MSG_ID_PREFIX and react_to.startswith( 

264 session.SPECIAL_MSG_ID_PREFIX 

265 ) 

266 

267 if special_msg: 

268 legacy_id = react_to 

269 else: 

270 legacy_id = self._xmpp_msg_id_to_legacy(session, react_to) 

271 

272 if not legacy_id: 

273 log.debug("Ignored reaction from user") 

274 raise XMPPError( 

275 "internal-server-error", 

276 "Could not convert the XMPP msg ID to a legacy ID", 

277 ) 

278 

279 emojis = [ 

280 remove_emoji_variation_selector_16(r["value"]) for r in msg["reactions"] 

281 ] 

282 error_msg = None 

283 entity = entity 

284 

285 if not special_msg: 

286 if entity.REACTIONS_SINGLE_EMOJI and len(emojis) > 1: 

287 error_msg = "Maximum 1 emoji/message" 

288 

289 if not error_msg and (subset := await entity.available_emojis(legacy_id)): 

290 if not set(emojis).issubset(subset): 

291 error_msg = f"You can only react with the following emojis: {''.join(subset)}" 

292 

293 if error_msg: 

294 session.send_gateway_message(error_msg) 

295 if not isinstance(entity, LegacyMUC): 

296 # no need to carbon for groups, we just don't echo the stanza 

297 entity.react(legacy_id, carbon=True) # type: ignore 

298 await session.on_react(entity, legacy_id, [], thread=thread) 

299 raise XMPPError("not-acceptable", text=error_msg) 

300 

301 await session.on_react(entity, legacy_id, emojis, thread=thread) 

302 if isinstance(entity, LegacyMUC): 

303 await entity.echo(msg, None) 

304 else: 

305 self.__ack(msg) 

306 

307 multi = self.xmpp.store.multi.get_xmpp_ids(session.user_pk, react_to) 

308 if not multi: 

309 return 

310 multi = [m for m in multi if react_to != m] 

311 

312 if isinstance(entity, LegacyMUC): 

313 for xmpp_id in multi: 

314 mc = copy(msg) 

315 mc["reactions"]["id"] = xmpp_id 

316 await entity.echo(mc) 

317 elif isinstance(entity, LegacyContact): 

318 for xmpp_id in multi: 

319 entity.react(legacy_id, emojis, xmpp_id=xmpp_id, carbon=True) 

320 

321 def __ack(self, msg: Message): 

322 if not self.xmpp.PROPER_RECEIPTS: 

323 self.xmpp.delivery_receipt.ack(msg) 

324 

325 async def __get_reply( 

326 self, msg: Message, session: BaseSession, entity: Recipient 

327 ) -> tuple[ 

328 str, str | int | None, LegacyContact | LegacyParticipant | None, str | None 

329 ]: 

330 try: 

331 reply_to_msg_id = self._xmpp_msg_id_to_legacy(session, msg["reply"]["id"]) 

332 except XMPPError: 

333 session.log.debug( 

334 "Could not determine reply-to legacy msg ID, sending quote instead." 

335 ) 

336 return msg["body"], None, None, None 

337 

338 reply_to_jid = JID(msg["reply"]["to"]) 

339 reply_to = None 

340 if msg["type"] == "chat": 

341 if reply_to_jid.bare != session.user_jid.bare: 

342 try: 

343 reply_to = await session.contacts.by_jid(reply_to_jid) 

344 except XMPPError: 

345 pass 

346 elif msg["type"] == "groupchat": 

347 nick = reply_to_jid.resource 

348 try: 

349 muc = await session.bookmarks.by_jid(reply_to_jid) 

350 except XMPPError: 

351 pass 

352 else: 

353 if nick != muc.user_nick: 

354 reply_to = await muc.get_participant( 

355 reply_to_jid.resource, store=False 

356 ) 

357 

358 if msg.get_plugin("fallback", check=True) and ( 

359 isinstance(entity, LegacyMUC) or entity.REPLIES 

360 ): 

361 text = msg["fallback"].get_stripped_body(self.xmpp["xep_0461"].namespace) 

362 try: 

363 reply_fallback = msg["reply"].get_fallback_body() 

364 except AttributeError: 

365 reply_fallback = None 

366 else: 

367 text = msg["body"] 

368 reply_fallback = None 

369 

370 return text, reply_to_msg_id, reply_to, reply_fallback 

371 

372 async def __send_url( 

373 self, url: str, session: BaseSession, entity: Recipient, **kwargs 

374 ) -> int | str | None: 

375 async with self.xmpp.http.get(url) as response: 

376 if response.status >= 400: 

377 session.log.warning( 

378 "OOB url cannot be downloaded: %s, sending the URL as text" 

379 " instead.", 

380 response, 

381 ) 

382 return await session.on_text(entity, url, **kwargs) 

383 

384 return await session.on_file(entity, url, http_response=response, **kwargs) 

385 

386 async def __send_bob( 

387 self, from_: JID, cid: str, session: BaseSession, entity: Recipient, **kwargs 

388 ) -> int | str | None: 

389 sticker = self.xmpp.store.bob.get_sticker(cid) 

390 if sticker is None: 

391 await self.xmpp.plugin["xep_0231"].get_bob(from_, cid) 

392 sticker = self.xmpp.store.bob.get_sticker(cid) 

393 assert sticker is not None 

394 return await session.on_sticker(entity, sticker, **kwargs) 

395 

396 

397log = logging.getLogger(__name__)