Coverage for slidge / core / mixins / message.py: 87%
106 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
1import logging
2import uuid
3import warnings
5from slixmpp import Iq, Message
6from slixmpp.plugins.xep_0004.stanza.form import Form
8from ...util.types import AnyMUC, ChatState, Marker
9from .attachment import AttachmentMixin
10from .message_maker import MessageMaker
11from .message_text import TextMessageMixin
# Publish options attached to the MDS (XEP-0490) pubsub publish request
# built in MarkerMixin.__send_mds().
PUBLISH_OPTIONS = Form()
PUBLISH_OPTIONS["type"] = "submit"
PUBLISH_OPTIONS.add_field(
    "FORM_TYPE", "hidden", value="http://jabber.org/protocol/pubsub#publish-options"
)
for _var, _value in (
    ("pubsub#persist_items", "true"),
    ("pubsub#max_items", "max"),
    ("pubsub#send_last_published_item", "never"),
    ("pubsub#access_model", "whitelist"),
):
    PUBLISH_OPTIONS.add_field(_var, value=_value)
del _var, _value  # do not leave the loop variables in the module namespace
class ChatStateMixin(MessageMaker):
    """Mixin sending chat state notifications (:xep:`0085`)."""

    def __init__(self) -> None:
        super().__init__()
        # Most recently sent state; lets _chat_state() drop repeats.
        self.__last_chat_state: ChatState | None = None

    def _chat_state(
        self, state: ChatState, forced: bool = False, **kwargs: object
    ) -> None:
        """
        Build and send a chat state notification.

        Nothing is sent for carbons, nor when ``state`` equals the last
        state sent and ``forced`` is false.

        :param state: the chat state to send
        :param forced: send even if ``state`` was the last state sent
        :param kwargs: passed to _send(); a truthy ``carbon`` entry makes
            this call a no-op
        """
        if kwargs.get("carbon", False):
            return
        is_repeat = state == self.__last_chat_state
        if is_repeat and not forced:
            return
        self.__last_chat_state = state
        # The "no-store" hint is attached to every chat state message.
        notification = self._make_message(state=state, hints={"no-store"})
        self._send(notification, **kwargs)

    def active(self, **kwargs: object) -> None:
        """
        Emit the "active" chat state (:xep:`0085`) on behalf of this
        :term:`XMPP Entity`.
        """
        self._chat_state("active", forced=False, **kwargs)

    def composing(self, **kwargs: object) -> None:
        """
        Emit the "composing" chat state (:xep:`0085`), ie a "typing
        notification", on behalf of this :term:`XMPP Entity`.

        Unlike the other states, this one is always forced, so it is sent
        even when it repeats the last state sent.
        """
        self._chat_state("composing", forced=True, **kwargs)

    def paused(self, **kwargs: object) -> None:
        """
        Emit the "paused" chat state (:xep:`0085`), ie a "typing paused
        notification", on behalf of this :term:`XMPP Entity`.
        """
        self._chat_state("paused", forced=False, **kwargs)

    def inactive(self, **kwargs: object) -> None:
        """
        Emit the "inactive" chat state (:xep:`0085`) on behalf of this
        :term:`XMPP Entity`.

        "Inactive" means the contact has not interacted with the chat
        session interface for an intermediate period of time.
        """
        self._chat_state("inactive", forced=False, **kwargs)

    def gone(self, **kwargs: object) -> None:
        """
        Emit the "gone" chat state (:xep:`0085`) on behalf of this
        :term:`XMPP Entity`.

        "Gone" means the contact has not interacted with the chat session
        interface, system, or device for a relatively long period of time.
        """
        self._chat_state("gone", forced=False, **kwargs)
