Coverage for slidge / core / mixins / message_maker.py: 92%
109 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-02-15 09:02 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-02-15 09:02 +0000
1import uuid
2import warnings
3from collections.abc import Iterable
4from datetime import UTC, datetime
5from typing import TYPE_CHECKING, cast
7from slixmpp import JID, Message
8from slixmpp.types import MessageTypes
10from slidge.util import strip_illegal_chars
12from ...db.models import GatewayUser
13from ...slixfix.link_preview.stanza import LinkPreview as LinkPreviewStanza
14from ...util.types import (
15 ChatState,
16 LegacyMessageType,
17 LinkPreview,
18 MessageReference,
19 ProcessingHint,
20)
21from .. import config
22from .base import BaseSender
24if TYPE_CHECKING:
25 from ...group import LegacyMUC, LegacyParticipant
class MessageMaker(BaseSender):
    """Mixin that builds outgoing XMPP message stanzas for a sender.

    Subclasses are expected to override the class attributes below; the
    ``NotImplemented`` placeholders are not usable values.
    """

    # Message type used when the caller does not pass an ``mtype`` kwarg.
    mtype: MessageTypes = NotImplemented
    # Whether a ``carbon=True`` request may produce a stream-less Message.
    _can_send_carbon: bool = NotImplemented
    # When true, delays shorter than config.IGNORE_DELAY_THRESHOLD are dropped.
    STRIP_SHORT_DELAY = False
    # When true, a stanza-id element (by=self.muc.jid) is added to messages.
    USE_STANZA_ID = False

    muc: "LegacyMUC"

    def _recipient_pk(self) -> int:
        """Return the stored primary key used to scope message-ID lookups.

        This is the MUC's stored id when the sender is a participant, and the
        sender's own stored id otherwise.
        """
        return (
            self.muc.stored.id if self.is_participant else self.stored.id  # type:ignore
        )

    def _make_message(
        self,
        state: ChatState | None = None,
        hints: Iterable[ProcessingHint] = (),
        legacy_msg_id: LegacyMessageType | None = None,
        when: datetime | None = None,
        reply_to: MessageReference | None = None,
        carbon: bool = False,
        link_previews: Iterable[LinkPreview] | None = None,
        **kwargs,
    ):
        """Build a message stanza originating from this sender.

        :param state: Chat state to attach; forced to ``"active"`` whenever a
            non-empty body is present.
        :param hints: Processing hints to enable on the stanza.
        :param legacy_msg_id: Legacy message ID, converted to an XMPP ID by
            the session and used as the stanza's ID.
        :param when: Timestamp of the message, used for the delay element.
        :param reply_to: Message this one replies to, if any.
        :param carbon: Build the stanza as a carbon (only honoured when
            ``_can_send_carbon`` is true).
        :param link_previews: Link previews to append to the stanza.
        :param kwargs: ``mbody``, ``mfrom``, ``mto``, ``thread`` and ``mtype``
            are consumed here; everything else is forwarded to the message
            constructor.
        :return: The populated message stanza.
        """
        body = kwargs.pop("mbody", None)
        mfrom = kwargs.pop("mfrom", self.jid)
        mto = kwargs.pop("mto", None)
        thread = kwargs.pop("thread", None)
        if carbon and self._can_send_carbon:
            # the msg needs to have jabber:client as xmlns, so
            # we don't want to associate with the XML stream
            msg_cls = Message
        else:
            msg_cls = self.xmpp.Message  # type:ignore
        msg = msg_cls(
            sfrom=mfrom,
            stype=kwargs.pop("mtype", None) or self.mtype,
            sto=mto,
            **kwargs,
        )
        if body:
            msg["body"] = strip_illegal_chars(body, "�")
            # A message with a body always advertises the "active" state,
            # overriding whatever the caller passed.
            state = "active"
        if thread:
            thread_str = str(thread)
            with self.xmpp.store.session() as orm:
                # Prefer the thread ID recorded in the store; fall back to
                # the stringified legacy thread when there is no mapping.
                msg["thread"] = (
                    self.xmpp.store.id_map.get_thread(
                        orm,
                        self._recipient_pk(),
                        thread_str,
                        self.is_participant,
                    )
                    or thread_str
                )
        if state:
            msg["chat_state"] = state
        for hint in hints:
            msg.enable(hint)
        self._set_msg_id(msg, legacy_msg_id)
        self._add_delay(msg, when)
        if link_previews:
            self._add_link_previews(msg, link_previews)
        if reply_to:
            self._add_reply_to(msg, reply_to)
        return msg

    def _set_msg_id(
        self, msg: Message, legacy_msg_id: LegacyMessageType | None = None
    ) -> None:
        """Set the stanza ID and, if ``USE_STANZA_ID``, the stanza-id element.

        With a legacy ID, the XMPP ID derived from it is used for both. Without
        one, the message ID is left untouched and a random UUID is used for
        the stanza-id element only.
        """
        if legacy_msg_id is not None:
            xmpp_id = self.session.legacy_to_xmpp_msg_id(legacy_msg_id)
            msg.set_id(xmpp_id)
        elif self.USE_STANZA_ID:
            xmpp_id = str(uuid.uuid4())
        else:
            return
        if self.USE_STANZA_ID:
            msg["stanza_id"]["id"] = xmpp_id
            msg["stanza_id"]["by"] = self.muc.jid  # type: ignore

    def _legacy_to_xmpp(self, legacy_id: LegacyMessageType) -> list[str]:
        """Return the XMPP message IDs associated with a legacy message ID.

        The ID map in the store is consulted first; when it has no entry, a
        single-element list with the session's direct conversion is returned,
        so the result is never empty.
        """
        with self.xmpp.store.session() as orm:
            ids = self.xmpp.store.id_map.get_xmpp(
                orm,
                self._recipient_pk(),
                str(legacy_id),
                self.is_participant,
            )
        return ids or [self.session.legacy_to_xmpp_msg_id(legacy_id)]

    def _add_delay(self, msg: Message, when: datetime | None) -> None:
        """Attach a delay element stamped with ``when`` to ``msg``.

        Nothing is added when ``when`` is ``None``, or when
        ``STRIP_SHORT_DELAY`` is set and ``when`` lies within
        ``config.IGNORE_DELAY_THRESHOLD`` seconds of the current time.
        """
        if not when:
            return
        if when.tzinfo is None:
            # astimezone() interprets a naive datetime as system local time.
            when = when.astimezone(UTC)
        if self.STRIP_SHORT_DELAY:
            # total_seconds() accounts for whole days, unlike the ``seconds``
            # attribute which wraps around every 24 hours. abs() makes small
            # negative offsets (timestamps slightly in the future) count as
            # short delays too.
            age = abs((datetime.now(tz=UTC) - when).total_seconds())
            if age < config.IGNORE_DELAY_THRESHOLD:
                return
        msg["delay"].set_stamp(when)
        msg["delay"].set_from(self.xmpp.boundjid.bare)

    def _add_reply_to(self, msg: Message, reply_to: MessageReference) -> None:
        """Fill the reply element of ``msg`` from a message reference.

        Sets the replied-to message ID, the ``to`` JID of the quoted author
        when an author is given, and a quoted fallback when the reference
        carries a body.
        """
        # Only the first mapped XMPP ID is referenced.
        msg["reply"]["id"] = self._legacy_to_xmpp(reply_to.legacy_id)[0]

        # ``muc`` is only annotated on the class, so it may be missing on
        # senders that are not part of a group chat.
        muc = getattr(self, "muc", None)

        entity = reply_to.author
        if not entity:
            fallback_nick = None
        elif entity == "user" or isinstance(entity, GatewayUser):
            # The quoted author is the gateway user themselves.
            if isinstance(entity, GatewayUser):
                warnings.warn(
                    "Using a GatewayUser as the author of a "
                    "MessageReference is deprecated. Use the string 'user' "
                    "instead.",
                    DeprecationWarning,
                )
            if muc:
                # In a MUC the user is addressed by their occupant JID.
                jid = JID(muc.jid)
                jid.resource = fallback_nick = muc.user_nick
                msg["reply"]["to"] = jid
            else:
                msg["reply"]["to"] = self.session.user_jid
                # TODO: here we should use preferably use the PEP nick of the user
                # (but it doesn't matter much)
                fallback_nick = self.session.user_jid.user
        else:
            if muc and hasattr(entity, "muc"):
                # TODO: accept a Contact here and use muc.get_participant_by_legacy_id()
                # a bit of work because right now this is a sync function
                entity = cast("LegacyParticipant", entity)
                fallback_nick = entity.nickname
            else:
                if muc:
                    warnings.warn(
                        "The author of a message reference in a MUC must be a"
                        " Participant instance, not a Contact"
                    )
                fallback_nick = entity.name
            msg["reply"]["to"] = entity.jid

        if fallback := reply_to.body:
            msg["reply"].add_quoted_fallback(fallback, fallback_nick)

    @staticmethod
    def _add_link_previews(msg: Message, link_previews: Iterable[LinkPreview]) -> None:
        """Append one link-preview element per preview to ``msg``.

        Only the truthy fields of each preview are copied onto its element.
        """
        for preview in link_previews:
            element = LinkPreviewStanza()
            # assumes LinkPreview is a named tuple (it exposes ``_fields``
            # and positional indexing), so iterating yields its values in
            # field order
            for name, val in zip(preview._fields, preview):
                if val:
                    element[name] = val
            msg.append(element)