Coverage for slidge / core / mixins / message.py: 87%
107 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-06 15:18 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-06 15:18 +0000
1import logging
2import uuid
3import warnings
4from typing import TYPE_CHECKING, Optional
6from slixmpp import Iq, Message
7from slixmpp.plugins.xep_0004 import Form
9from ...util.types import ChatState, LegacyMessageType, Marker
10from .attachment import AttachmentMixin
11from .message_maker import MessageMaker
12from .message_text import TextMessageMixin
14if TYPE_CHECKING:
15 from ...group import LegacyMUC
# Publish options attached to the pubsub publish request used for MDS
# (see MarkerMixin.__send_mds).
PUBLISH_OPTIONS = Form()
PUBLISH_OPTIONS["type"] = "submit"
PUBLISH_OPTIONS.add_field(
    "FORM_TYPE", "hidden", value="http://jabber.org/protocol/pubsub#publish-options"
)
# The remaining fields are plain (var, value) pairs, added in this order.
for _var, _value in (
    ("pubsub#persist_items", "true"),
    ("pubsub#max_items", "max"),
    ("pubsub#send_last_published_item", "never"),
    ("pubsub#access_model", "whitelist"),
):
    PUBLISH_OPTIONS.add_field(_var, value=_value)
# Do not leave the loop helpers behind in the module namespace.
del _var, _value
class ChatStateMixin(MessageMaker):
    """
    Mixin adding helpers that send chat state notifications (:xep:`0085`)
    from this :term:`XMPP Entity`.
    """

    def __init__(self) -> None:
        super().__init__()
        # Last state sent through _chat_state(); used to drop repeats.
        self.__last_chat_state: Optional[ChatState] = None

    def _chat_state(self, state: ChatState, forced: bool = False, **kwargs) -> None:
        """
        Build and send a chat state message, unless it should be skipped.

        Nothing is sent when a truthy ``carbon`` kwarg is given, nor when
        ``state`` equals the last state sent and ``forced`` is falsy.

        :param state: The chat state to send
        :param forced: Send even if ``state`` repeats the last one sent
        :param kwargs: Passed to ``_send()``
        """
        if kwargs.get("carbon", False):
            return
        if not forced and state == self.__last_chat_state:
            return
        self.__last_chat_state = state
        # The message is built with a "no-store" hint.
        self._send(self._make_message(state=state, hints={"no-store"}), **kwargs)

    def active(self, **kwargs) -> None:
        """
        Emit the "active" chat state (:xep:`0085`) on behalf of this
        :term:`XMPP Entity`.
        """
        self._chat_state("active", **kwargs)

    def composing(self, **kwargs) -> None:
        """
        Emit the "composing" chat state (:xep:`0085`), ie, a typing
        notification, on behalf of this :term:`XMPP Entity`.

        Unlike the other chat states, this one is always forced, so it is
        sent even when it repeats the previous state.
        """
        self._chat_state("composing", forced=True, **kwargs)

    def paused(self, **kwargs) -> None:
        """
        Emit the "paused" chat state (:xep:`0085`), ie, a "typing paused"
        notification, on behalf of this :term:`XMPP Entity`.
        """
        self._chat_state("paused", **kwargs)

    def inactive(self, **kwargs) -> None:
        """
        Emit the "inactive" chat state (:xep:`0085`) on behalf of this
        :term:`XMPP Entity`, ie, "contact has not interacted with the chat
        session interface for an intermediate period of time".
        """
        self._chat_state("inactive", **kwargs)

    def gone(self, **kwargs) -> None:
        """
        Emit the "gone" chat state (:xep:`0085`) on behalf of this
        :term:`XMPP Entity`, ie, "contact has not interacted with the chat
        session interface, system, or device for a relatively long period of
        time".
        """
        self._chat_state("gone", **kwargs)