class MarkerMixin(MessageMaker):
    """Mixin sending chat markers (:xep:`0333`) and delivery receipts."""

    def _make_marker(
        self, legacy_msg_id: str, marker: Marker, carbon: bool = False
    ) -> Message:
        """
        Build a message carrying ``marker`` for a legacy message.

        The marker refers to the last XMPP ID ``legacy_msg_id`` maps to.

        :param legacy_msg_id: The message this marker refers to
        :param marker: name of the marker element to fill in
        :param carbon: forwarded to _make_message()
        :return: the message stanza, not sent yet
        """
        stanza = self._make_message(carbon=carbon)
        xmpp_ids = self._legacy_to_xmpp(legacy_msg_id)
        stanza[marker]["id"] = xmpp_ids[-1]
        return stanza

    def ack(self, legacy_msg_id: str, **kwargs: object) -> None:
        """
        Emit an "acknowledged" message marker (:xep:`0333`) on behalf of this
        :term:`XMPP Entity`.

        :param legacy_msg_id: The message this marker refers to
        """
        is_carbon = bool(kwargs.get("carbon"))
        marker = self._make_marker(legacy_msg_id, "acknowledged", carbon=is_carbon)
        self._send(marker, **kwargs)

    def received(self, legacy_msg_id: str, **kwargs: object) -> None:
        """
        Emit a "received" message marker (:xep:`0333`) on behalf of this
        :term:`XMPP Entity`.

        When this entity's ``mtype`` is ``"chat"`` (ie, when called on a
        :class:`LegacyContact`), a delivery receipt (:xep:`0184`) is sent
        first for every XMPP ID the legacy ID maps to.

        :param legacy_msg_id: The message this marker refers to
        """
        is_carbon = bool(kwargs.get("carbon"))
        if self.mtype == "chat":
            for xmpp_id in self._legacy_to_xmpp(legacy_msg_id):
                receipt = self.xmpp.delivery_receipt.make_ack(
                    xmpp_id,
                    mfrom=self.jid,
                    mto=self.user_jid,
                )
                # Receipts are sent without the caller's kwargs.
                self._send(receipt)
        marker = self._make_marker(legacy_msg_id, "received", carbon=is_carbon)
        self._send(marker, **kwargs)

    def displayed(self, legacy_msg_id: str, **kwargs: object) -> None:
        """
        Emit a "displayed" message marker (:xep:`0333`) on behalf of this
        :term:`XMPP Entity`.

        When ``MARK_ALL_MESSAGES`` is set and this entity is the user's
        participant in a MUC, the marker is skipped if the store says the
        user has already displayed the message.

        :param legacy_msg_id: The message this marker refers to
        """
        # Same short-circuit order as a single ``and`` chain: the "muc" and
        # "is_user" attributes are only read if the previous test passed.
        muc = getattr(self, "muc", None) if self.xmpp.MARK_ALL_MESSAGES else None
        if muc and getattr(self, "is_user", False):
            room_jid = muc.jid.bare
            with self.xmpp.store.session() as orm:
                already_displayed = self.xmpp.store.mam.is_displayed_by_user(
                    orm, room_jid, str(legacy_msg_id)
                )
                if already_displayed:
                    self.session.log.debug(
                        "Ignoring carbon marker for message already displayed by user."
                    )
                    return
                muc.pop_unread_xmpp_ids_up_to(
                    self._legacy_to_xmpp(legacy_msg_id)[-1]
                )

        is_carbon = bool(kwargs.get("carbon"))
        marker = self._make_marker(legacy_msg_id, "displayed", carbon=is_carbon)
        self._send(marker, **kwargs)
        if getattr(self, "is_user", False):
            self.session.create_task(self.__send_mds(legacy_msg_id))

    async def __send_mds(self, legacy_msg_id: str) -> None:
        """
        Publish a MDS (XEP-0490) displayed marker on behalf of the user for
        a group chat.

        Does nothing when this entity has no (truthy) ``muc`` attribute.
        Failures of the privileged IQ are only logged at debug level.

        :param legacy_msg_id: The message that was displayed
        """
        muc = getattr(self, "muc", None)
        if not muc:
            # 1:1 chats are not handled: that would rely on storing the
            # stanza-id injected by the XMPP server, which is not tracked
            # at the moment. MDS should mostly matter for public group
            # chats in practice, so this can be revisited later if needed.
            return
        room_jid = muc.jid.bare
        xmpp_msg_id = self._legacy_to_xmpp(legacy_msg_id)[-1]

        own_bare_jid_kwargs = {
            "sto": self.user_jid.bare,
            "sfrom": self.user_jid.bare,
        }
        iq = Iq(stype="set", **own_bare_jid_kwargs)
        publish = iq["pubsub"]["publish"]
        publish["node"] = self.xmpp["xep_0490"].stanza.NS
        publish["item"]["id"] = room_jid

        displayed = self.xmpp["xep_0490"].stanza.Displayed()
        stanza_id = displayed["stanza_id"]
        stanza_id["id"] = xmpp_msg_id
        stanza_id["by"] = room_jid
        publish["item"]["payload"] = displayed

        iq["pubsub"]["publish_options"] = PUBLISH_OPTIONS
        try:
            await self.xmpp["xep_0356"].send_privileged_iq(iq)
        except Exception as exc:
            # Best effort: a failed MDS publish must not break the caller.
            self.session.log.debug("Could not MDS mark", exc_info=exc)
