Coverage for slidge/core/mixins/message.py: 81%

96 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-07 05:11 +0000

1import logging 

2import uuid 

3import warnings 

4from typing import TYPE_CHECKING, Optional 

5 

6from slixmpp import Iq, Message 

7 

8from ...slixfix.xep_0490.mds import PUBLISH_OPTIONS 

9from ...util.types import ChatState, LegacyMessageType, Marker 

10from .attachment import AttachmentMixin 

11from .message_maker import MessageMaker 

12from .message_text import TextMessageMixin 

13 

14if TYPE_CHECKING: 

15 from ...group import LegacyMUC 

16 

17 

class ChatStateMixin(MessageMaker):
    """
    Chat state notifications (:xep:`0085`) sent from this :term:`XMPP Entity`.

    The last state that was sent is remembered, so that sending the same
    state twice in a row is skipped unless the emission is forced.
    """

    def __init__(self):
        super().__init__()
        # Most recently sent chat state; None until one has been sent.
        self.__last_chat_state: Optional[ChatState] = None

    def _chat_state(self, state: ChatState, forced=False, **kwargs):
        """
        Build a chat state message and hand it to ``_send()``.

        Nothing is sent when ``kwargs`` holds a truthy ``carbon`` entry, or
        when ``state`` equals the last sent state and ``forced`` is false.

        :param state: the chat state to send
        :param forced: send even if ``state`` repeats the last sent state
        :param kwargs: forwarded to ``_send()``
        """
        if kwargs.get("carbon", False):
            return
        is_repeat = state == self.__last_chat_state
        if is_repeat and not forced:
            return
        self.__last_chat_state = state
        # The message is built with the "no-store" hint.
        state_msg = self._make_message(state=state, hints={"no-store"})
        self._send(state_msg, **kwargs)

    def active(self, **kwargs):
        """
        Emit the "active" chat state (:xep:`0085`) on behalf of this
        :term:`XMPP Entity`.
        """
        self._chat_state("active", **kwargs)

    def composing(self, **kwargs):
        """
        Emit the "composing" chat state (:xep:`0085`), ie a "typing
        notification", on behalf of this :term:`XMPP Entity`.

        This state is always forced, so it is sent even when it repeats
        the last sent state.
        """
        self._chat_state("composing", forced=True, **kwargs)

    def paused(self, **kwargs):
        """
        Emit the "paused" chat state (:xep:`0085`), ie a "typing paused
        notification", on behalf of this :term:`XMPP Entity`.
        """
        self._chat_state("paused", **kwargs)

    def inactive(self, **kwargs):
        """
        Emit the "inactive" chat state (:xep:`0085`) on behalf of this
        :term:`XMPP Entity`, ie "contact has not interacted with the chat
        session interface for an intermediate period of time".
        """
        self._chat_state("inactive", **kwargs)

    def gone(self, **kwargs):
        """
        Emit the "gone" chat state (:xep:`0085`) on behalf of this
        :term:`XMPP Entity`, ie "contact has not interacted with the chat
        session interface, system, or device for a relatively long period
        of time".
        """
        self._chat_state("gone", **kwargs)

67 

68 

class MarkerMixin(MessageMaker):
    """
    Message markers (:xep:`0333`) sent from this :term:`XMPP Entity`.
    """

    # Declared here as NotImplemented; no method of this mixin reads it.
    is_group: bool = NotImplemented

    def _make_marker(
        self, legacy_msg_id: LegacyMessageType, marker: Marker, carbon=False
    ):
        """
        Build a marker message referring to a legacy message.

        :param legacy_msg_id: The message this marker refers to
        :param marker: name of the marker element to fill in
        :param carbon: forwarded to ``_make_message()``
        :return: the message, with the marker's "id" set to the XMPP ID
            obtained from ``_legacy_to_xmpp()``
        """
        marker_msg = self._make_message(carbon=carbon)
        referenced_id = self._legacy_to_xmpp(legacy_msg_id)
        marker_msg[marker]["id"] = referenced_id
        return marker_msg

    def ack(self, legacy_msg_id: LegacyMessageType, **kwargs):
        """
        Emit an "acknowledged" message marker (:xep:`0333`) on behalf of
        this :term:`XMPP Entity`.

        :param legacy_msg_id: The message this marker refers to
        """
        is_carbon = kwargs.get("carbon")
        marker_msg = self._make_marker(
            legacy_msg_id, "acknowledged", carbon=is_carbon
        )
        self._send(marker_msg, **kwargs)

    def received(self, legacy_msg_id: LegacyMessageType, **kwargs):
        """
        Emit a "received" message marker (:xep:`0333`) on behalf of this
        :term:`XMPP Entity`.

        When this entity's ``mtype`` is ``"chat"``, a delivery receipt
        (:xep:`0184`) is sent before the marker.

        :param legacy_msg_id: The message this marker refers to
        """
        is_carbon = kwargs.get("carbon")
        if self.mtype == "chat":
            receipt = self.xmpp.delivery_receipt.make_ack(
                self._legacy_to_xmpp(legacy_msg_id),
                mfrom=self.jid,
                mto=self.user_jid,
            )
            # The receipt is sent without the caller's kwargs.
            self._send(receipt)
        marker_msg = self._make_marker(legacy_msg_id, "received", carbon=is_carbon)
        self._send(marker_msg, **kwargs)

    def displayed(self, legacy_msg_id: LegacyMessageType, **kwargs):
        """
        Emit a "displayed" message marker (:xep:`0333`) on behalf of this
        :term:`XMPP Entity`.

        If this entity has a truthy ``is_user`` attribute, a task that
        publishes the matching MDS marker is also scheduled on the session.

        :param legacy_msg_id: The message this marker refers to
        """
        is_carbon = kwargs.get("carbon")
        marker_msg = self._make_marker(legacy_msg_id, "displayed", carbon=is_carbon)
        self._send(marker_msg, **kwargs)
        if getattr(self, "is_user", False):
            self.session.create_task(self.__send_mds(legacy_msg_id))

    async def __send_mds(self, legacy_msg_id: LegacyMessageType):
        """
        Publish a MDS displayed marker on behalf of the user for a group chat.

        Does nothing when this entity has no truthy ``muc`` attribute.
        Failures of the privileged IQ are logged at debug level only.

        :param legacy_msg_id: The message that was displayed
        """
        group = getattr(self, "muc", None)
        if not group:
            # Not implemented for 1:1 chats: that would require storing the
            # stanza-id injected by the XMPP server, which is not tracked at
            # the moment. MDS should mostly matter for public group chats, so
            # this is not expected to be a problem; to be revisited if needed.
            return
        room_jid = group.jid.bare
        displayed_id = self._legacy_to_xmpp(legacy_msg_id)

        request = Iq(
            sto=self.user_jid.bare,
            sfrom=self.user_jid.bare,
            stype="set",
        )
        mds_stanza = self.xmpp["xep_0490"].stanza
        publish = request["pubsub"]["publish"]
        publish["node"] = mds_stanza.NS
        publish["item"]["id"] = room_jid

        payload = mds_stanza.Displayed()
        payload["stanza_id"]["id"] = displayed_id
        payload["stanza_id"]["by"] = room_jid
        publish["item"]["payload"] = payload
        request["pubsub"]["publish_options"] = PUBLISH_OPTIONS

        try:
            await self.xmpp["xep_0356"].send_privileged_iq(request)
        except Exception as e:
            # Best effort: a failed MDS publication must not propagate.
            self.session.log.debug("Could not MDS mark", exc_info=e)

