Coverage for slidge / core / mixins / message_maker.py: 92%
108 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-06 15:18 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-06 15:18 +0000
1import uuid
2import warnings
3from datetime import datetime, timezone
4from typing import TYPE_CHECKING, Iterable, Optional, cast
6from slixmpp import JID, Message
7from slixmpp.types import MessageTypes
9from slidge.util import strip_illegal_chars
11from ...db.models import GatewayUser
12from ...slixfix.link_preview.stanza import LinkPreview as LinkPreviewStanza
13from ...util.types import (
14 ChatState,
15 LegacyMessageType,
16 LinkPreview,
17 MessageReference,
18 ProcessingHint,
19)
20from .. import config
21from .base import BaseSender
23if TYPE_CHECKING:
24 from ...group import LegacyMUC, LegacyParticipant
27class MessageMaker(BaseSender):
28 mtype: MessageTypes = NotImplemented
29 _can_send_carbon: bool = NotImplemented
30 STRIP_SHORT_DELAY = False
31 USE_STANZA_ID = False
33 muc: "LegacyMUC"
35 def _recipient_pk(self) -> int:
36 return (
37 self.muc.stored.id if self.is_participant else self.stored.id # type:ignore
38 )
40 def _make_message(
41 self,
42 state: Optional[ChatState] = None,
43 hints: Iterable[ProcessingHint] = (),
44 legacy_msg_id: Optional[LegacyMessageType] = None,
45 when: Optional[datetime] = None,
46 reply_to: Optional[MessageReference] = None,
47 carbon: bool = False,
48 link_previews: Optional[Iterable[LinkPreview]] = None,
49 **kwargs,
50 ):
51 body = kwargs.pop("mbody", None)
52 mfrom = kwargs.pop("mfrom", self.jid)
53 mto = kwargs.pop("mto", None)
54 thread = kwargs.pop("thread", None)
55 if carbon and self._can_send_carbon:
56 # the msg needs to have jabber:client as xmlns, so
57 # we don't want to associate with the XML stream
58 msg_cls = Message
59 else:
60 msg_cls = self.xmpp.Message # type:ignore
61 msg = msg_cls(
62 sfrom=mfrom,
63 stype=kwargs.pop("mtype", None) or self.mtype,
64 sto=mto,
65 **kwargs,
66 )
67 if body:
68 msg["body"] = strip_illegal_chars(body, "�")
69 state = "active"
70 if thread:
71 with self.xmpp.store.session() as orm:
72 thread_str = str(thread)
73 msg["thread"] = (
74 self.xmpp.store.id_map.get_thread(
75 orm,
76 self._recipient_pk(),
77 thread_str,
78 self.is_participant,
79 )
80 or thread_str
81 )
82 if state:
83 msg["chat_state"] = state
84 for hint in hints:
85 msg.enable(hint)
86 self._set_msg_id(msg, legacy_msg_id)
87 self._add_delay(msg, when)
88 if link_previews:
89 self._add_link_previews(msg, link_previews)
90 if reply_to:
91 self._add_reply_to(msg, reply_to)
92 return msg
94 def _set_msg_id(
95 self, msg: Message, legacy_msg_id: Optional[LegacyMessageType] = None
96 ) -> None:
97 if legacy_msg_id is not None:
98 i = self.session.legacy_to_xmpp_msg_id(legacy_msg_id)
99 msg.set_id(i)
100 if self.USE_STANZA_ID:
101 msg["stanza_id"]["id"] = i
102 msg["stanza_id"]["by"] = self.muc.jid # type: ignore
103 elif self.USE_STANZA_ID:
104 msg["stanza_id"]["id"] = str(uuid.uuid4())
105 msg["stanza_id"]["by"] = self.muc.jid # type: ignore
107 def _legacy_to_xmpp(self, legacy_id: LegacyMessageType) -> list[str]:
108 with self.xmpp.store.session() as orm:
109 ids = self.xmpp.store.id_map.get_xmpp(
110 orm,
111 self._recipient_pk(),
112 str(legacy_id),
113 self.is_participant,
114 )
115 if ids:
116 return ids
117 return [self.session.legacy_to_xmpp_msg_id(legacy_id)]
119 def _add_delay(self, msg: Message, when: Optional[datetime]) -> None:
120 if when:
121 if when.tzinfo is None:
122 when = when.astimezone(timezone.utc)
123 if self.STRIP_SHORT_DELAY:
124 delay = (datetime.now().astimezone(timezone.utc) - when).seconds
125 if delay < config.IGNORE_DELAY_THRESHOLD:
126 return
127 msg["delay"].set_stamp(when)
128 msg["delay"].set_from(self.xmpp.boundjid.bare)
130 def _add_reply_to(self, msg: Message, reply_to: MessageReference) -> None:
131 xmpp_id = self._legacy_to_xmpp(reply_to.legacy_id)[0]
132 msg["reply"]["id"] = xmpp_id
134 muc = getattr(self, "muc", None)
136 if entity := reply_to.author:
137 if entity == "user" or isinstance(entity, GatewayUser):
138 if isinstance(entity, GatewayUser):
139 warnings.warn(
140 "Using a GatewayUser as the author of a "
141 "MessageReference is deprecated. Use the string 'user' "
142 "instead.",
143 DeprecationWarning,
144 )
145 if muc:
146 jid = JID(muc.jid)
147 jid.resource = fallback_nick = muc.user_nick
148 msg["reply"]["to"] = jid
149 else:
150 msg["reply"]["to"] = self.session.user_jid
151 # TODO: here we should use preferably use the PEP nick of the user
152 # (but it doesn't matter much)
153 fallback_nick = self.session.user_jid.user
154 else:
155 if muc:
156 if hasattr(entity, "muc"):
157 # TODO: accept a Contact here and use muc.get_participant_by_legacy_id()
158 # a bit of work because right now this is a sync function
159 entity = cast("LegacyParticipant", entity)
160 fallback_nick = entity.nickname
161 else:
162 warnings.warn(
163 "The author of a message reference in a MUC must be a"
164 " Participant instance, not a Contact"
165 )
166 fallback_nick = entity.name
167 else:
168 fallback_nick = entity.name
169 msg["reply"]["to"] = entity.jid
170 else:
171 fallback_nick = None
173 if fallback := reply_to.body:
174 msg["reply"].add_quoted_fallback(fallback, fallback_nick)
176 @staticmethod
177 def _add_link_previews(msg: Message, link_previews: Iterable[LinkPreview]) -> None:
178 for preview in link_previews:
179 element = LinkPreviewStanza()
180 for i, name in enumerate(preview._fields):
181 val = preview[i]
182 if not val:
183 continue
184 element[name] = val
185 msg.append(element)