Coverage for slidge / core / mixins / message.py: 87%
107 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
1import logging
2import uuid
3import warnings
4from typing import TYPE_CHECKING
6from slixmpp import Iq, Message
7from slixmpp.plugins.xep_0004.stanza.form import Form
9from ...util.types import AnyMUC, ChatState, LegacyMessageType, Marker
10from .attachment import AttachmentMixin
11from .message_maker import MessageMaker
12from .message_text import TextMessageMixin
14if TYPE_CHECKING:
15 pass
# Publish-options form for MDS: it is attached to the pubsub publish request
# built in MarkerMixin.__send_mds.
PUBLISH_OPTIONS = Form()
PUBLISH_OPTIONS["type"] = "submit"
PUBLISH_OPTIONS.add_field(
    "FORM_TYPE",
    "hidden",
    value="http://jabber.org/protocol/pubsub#publish-options",
)
# Remaining fields only carry a var name and a value, in this order.
for _option_var, _option_value in (
    ("pubsub#persist_items", "true"),
    ("pubsub#max_items", "max"),
    ("pubsub#send_last_published_item", "never"),
    ("pubsub#access_model", "whitelist"),
):
    PUBLISH_OPTIONS.add_field(_option_var, value=_option_value)
# Do not leave the loop variables behind in the module namespace.
del _option_var, _option_value
class ChatStateMixin(MessageMaker):
    """Mixin providing chat state notifications (:xep:`0085`)."""

    def __init__(self) -> None:
        super().__init__()
        # Last state actually sent by _chat_state; used to skip repeats.
        self.__last_chat_state: ChatState | None = None

    def _chat_state(
        self, state: ChatState, forced: bool = False, **kwargs: object
    ) -> None:
        """
        Send a chat state message unless it should be skipped.

        Nothing is sent when the ``carbon`` kwarg is truthy, nor when ``state``
        equals the last state sent and ``forced`` is false.

        :param state: The chat state to send
        :param forced: Send even if ``state`` is the same as the last one sent
        :param kwargs: Passed along to ``_send()``; ``carbon`` is also read here
        """
        if kwargs.get("carbon", False):
            return
        is_repeat = state == self.__last_chat_state
        if is_repeat and not forced:
            return
        self.__last_chat_state = state
        # The "no-store" hint is set on every chat state message.
        state_msg = self._make_message(state=state, hints={"no-store"})
        self._send(state_msg, **kwargs)

    def active(self, **kwargs: object) -> None:
        """
        Send an "active" chat state (:xep:`0085`) from this
        :term:`XMPP Entity`.

        Skipped if "active" was already the last chat state sent.
        """
        self._chat_state("active", forced=False, **kwargs)

    def composing(self, **kwargs: object) -> None:
        """
        Send a "composing" chat state (:xep:`0085`), ie a "typing
        notification", from this :term:`XMPP Entity`.

        Unlike the other chat states, this one is sent even when it repeats
        the last chat state sent.
        """
        self._chat_state("composing", forced=True, **kwargs)

    def paused(self, **kwargs: object) -> None:
        """
        Send a "paused" chat state (:xep:`0085`), ie a "typing paused
        notification", from this :term:`XMPP Entity`.

        Skipped if "paused" was already the last chat state sent.
        """
        self._chat_state("paused", forced=False, **kwargs)

    def inactive(self, **kwargs: object) -> None:
        """
        Send an "inactive" chat state (:xep:`0085`) from this
        :term:`XMPP Entity`, ie "contact has not interacted with the chat
        session interface for an intermediate period of time".

        Skipped if "inactive" was already the last chat state sent.
        """
        self._chat_state("inactive", forced=False, **kwargs)

    def gone(self, **kwargs: object) -> None:
        """
        Send a "gone" chat state (:xep:`0085`) from this :term:`XMPP Entity`,
        ie "contact has not interacted with the chat session interface,
        system, or device for a relatively long period of time".

        Skipped if "gone" was already the last chat state sent.
        """
        self._chat_state("gone", forced=False, **kwargs)