151 

152 

class ContentMessageMixin(AttachmentMixin, TextMessageMixin):
    """
    Combination of :class:`AttachmentMixin` and :class:`TextMessageMixin`.

    Adds no members of its own.
    """

155 

156 

class CarbonMessageMixin(ContentMessageMixin, MarkerMixin):
    """
    Content and marker mixins, plus a helper to send messages on behalf of
    the user through privileged entity plugins.
    """

    def _privileged_send(self, msg: Message):
        """
        Send ``msg`` through the ``xep_0356`` plugin, falling back to
        ``xep_0356_old``.

        If ``msg`` already has an ID, that ID is added to the session's
        ``ignore_messages``; otherwise a fresh ``slidge-carbon-<uuid4>`` ID
        is assigned to it. The origin ID is removed in both cases.

        If both plugins raise :class:`PermissionError`, a warning is issued
        and the message is not sent. Other exceptions propagate.

        :param msg: the message to send with privileges
        """
        i = msg.get_id()
        if i:
            self.session.ignore_messages.add(i)
        else:
            # Generated IDs are not added to ignore_messages here.
            i = "slidge-carbon-" + str(uuid.uuid4())
            msg.set_id(i)
        msg.del_origin_id()
        try:
            self.xmpp["xep_0356"].send_privileged_message(msg)
        except PermissionError:
            try:
                self.xmpp["xep_0356_old"].send_privileged_message(msg)
            except PermissionError:
                # Fixed: the message used to read "user.Refer to" because
                # the space after the period was missing.
                warnings.warn(
                    "Slidge does not have privileges to send message on behalf of"
                    " user. Refer to"
                    " https://slidge.im/core/admin/privilege.html"
                    " for more info."
                )

178 

179 

class InviteMixin(MessageMaker):
    """
    Group chat invitations (:xep:`0249`) sent from this :term:`XMPP Entity`.
    """

    def invite_to(
        self,
        muc: "LegacyMUC",
        reason: Optional[str] = None,
        password: Optional[str] = None,
        **send_kwargs,
    ):
        """
        Invite the user to join a group (:xep:`0249`) on behalf of this
        :term:`XMPP Entity`.

        The invitation is a message of type "normal". ``reason`` and
        ``password`` are only included when they are truthy.

        :param muc: the muc the user is invited to
        :param reason: a text explaining why the user should join this muc
        :param password: password to include in the invitation, if any
        :param send_kwargs: additional kwargs to be passed to _send()
            (internal use by slidge)
        """
        invitation = self._make_message(mtype="normal")
        invitation["groupchat_invite"]["jid"] = muc.jid
        # Optional fields, in the order they are set on the stanza.
        optional_fields = (("reason", reason), ("password", password))
        for field, value in optional_fields:
            if value:
                invitation["groupchat_invite"][field] = value
        self._send(invitation, **send_kwargs)

204 

205 

class MessageMixin(InviteMixin, ChatStateMixin, MarkerMixin, ContentMessageMixin):
    """
    Combination of :class:`InviteMixin`, :class:`ChatStateMixin`,
    :class:`MarkerMixin` and :class:`ContentMessageMixin`.

    Adds no members of its own.
    """

208 

209 

class MessageCarbonMixin(InviteMixin, ChatStateMixin, CarbonMessageMixin):
    """
    Combination of :class:`InviteMixin`, :class:`ChatStateMixin` and
    :class:`CarbonMessageMixin`.

    Adds no members of its own.
    """

212 

213 

# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)