class MarkerMixin(MessageMaker):
    """
    Mixin adding helpers that send message markers (:xep:`0333`) from this
    :term:`XMPP Entity`.
    """

    def _make_marker(
        self, legacy_msg_id: LegacyMessageType, marker: Marker, carbon: bool = False
    ):
        """
        Build a marker message pointing at a legacy message.

        :param legacy_msg_id: The message this marker refers to
        :param marker: Name of the marker element to fill in
        :param carbon: Passed to ``_make_message()``
        :return: The message from ``_make_message()``, with the marker ``id``
            set to the last XMPP ID given by ``_legacy_to_xmpp()``
        """
        stanza = self._make_message(carbon=carbon)
        xmpp_ids = self._legacy_to_xmpp(legacy_msg_id)
        stanza[marker]["id"] = xmpp_ids[-1]
        return stanza

    def ack(self, legacy_msg_id: LegacyMessageType, **kwargs) -> None:
        """
        Emit an "acknowledged" message marker (:xep:`0333`) on behalf of this
        :term:`XMPP Entity`.

        :param legacy_msg_id: The message this marker refers to
        """
        is_carbon = bool(kwargs.get("carbon"))
        marker = self._make_marker(legacy_msg_id, "acknowledged", carbon=is_carbon)
        self._send(marker, **kwargs)

    def received(self, legacy_msg_id: LegacyMessageType, **kwargs) -> None:
        """
        Emit a "received" message marker (:xep:`0333`) on behalf of this
        :term:`XMPP Entity`.

        When this entity's ``mtype`` is ``"chat"`` (presumably a
        :class:`LegacyContact`), a delivery receipt (:xep:`0184`) is sent
        first for each XMPP ID matching the legacy ID.

        :param legacy_msg_id: The message this marker refers to
        """
        is_carbon = bool(kwargs.get("carbon"))
        if self.mtype == "chat":
            for xmpp_id in self._legacy_to_xmpp(legacy_msg_id):
                receipt = self.xmpp.delivery_receipt.make_ack(
                    xmpp_id,
                    mfrom=self.jid,
                    mto=self.user_jid,
                )
                # Receipts are sent without the caller's kwargs.
                self._send(receipt)
        marker = self._make_marker(legacy_msg_id, "received", carbon=is_carbon)
        self._send(marker, **kwargs)

    def displayed(self, legacy_msg_id: LegacyMessageType, **kwargs) -> None:
        """
        Emit a "displayed" message marker (:xep:`0333`) on behalf of this
        :term:`XMPP Entity`.

        If ``MARK_ALL_MESSAGES`` is set and this entity is the user in a group
        chat, nothing is sent when the store reports the message as already
        displayed by the user. Otherwise the MUC's unread XMPP IDs are popped
        up to this message before the marker goes out.

        :param legacy_msg_id: The message this marker refers to
        """
        if self.xmpp.MARK_ALL_MESSAGES:
            muc = getattr(self, "muc", None)
            if muc and getattr(self, "is_user", False):
                room_jid = muc.jid.bare
                with self.xmpp.store.session() as orm:
                    already_displayed = self.xmpp.store.mam.is_displayed_by_user(
                        orm, room_jid, str(legacy_msg_id)
                    )
                    if already_displayed:
                        self.session.log.debug(
                            "Ignoring carbon marker for message already displayed by user."
                        )
                        return
                    last_xmpp_id = self._legacy_to_xmpp(legacy_msg_id)[-1]
                    muc.pop_unread_xmpp_ids_up_to(last_xmpp_id)

        is_carbon = bool(kwargs.get("carbon"))
        marker = self._make_marker(legacy_msg_id, "displayed", carbon=is_carbon)
        self._send(marker, **kwargs)
        if getattr(self, "is_user", False):
            # Also publish the MDS state on behalf of the user, in the background.
            self.session.create_task(self.__send_mds(legacy_msg_id))

    async def __send_mds(self, legacy_msg_id: LegacyMessageType) -> None:
        """
        Publish an MDS "displayed" item on behalf of the user for a group chat.

        Does nothing when this entity has no (truthy) ``muc`` attribute. Any
        exception raised while sending the privileged IQ is logged at debug
        level and swallowed.

        :param legacy_msg_id: The message that was displayed
        """
        muc = getattr(self, "muc", None)
        if not muc:
            # This is not implemented for 1:1 chat because it would rely on
            # storing the XMPP-server injected stanza-id, which we don't track
            # ATM.
            # In practice, MDS should mostly be useful for public group chats,
            # so it should not be an issue.
            # We'll see if we need to implement that later
            return
        room_jid = muc.jid.bare
        last_xmpp_id = self._legacy_to_xmpp(legacy_msg_id)[-1]

        # The IQ is addressed from and to the user's own bare JID.
        iq = Iq(sto=self.user_jid.bare, sfrom=self.user_jid.bare, stype="set")
        mds_stanza = self.xmpp["xep_0490"].stanza
        publish = iq["pubsub"]["publish"]
        publish["node"] = mds_stanza.NS
        item = publish["item"]
        item["id"] = room_jid

        displayed = mds_stanza.Displayed()
        stanza_id = displayed["stanza_id"]
        stanza_id["id"] = last_xmpp_id
        stanza_id["by"] = room_jid
        item["payload"] = displayed
        iq["pubsub"]["publish_options"] = PUBLISH_OPTIONS

        try:
            await self.xmpp["xep_0356"].send_privileged_iq(iq)
        except Exception as e:
            # Best effort: a failed MDS publish must not break marker handling.
            self.session.log.debug("Could not MDS mark", exc_info=e)