class MarkerMixin(MessageMaker):
    """Mixin providing message markers (:xep:`0333`) and MDS publishing."""

    def _make_marker(
        self, legacy_msg_id: LegacyMessageType, marker: Marker, carbon: bool = False
    ) -> Message:
        """
        Build a marker message for a legacy message.

        :param legacy_msg_id: The message this marker refers to
        :param marker: Name of the marker element to fill in
        :param carbon: Forwarded to ``_make_message()``
        :return: A message whose marker ID is the last XMPP ID mapped to
            ``legacy_msg_id``
        """
        marker_msg = self._make_message(carbon=carbon)
        xmpp_ids = self._legacy_to_xmpp(legacy_msg_id)
        marker_msg[marker]["id"] = xmpp_ids[-1]
        return marker_msg

    def ack(self, legacy_msg_id: LegacyMessageType, **kwargs: object) -> None:
        """
        Send an "acknowledged" message marker (:xep:`0333`) from this :term:`XMPP Entity`.

        :param legacy_msg_id: The message this marker refers to
        """
        is_carbon = bool(kwargs.get("carbon"))
        marker_msg = self._make_marker(legacy_msg_id, "acknowledged", carbon=is_carbon)
        self._send(marker_msg, **kwargs)

    def received(self, legacy_msg_id: LegacyMessageType, **kwargs: object) -> None:
        """
        Send a "received" message marker (:xep:`0333`) from this :term:`XMPP Entity`.
        If called on a :class:`LegacyContact`, also send a delivery receipt
        marker (:xep:`0184`).

        :param legacy_msg_id: The message this marker refers to
        """
        is_carbon = bool(kwargs.get("carbon"))
        if self.mtype == "chat":
            # One receipt per XMPP ID mapped to the legacy ID; these are sent
            # without the extra kwargs.
            for xmpp_id in self._legacy_to_xmpp(legacy_msg_id):
                receipt = self.xmpp.delivery_receipt.make_ack(
                    xmpp_id,
                    mfrom=self.jid,
                    mto=self.user_jid,
                )
                self._send(receipt)
        marker_msg = self._make_marker(legacy_msg_id, "received", carbon=is_carbon)
        self._send(marker_msg, **kwargs)

    def displayed(self, legacy_msg_id: LegacyMessageType, **kwargs: object) -> None:
        """
        Send a "displayed" message marker (:xep:`0333`) from this :term:`XMPP Entity`.

        When ``MARK_ALL_MESSAGES`` is enabled and this entity is the user in a
        MUC, nothing is sent if the store reports the message as already
        displayed by the user. When this entity is the user, an MDS update is
        also scheduled as a task.

        :param legacy_msg_id: The message this marker refers to
        """
        if self.xmpp.MARK_ALL_MESSAGES:
            muc = getattr(self, "muc", None)
            if muc and getattr(self, "is_user", False):
                room_jid = muc.jid.bare
                with self.xmpp.store.session() as orm:
                    already_displayed = self.xmpp.store.mam.is_displayed_by_user(
                        orm, room_jid, str(legacy_msg_id)
                    )
                    if already_displayed:
                        self.session.log.debug(
                            "Ignoring carbon marker for message already displayed by user."
                        )
                        return
                    muc.pop_unread_xmpp_ids_up_to(
                        self._legacy_to_xmpp(legacy_msg_id)[-1]
                    )

        is_carbon = bool(kwargs.get("carbon"))
        marker_msg = self._make_marker(legacy_msg_id, "displayed", carbon=is_carbon)
        self._send(marker_msg, **kwargs)
        if getattr(self, "is_user", False):
            self.session.create_task(self.__send_mds(legacy_msg_id))

    async def __send_mds(self, legacy_msg_id: LegacyMessageType) -> None:
        """
        Send a MDS displayed marker on behalf of the user for a group chat.

        Failures of the privileged IQ are logged at debug level and otherwise
        ignored.

        :param legacy_msg_id: The message that was displayed
        """
        muc = getattr(self, "muc", None)
        if not muc:
            # This is not implemented for 1:1 chat because it would rely on
            # storing the XMPP-server injected stanza-id, which we don't track
            # ATM.
            # In practice, MDS should mostly be useful for public group chats,
            # so it should not be an issue.
            # We'll see if we need to implement that later
            return
        room_jid = muc.jid.bare
        last_xmpp_id = self._legacy_to_xmpp(legacy_msg_id)[-1]

        # The IQ is addressed from and to the user's own bare JID.
        own_bare_jid = self.user_jid.bare
        iq = Iq(sto=own_bare_jid, sfrom=own_bare_jid, stype="set")

        mds_stanza = self.xmpp["xep_0490"].stanza
        publish = iq["pubsub"]["publish"]
        publish["node"] = mds_stanza.NS
        publish["item"]["id"] = room_jid

        displayed = mds_stanza.Displayed()
        displayed["stanza_id"]["id"] = last_xmpp_id
        displayed["stanza_id"]["by"] = room_jid
        publish["item"]["payload"] = displayed

        iq["pubsub"]["publish_options"] = PUBLISH_OPTIONS
        try:
            await self.xmpp["xep_0356"].send_privileged_iq(iq)
        except Exception as e:
            self.session.log.debug("Could not MDS mark", exc_info=e)
