Coverage for slidge/core/mixins/message_text.py: 91%

46 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-07 05:11 +0000

1import logging 

2from datetime import datetime 

3from typing import Iterable, Optional 

4 

5from ...util.types import ( 

6 LegacyMessageType, 

7 LegacyThreadType, 

8 LinkPreview, 

9 MessageReference, 

10 ProcessingHint, 

11) 

12from .message_maker import MessageMaker 

13 

14 

15class TextMessageMixin(MessageMaker): 

16 def __default_hints(self, hints: Optional[Iterable[ProcessingHint]] = None): 

17 if hints is not None: 

18 return hints 

19 elif self.mtype == "chat": 

20 return {"markable", "store"} 

21 elif self.mtype == "groupchat": 

22 return {"markable"} 

23 

24 def _replace_id(self, legacy_msg_id: LegacyMessageType): 

25 if self.mtype == "groupchat": 

26 return self.xmpp.store.sent.get_group_xmpp_id( 

27 self.session.user_pk, str(legacy_msg_id) 

28 ) or self._legacy_to_xmpp(legacy_msg_id) 

29 else: 

30 return self._legacy_to_xmpp(legacy_msg_id) 

31 

32 def send_text( 

33 self, 

34 body: str, 

35 legacy_msg_id: Optional[LegacyMessageType] = None, 

36 *, 

37 when: Optional[datetime] = None, 

38 reply_to: Optional[MessageReference] = None, 

39 thread: Optional[LegacyThreadType] = None, 

40 hints: Optional[Iterable[ProcessingHint]] = None, 

41 carbon=False, 

42 archive_only=False, 

43 correction=False, 

44 correction_event_id: Optional[LegacyMessageType] = None, 

45 link_previews: Optional[list[LinkPreview]] = None, 

46 **send_kwargs, 

47 ): 

48 """ 

49 Send a text message from this :term:`XMPP Entity`. 

50 

51 :param body: Content of the message 

52 :param legacy_msg_id: If you want to be able to transport read markers from the gateway 

53 user to the legacy network, specify this 

54 :param when: when the message was sent, for a "delay" tag (:xep:`0203`) 

55 :param reply_to: Quote another message (:xep:`0461`) 

56 :param hints: 

57 :param thread: 

58 :param carbon: (only used if called on a :class:`LegacyContact`) 

59 Set this to ``True`` if this is actually a message sent **to** the 

60 :class:`LegacyContact` by the :term:`User`. 

61 Use this to synchronize outgoing history for legacy official apps. 

62 :param correction: whether this message is a correction or not 

63 :param correction_event_id: in the case where an ID is associated with the legacy 

64 'correction event', specify it here to use it on the XMPP side. If not specified, 

65 a random ID will be used. 

66 :param link_previews: A little of sender (or server, or gateway)-generated 

67 previews of URLs linked in the body. 

68 :param archive_only: (only in groups) Do not send this message to user, 

69 but store it in the archive. Meant to be used during ``MUC.backfill()`` 

70 """ 

71 if carbon and not hasattr(self, "muc"): 

72 if not correction and self.xmpp.store.sent.was_sent_by_user( 

73 self.session.user_pk, str(legacy_msg_id) 

74 ): 

75 log.warning( 

76 "Carbon message for a message an XMPP has sent? This is a bug! %s", 

77 legacy_msg_id, 

78 ) 

79 return 

80 if hasattr(self, "muc") and not self.is_user: # type:ignore 

81 log.warning( 

82 "send_text() called with carbon=True on a participant who is not the user", 

83 legacy_msg_id, 

84 ) 

85 self.xmpp.store.sent.set_message( 

86 self.session.user_pk, 

87 str(legacy_msg_id), 

88 self.session.legacy_to_xmpp_msg_id(legacy_msg_id), 

89 ) 

90 hints = self.__default_hints(hints) 

91 msg = self._make_message( 

92 mbody=body, 

93 legacy_msg_id=correction_event_id if correction else legacy_msg_id, 

94 when=when, 

95 reply_to=reply_to, 

96 hints=hints or (), 

97 carbon=carbon, 

98 thread=thread, 

99 link_previews=link_previews, 

100 ) 

101 if correction: 

102 msg["replace"]["id"] = self._replace_id(legacy_msg_id) 