class ContentMessageMixin(AttachmentMixin, TextMessageMixin):
    """
    Aggregate of :class:`AttachmentMixin` and :class:`TextMessageMixin`.

    Adds nothing of its own.
    """
class CarbonMessageMixin(ContentMessageMixin, MarkerMixin):
    """
    Content and marker helpers, plus sending messages on behalf of the user
    through the privilege plugin (XEP-0356).
    """

    def _privileged_send(self, msg: Message) -> None:
        """
        Send ``msg`` through ``xep_0356``'s ``send_privileged_message()``.

        A message that already has an ID gets that ID added to the session's
        ``ignore_messages``. A message without one is given a fresh
        ``slidge-carbon-<uuid4>`` ID. The origin ID is removed in both cases.

        A :class:`PermissionError` from the plugin is turned into a warning
        instead of being propagated.

        :param msg: The message to send
        """
        stanza_id = msg.get_id()
        if not stanza_id:
            msg.set_id(f"slidge-carbon-{uuid.uuid4()}")
        else:
            self.session.ignore_messages.add(stanza_id)
        msg.del_origin_id()

        privilege = self.xmpp["xep_0356"]
        try:
            privilege.send_privileged_message(msg)
        except PermissionError:
            warnings.warn(
                "Slidge does not have the privilege (XEP-0356) to send messages on behalf of users. "
                "Consider configuring your XMPP server for that."
            )
class InviteMixin(MessageMaker):
    """
    Mixin adding a helper that sends group chat invitations (:xep:`0249`).
    """

    def invite_to(
        self,
        muc: "LegacyMUC",
        reason: Optional[str] = None,
        password: Optional[str] = None,
        **send_kwargs,
    ) -> None:
        """
        Invite the user to a group (:xep:`0249`) on behalf of this
        :term:`XMPP Entity`.

        :param muc: the muc the user is invited to
        :param reason: a text explaining why the user should join this muc;
            left out of the invitation when empty
        :param password: maybe this will make sense later? not sure;
            left out of the invitation when empty
        :param send_kwargs: additional kwargs to be passed to _send()
            (internal use by slidge)
        """
        msg = self._make_message(mtype="normal")
        invite = msg["groupchat_invite"]
        invite["jid"] = muc.jid
        # Optional fields are only set when they hold a truthy value.
        for field, value in (("reason", reason), ("password", password)):
            if value:
                invite[field] = value
        self._send(msg, **send_kwargs)
class MessageMixin(InviteMixin, ChatStateMixin, MarkerMixin, ContentMessageMixin):
    """
    Aggregate of the invite, chat state, marker and content message mixins.

    Adds nothing of its own.
    """
class MessageCarbonMixin(InviteMixin, ChatStateMixin, CarbonMessageMixin):
    """
    Aggregate of the invite, chat state and carbon message mixins.

    Adds nothing of its own.
    """
# Module-level logger, named after this module.
log = logging.getLogger(name=__name__)