Coverage for slidge/core/mixins/message_maker.py: 92%
107 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
1import uuid
2import warnings
3from datetime import datetime, timezone
4from typing import TYPE_CHECKING, Iterable, Optional, cast
6from slixmpp import JID, Message
7from slixmpp.types import MessageTypes
9from ...db.models import GatewayUser
10from ...slixfix.link_preview.stanza import LinkPreview as LinkPreviewStanza
11from ...util.types import (
12 ChatState,
13 LegacyMessageType,
14 LinkPreview,
15 MessageReference,
16 ProcessingHint,
17)
18from .. import config
19from .base import BaseSender
21if TYPE_CHECKING:
22 from ...group import LegacyMUC, LegacyParticipant
25class MessageMaker(BaseSender):
26 mtype: MessageTypes = NotImplemented
27 _can_send_carbon: bool = NotImplemented
28 STRIP_SHORT_DELAY = False
29 USE_STANZA_ID = False
31 muc: "LegacyMUC"
33 def _recipient_pk(self) -> int:
34 return (
35 self.muc.stored.id if self.is_participant else self.stored.id # type:ignore
36 )
38 def _make_message(
39 self,
40 state: Optional[ChatState] = None,
41 hints: Iterable[ProcessingHint] = (),
42 legacy_msg_id: Optional[LegacyMessageType] = None,
43 when: Optional[datetime] = None,
44 reply_to: Optional[MessageReference] = None,
45 carbon: bool = False,
46 link_previews: Optional[Iterable[LinkPreview]] = None,
47 **kwargs,
48 ):
49 body = kwargs.pop("mbody", None)
50 mfrom = kwargs.pop("mfrom", self.jid)
51 mto = kwargs.pop("mto", None)
52 thread = kwargs.pop("thread", None)
53 if carbon and self._can_send_carbon:
54 # the msg needs to have jabber:client as xmlns, so
55 # we don't want to associate with the XML stream
56 msg_cls = Message
57 else:
58 msg_cls = self.xmpp.Message # type:ignore
59 msg = msg_cls(
60 sfrom=mfrom,
61 stype=kwargs.pop("mtype", None) or self.mtype,
62 sto=mto,
63 **kwargs,
64 )
65 if body:
66 msg["body"] = body
67 state = "active"
68 if thread:
69 with self.xmpp.store.session() as orm:
70 thread_str = str(thread)
71 msg["thread"] = (
72 self.xmpp.store.id_map.get_thread(
73 orm,
74 self._recipient_pk(),
75 thread_str,
76 self.is_participant,
77 )
78 or thread_str
79 )
80 if state:
81 msg["chat_state"] = state
82 for hint in hints:
83 msg.enable(hint)
84 self._set_msg_id(msg, legacy_msg_id)
85 self._add_delay(msg, when)
86 if link_previews:
87 self._add_link_previews(msg, link_previews)
88 if reply_to:
89 self._add_reply_to(msg, reply_to)
90 return msg
92 def _set_msg_id(
93 self, msg: Message, legacy_msg_id: Optional[LegacyMessageType] = None
94 ) -> None:
95 if legacy_msg_id is not None:
96 i = self.session.legacy_to_xmpp_msg_id(legacy_msg_id)
97 msg.set_id(i)
98 if self.USE_STANZA_ID:
99 msg["stanza_id"]["id"] = i
100 msg["stanza_id"]["by"] = self.muc.jid # type: ignore
101 elif self.USE_STANZA_ID:
102 msg["stanza_id"]["id"] = str(uuid.uuid4())
103 msg["stanza_id"]["by"] = self.muc.jid # type: ignore
105 def _legacy_to_xmpp(self, legacy_id: LegacyMessageType) -> list[str]:
106 with self.xmpp.store.session() as orm:
107 ids = self.xmpp.store.id_map.get_xmpp(
108 orm,
109 self._recipient_pk(),
110 str(legacy_id),
111 self.is_participant,
112 )
113 if ids:
114 return ids
115 return [self.session.legacy_to_xmpp_msg_id(legacy_id)]
117 def _add_delay(self, msg: Message, when: Optional[datetime]) -> None:
118 if when:
119 if when.tzinfo is None:
120 when = when.astimezone(timezone.utc)
121 if self.STRIP_SHORT_DELAY:
122 delay = (datetime.now().astimezone(timezone.utc) - when).seconds
123 if delay < config.IGNORE_DELAY_THRESHOLD:
124 return
125 msg["delay"].set_stamp(when)
126 msg["delay"].set_from(self.xmpp.boundjid.bare)
128 def _add_reply_to(self, msg: Message, reply_to: MessageReference) -> None:
129 xmpp_id = self._legacy_to_xmpp(reply_to.legacy_id)[0]
130 msg["reply"]["id"] = xmpp_id
132 muc = getattr(self, "muc", None)
134 if entity := reply_to.author:
135 if entity == "user" or isinstance(entity, GatewayUser):
136 if isinstance(entity, GatewayUser):
137 warnings.warn(
138 "Using a GatewayUser as the author of a "
139 "MessageReference is deprecated. Use the string 'user' "
140 "instead.",
141 DeprecationWarning,
142 )
143 if muc:
144 jid = JID(muc.jid)
145 jid.resource = fallback_nick = muc.user_nick
146 msg["reply"]["to"] = jid
147 else:
148 msg["reply"]["to"] = self.session.user_jid
149 # TODO: here we should use preferably use the PEP nick of the user
150 # (but it doesn't matter much)
151 fallback_nick = self.session.user_jid.user
152 else:
153 if muc:
154 if hasattr(entity, "muc"):
155 # TODO: accept a Contact here and use muc.get_participant_by_legacy_id()
156 # a bit of work because right now this is a sync function
157 entity = cast("LegacyParticipant", entity)
158 fallback_nick = entity.nickname
159 else:
160 warnings.warn(
161 "The author of a message reference in a MUC must be a"
162 " Participant instance, not a Contact"
163 )
164 fallback_nick = entity.name
165 else:
166 fallback_nick = entity.name
167 msg["reply"]["to"] = entity.jid
168 else:
169 fallback_nick = None
171 if fallback := reply_to.body:
172 msg["reply"].add_quoted_fallback(fallback, fallback_nick)
174 @staticmethod
175 def _add_link_previews(msg: Message, link_previews: Iterable[LinkPreview]) -> None:
176 for preview in link_previews:
177 element = LinkPreviewStanza()
178 for i, name in enumerate(preview._fields):
179 val = preview[i]
180 if not val:
181 continue
182 element[name] = val
183 msg.append(element)