103 return self._send( 

104 msg, 

105 archive_only=archive_only, 

106 carbon=carbon, 

107 legacy_msg_id=legacy_msg_id, 

108 **send_kwargs, 

109 ) 

110 

111 def correct( 

112 self, 

113 legacy_msg_id: LegacyMessageType, 

114 new_text: str, 

115 *, 

116 when: Optional[datetime] = None, 

117 reply_to: Optional[MessageReference] = None, 

118 thread: Optional[LegacyThreadType] = None, 

119 hints: Optional[Iterable[ProcessingHint]] = None, 

120 carbon=False, 

121 archive_only=False, 

122 correction_event_id: Optional[LegacyMessageType] = None, 

123 link_previews: Optional[list[LinkPreview]] = None, 

124 **send_kwargs, 

125 ): 

126 """ 

127 Modify a message that was previously sent by this :term:`XMPP Entity`. 

128 

129 Uses last message correction (:xep:`0308`) 

130 

131 :param new_text: New content of the message 

132 :param legacy_msg_id: The legacy message ID of the message to correct 

133 :param when: when the message was sent, for a "delay" tag (:xep:`0203`) 

134 :param reply_to: Quote another message (:xep:`0461`) 

135 :param hints: 

136 :param thread: 

137 :param carbon: (only in 1:1) Reflect a message sent to this ``Contact`` by the user. 

138 Use this to synchronize outgoing history for legacy official apps. 

139 :param archive_only: (only in groups) Do not send this message to user, 

140 but store it in the archive. Meant to be used during ``MUC.backfill()`` 

141 :param correction_event_id: in the case where an ID is associated with the legacy 

142 'correction event', specify it here to use it on the XMPP side. If not specified, 

143 a random ID will be used. 

144 :param link_previews: A little of sender (or server, or gateway)-generated 

145 previews of URLs linked in the body. 

146 """ 

147 self.send_text( 

148 new_text, 

149 legacy_msg_id, 

150 when=when, 

151 reply_to=reply_to, 

152 hints=hints, 

153 carbon=carbon, 

154 thread=thread, 

155 correction=True, 

156 archive_only=archive_only, 

157 correction_event_id=correction_event_id, 

158 link_previews=link_previews, 

159 **send_kwargs, 

160 ) 

161 

162 def react( 

163 self, 

164 legacy_msg_id: LegacyMessageType, 

165 emojis: Iterable[str] = (), 

166 thread: Optional[LegacyThreadType] = None, 

167 **kwargs, 

168 ): 

169 """ 

170 Send a reaction (:xep:`0444`) from this :term:`XMPP Entity`. 

171 

172 :param legacy_msg_id: The message which the reaction refers to. 

173 :param emojis: An iterable of emojis used as reactions 

174 :param thread: 

175 """ 

176 msg = self._make_message( 

177 hints={"store"}, carbon=kwargs.get("carbon"), thread=thread 

178 ) 

179 xmpp_id = kwargs.pop("xmpp_id", None) 

180 if not xmpp_id: 

181 xmpp_id = self._legacy_to_xmpp(legacy_msg_id) 

182 self.xmpp["xep_0444"].set_reactions(msg, to_id=xmpp_id, reactions=emojis) 

183 self._send(msg, **kwargs) 

184 

185 def retract( 

186 self, 

187 legacy_msg_id: LegacyMessageType, 

188 thread: Optional[LegacyThreadType] = None, 

189 **kwargs, 

190 ): 

191 """ 

192 Send a message retraction (:XEP:`0424`) from this :term:`XMPP Entity`. 

193 

194 :param legacy_msg_id: Legacy ID of the message to delete 

195 :param thread: 

196 """ 

197 msg = self._make_message( 

198 state=None, 

199 hints={"store"}, 

200 mbody=f"/me retracted the message {legacy_msg_id}", 

201 carbon=kwargs.get("carbon"), 

202 thread=thread, 

203 ) 

204 msg.enable("fallback") 

205 # namespace version mismatch between slidge and slixmpp, update me later 

206 msg["fallback"]["for"] = self.xmpp["xep_0424"].namespace[:-1] + "1" 

207 msg["retract"]["id"] = msg["replace"]["id"] = self._replace_id(legacy_msg_id) 

208 self._send(msg, **kwargs) 

209 

210 

211log = logging.getLogger(__name__)