Coverage for slidge/core/mixins/message_text.py: 90%

52 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-04 08:17 +0000

1import logging 

2from datetime import datetime 

3from typing import Iterable, Optional 

4 

5from ...util.types import ( 

6 LegacyMessageType, 

7 LegacyThreadType, 

8 LinkPreview, 

9 MessageReference, 

10 ProcessingHint, 

11) 

12from .message_maker import MessageMaker 

13 

14 

class TextMessageMixin(MessageMaker):
    """
    Mixin adding text-oriented messaging helpers to an XMPP entity.

    Provides plain text messages, corrections (:xep:`0308`), reactions
    (:xep:`0444`) and retractions (:xep:`0424`), all built on top of the
    ``_make_message()`` / ``_send()`` primitives used throughout this class.
    """

    def __default_hints(
        self, hints: Optional[Iterable[ProcessingHint]] = None
    ) -> Optional[Iterable[ProcessingHint]]:
        """
        Return the processing hints to attach to an outgoing message.

        :param hints: Explicit hints chosen by the caller. If not ``None``,
            they are returned unchanged.
        :return: ``hints`` when given; otherwise ``{"markable", "store"}`` for
            ``"chat"`` messages, ``{"markable"}`` for ``"groupchat"`` messages
            and ``None`` for any other message type.
        """
        if hints is not None:
            return hints
        elif self.mtype == "chat":
            return {"markable", "store"}
        elif self.mtype == "groupchat":
            return {"markable"}
        # Any other message type: no default hints (callers fall back to ``()``).
        return None

    def _replace_id(self, legacy_msg_id: LegacyMessageType):
        """
        Resolve the XMPP message ID that a correction/retraction should target.

        :param legacy_msg_id: Legacy ID of the message being replaced.
        :return: For ``"groupchat"`` messages, the first XMPP ID found in the
            ID map for this recipient, falling back to
            ``session.legacy_to_xmpp_msg_id()`` when the map has no entry.
            For other message types, the result of ``_legacy_to_xmpp()``.
        """
        if self.mtype == "groupchat":
            with self.xmpp.store.session() as orm:
                ids = self.xmpp.store.id_map.get_xmpp(
                    orm, self._recipient_pk(), str(legacy_msg_id), True
                )
            if ids:
                return ids[0]
            # Nothing stored for this legacy ID: derive the XMPP ID instead.
            return self.session.legacy_to_xmpp_msg_id(legacy_msg_id)
        else:
            return self._legacy_to_xmpp(legacy_msg_id)

    def send_text(
        self,
        body: str,
        legacy_msg_id: Optional[LegacyMessageType] = None,
        *,
        when: Optional[datetime] = None,
        reply_to: Optional[MessageReference] = None,
        thread: Optional[LegacyThreadType] = None,
        hints: Optional[Iterable[ProcessingHint]] = None,
        carbon: bool = False,
        archive_only: bool = False,
        correction: bool = False,
        correction_event_id: Optional[LegacyMessageType] = None,
        link_previews: Optional[list[LinkPreview]] = None,
        **send_kwargs,
    ):
        """
        Send a text message from this :term:`XMPP Entity`.

        :param body: Content of the message
        :param legacy_msg_id: If you want to be able to transport read markers from the gateway
            user to the legacy network, specify this
        :param when: when the message was sent, for a "delay" tag (:xep:`0203`)
        :param reply_to: Quote another message (:xep:`0461`)
        :param hints: Processing hints to attach to the message. If ``None``,
            defaults are chosen from the message type (``markable`` and
            ``store`` for chat, ``markable`` for groupchat).
        :param thread: Legacy thread identifier, forwarded to the message builder.
        :param carbon: (only used if called on a :class:`LegacyContact`)
            Set this to ``True`` if this is actually a message sent **to** the
            :class:`LegacyContact` by the :term:`User`.
            Use this to synchronize outgoing history for legacy official apps.
        :param correction: whether this message is a correction or not
        :param correction_event_id: in the case where an ID is associated with the legacy
            'correction event', specify it here to use it on the XMPP side. If not specified,
            a random ID will be used.
        :param link_previews: A list of sender (or server, or gateway)-generated
            previews of URLs linked in the body.
        :param archive_only: (only in groups) Do not send this message to user,
            but store it in the archive. Meant to be used during ``MUC.backfill()``
        :return: Whatever ``_send()`` returns, or ``None`` when a carbon is
            dropped because the message was originally sent from XMPP.
        """
        if carbon and not hasattr(self, "muc"):
            with self.xmpp.store.session() as orm:
                # A carbon for something the user sent from XMPP would be an
                # echo of their own message: drop it (corrections excepted).
                if not correction and self.xmpp.store.id_map.was_sent_by_user(
                    orm, self._recipient_pk(), str(legacy_msg_id), self.is_group
                ):
                    log.warning(
                        "Carbon message for a message an XMPP has sent? This is a bug! %s",
                        legacy_msg_id,
                    )
                    return
                # NOTE(review): this branch cannot be reached, the enclosing
                # ``if`` already requires that ``self`` has no ``muc``
                # attribute. It is kept in place; confirm whether the check
                # was meant to live outside the 1:1-only block.
                if hasattr(self, "muc") and not self.is_user:  # type:ignore
                    log.warning(
                        "send_text() called with carbon=True on a participant who is not the user: %s",
                        legacy_msg_id,
                    )
                # Record the legacy ID -> XMPP ID association for this carbon.
                self.xmpp.store.id_map.set_msg(
                    orm,
                    self._recipient_pk(),
                    str(legacy_msg_id),
                    [self.session.legacy_to_xmpp_msg_id(legacy_msg_id)],
                    self.is_group,
                )
                orm.commit()
        hints = self.__default_hints(hints)
        msg = self._make_message(
            mbody=body,
            # A correction is a new stanza: it carries the correction event's
            # ID, while the corrected message is referenced via "replace".
            legacy_msg_id=correction_event_id if correction else legacy_msg_id,
            when=when,
            reply_to=reply_to,
            hints=hints or (),
            carbon=carbon,
            thread=thread,
            link_previews=link_previews,
        )
        if correction:
            msg["replace"]["id"] = self._replace_id(legacy_msg_id)
        return self._send(
            msg,
            archive_only=archive_only,
            carbon=carbon,
            legacy_msg_id=legacy_msg_id,
            **send_kwargs,
        )

    def correct(
        self,
        legacy_msg_id: LegacyMessageType,
        new_text: str,
        *,
        when: Optional[datetime] = None,
        reply_to: Optional[MessageReference] = None,
        thread: Optional[LegacyThreadType] = None,
        hints: Optional[Iterable[ProcessingHint]] = None,
        carbon: bool = False,
        archive_only: bool = False,
        correction_event_id: Optional[LegacyMessageType] = None,
        link_previews: Optional[list[LinkPreview]] = None,
        **send_kwargs,
    ) -> None:
        """
        Modify a message that was previously sent by this :term:`XMPP Entity`.

        Uses last message correction (:xep:`0308`). This is a thin wrapper
        around :meth:`send_text` called with ``correction=True``.

        :param new_text: New content of the message
        :param legacy_msg_id: The legacy message ID of the message to correct
        :param when: when the message was sent, for a "delay" tag (:xep:`0203`)
        :param reply_to: Quote another message (:xep:`0461`)
        :param hints: Processing hints, see :meth:`send_text`.
        :param thread: Legacy thread identifier, see :meth:`send_text`.
        :param carbon: (only in 1:1) Reflect a message sent to this ``Contact`` by the user.
            Use this to synchronize outgoing history for legacy official apps.
        :param archive_only: (only in groups) Do not send this message to user,
            but store it in the archive. Meant to be used during ``MUC.backfill()``
        :param correction_event_id: in the case where an ID is associated with the legacy
            'correction event', specify it here to use it on the XMPP side. If not specified,
            a random ID will be used.
        :param link_previews: A list of sender (or server, or gateway)-generated
            previews of URLs linked in the body.
        """
        self.send_text(
            new_text,
            legacy_msg_id,
            when=when,
            reply_to=reply_to,
            hints=hints,
            carbon=carbon,
            thread=thread,
            correction=True,
            archive_only=archive_only,
            correction_event_id=correction_event_id,
            link_previews=link_previews,
            **send_kwargs,
        )

    def react(
        self,
        legacy_msg_id: LegacyMessageType,
        emojis: Iterable[str] = (),
        thread: Optional[LegacyThreadType] = None,
        **kwargs,
    ) -> None:
        """
        Send a reaction (:xep:`0444`) from this :term:`XMPP Entity`.

        :param legacy_msg_id: The message which the reaction refers to.
        :param emojis: An iterable of emojis used as reactions
        :param thread: Legacy thread identifier, forwarded to the message builder.
        :param kwargs: Forwarded to ``_send()``. ``carbon`` is additionally
            read to build the message, and ``xmpp_id``, if present, is removed
            and used as the target message ID instead of converting
            ``legacy_msg_id``.
        """
        msg = self._make_message(
            hints={"store"}, carbon=bool(kwargs.get("carbon")), thread=thread
        )
        # ``xmpp_id`` is consumed here so that it is not passed on to _send().
        xmpp_id = kwargs.pop("xmpp_id", None)
        if not xmpp_id:
            xmpp_id = self._legacy_to_xmpp(legacy_msg_id)
        self.xmpp["xep_0444"].set_reactions(msg, to_id=xmpp_id, reactions=emojis)
        self._send(msg, **kwargs)

    def retract(
        self,
        legacy_msg_id: LegacyMessageType,
        thread: Optional[LegacyThreadType] = None,
        **kwargs,
    ) -> None:
        """
        Send a message retraction (:XEP:`0424`) from this :term:`XMPP Entity`.

        The stanza also carries a fallback body and a "replace" ID pointing at
        the same message as the "retract" ID.

        :param legacy_msg_id: Legacy ID of the message to delete
        :param thread: Legacy thread identifier, forwarded to the message builder.
        :param kwargs: Forwarded to ``_send()``; ``carbon`` is additionally
            read to build the message.
        """
        msg = self._make_message(
            state=None,
            hints={"store"},
            mbody=f"/me retracted the message {legacy_msg_id}",
            carbon=bool(kwargs.get("carbon")),
            thread=thread,
        )
        msg.enable("fallback")
        # namespace version mismatch between slidge and slixmpp, update me later
        msg["fallback"]["for"] = self.xmpp["xep_0424"].namespace[:-1] + "1"
        msg["retract"]["id"] = msg["replace"]["id"] = self._replace_id(legacy_msg_id)
        self._send(msg, **kwargs)

217 

218 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)