class ContentMessageMixin(AttachmentMixin, TextMessageMixin):
    """Combination of :class:`AttachmentMixin` and :class:`TextMessageMixin`."""
class CarbonMessageMixin(ContentMessageMixin, MarkerMixin):
    """Content and marker mixin that can also send privileged messages."""

    def _privileged_send(self, msg: Message) -> None:
        """
        Send ``msg`` through the XEP-0356 plugin.

        A message that already has an ID gets that ID added to the
        session's ``ignore_messages``; otherwise a new
        ``slidge-carbon-<uuid4>`` ID is set on it. The origin-id is removed
        in both cases. A :class:`PermissionError` raised while sending is
        turned into a warning.

        :param msg: the message to send
        """
        msg_id = msg.get_id()
        if not msg_id:
            msg_id = f"slidge-carbon-{uuid.uuid4()}"
            msg.set_id(msg_id)
        else:
            self.session.ignore_messages.add(msg_id)
        msg.del_origin_id()

        privilege = self.xmpp["xep_0356"]
        try:
            privilege.send_privileged_message(msg)
        except PermissionError:
            warnings.warn(
                "Slidge does not have the privilege (XEP-0356) to send messages on behalf of users. "
                "Consider configuring your XMPP server for that."
            )
class InviteMixin(MessageMaker):
    """Mixin sending group chat invitations (:xep:`0249`)."""

    def invite_to(
        self,
        muc: AnyMUC,
        reason: str | None = None,
        password: str | None = None,
        **send_kwargs: object,
    ) -> None:
        """
        Emit an invitation to join a group (:xep:`0249`) on behalf of this
        :term:`XMPP Entity`.

        :param muc: the muc the user is invited to
        :param reason: a text explaining why the user should join this muc;
            left out of the invitation when empty or None
        :param password: password to include in the invitation; left out
            when empty or None
        :param send_kwargs: additional kwargs to be passed to _send()
            (internal use by slidge)
        """
        stanza = self._make_message(mtype="normal")
        invite = stanza["groupchat_invite"]
        invite["jid"] = muc.jid
        # Optional fields are only written when they hold a truthy value.
        for field, content in (("reason", reason), ("password", password)):
            if content:
                invite[field] = content
        self._send(stanza, **send_kwargs)
class MessageMixin(InviteMixin, ChatStateMixin, MarkerMixin, ContentMessageMixin):
    """Aggregate of the invite, chat state, marker and content mixins."""
class MessageCarbonMixin(InviteMixin, ChatStateMixin, CarbonMessageMixin):
    """Aggregate of the invite, chat state and carbon message mixins."""
# Module-level logger, named after this module.
log = logging.getLogger(__name__)