class ContentMessageMixin(AttachmentMixin, TextMessageMixin):
    """Combine :class:`AttachmentMixin` and :class:`TextMessageMixin`; adds nothing itself."""
class CarbonMessageMixin(ContentMessageMixin, MarkerMixin):
    """Content and marker mixin that can send messages on behalf of the user."""

    def _privileged_send(self, msg: Message) -> None:
        """
        Send ``msg`` through the privileged message API (:xep:`0356`).

        If the message has an ID, it is added to the session's
        ``ignore_messages``; otherwise a "slidge-carbon-" prefixed random ID is
        set on it. The origin ID is removed before sending. A missing
        privilege results in a warning instead of an exception.

        :param msg: The message to send; it is modified in place
        """
        msg_id = msg.get_id()
        if not msg_id:
            msg_id = "slidge-carbon-" + str(uuid.uuid4())
            msg.set_id(msg_id)
        else:
            self.session.ignore_messages.add(msg_id)
        msg.del_origin_id()

        privilege = self.xmpp["xep_0356"]
        try:
            privilege.send_privileged_message(msg)
        except PermissionError:
            warnings.warn(
                "Slidge does not have the privilege (XEP-0356) to send messages on behalf of users. "
                "Consider configuring your XMPP server for that."
            )
class InviteMixin(MessageMaker):
    """Mixin providing direct group chat invitations (:xep:`0249`)."""

    def invite_to(
        self,
        muc: AnyMUC,
        reason: str | None = None,
        password: str | None = None,
        **send_kwargs: object,
    ) -> None:
        """
        Send an invitation to join a group (:xep:`0249`) from this :term:`XMPP Entity`.

        :param muc: the muc the user is invited to
        :param reason: a text explaining why the user should join this muc;
            left out of the invitation when empty
        :param password: a password to include in the invitation; left out
            when empty
        :param send_kwargs: additional kwargs to be passed to _send()
            (internal use by slidge)
        """
        msg = self._make_message(mtype="normal")
        invite = msg["groupchat_invite"]
        invite["jid"] = muc.jid
        # Optional fields are only set when they have a truthy value.
        for field_name, field_value in (("reason", reason), ("password", password)):
            if field_value:
                invite[field_name] = field_value
        self._send(msg, **send_kwargs)
class MessageMixin(InviteMixin, ChatStateMixin, MarkerMixin, ContentMessageMixin):
    """Combine the invite, chat state, marker and content mixins; adds nothing itself."""
class MessageCarbonMixin(InviteMixin, ChatStateMixin, CarbonMessageMixin):
    """Combine the invite, chat state and carbon message mixins; adds nothing itself."""
# Module-level logger, named after this module.
log: logging.Logger = logging.getLogger(__name__)