Coverage for slidge/core/mixins/message_maker.py: 92%
108 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
1import warnings
2from datetime import datetime, timezone
3from typing import TYPE_CHECKING, Iterable, Optional, cast
4from uuid import uuid4
6from slixmpp import JID, Message
7from slixmpp.types import MessageTypes
9from ...db.models import GatewayUser
10from ...slixfix.link_preview.stanza import LinkPreview as LinkPreviewStanza
11from ...util.types import (
12 ChatState,
13 LegacyMessageType,
14 LinkPreview,
15 MessageReference,
16 ProcessingHint,
17)
18from .. import config
19from .base import BaseSender
21if TYPE_CHECKING:
22 from ...group import LegacyMUC, LegacyParticipant
class MessageMaker(BaseSender):
    """Mixin with helpers that build outgoing XMPP ``Message`` stanzas.

    The helpers read ``self.jid``, ``self.xmpp``, ``self.session`` and, for
    group chats, ``self.muc``; none of them is defined in this class, so they
    are presumably provided by ``BaseSender`` or by the concrete subclass.
    """

    # Stanza type used by ``_make_message`` when no ``mtype`` kwarg is given.
    mtype: MessageTypes = NotImplemented
    # Carbons are only built when this is true AND the caller asks for one.
    _can_send_carbon: bool = NotImplemented
    # When true, ``_add_delay`` omits the delay element for messages more
    # recent than ``config.IGNORE_DELAY_THRESHOLD``.
    STRIP_SHORT_DELAY = False
    # When true, ``_set_msg_id`` also fills the ``stanza_id`` element.
    USE_STANZA_ID = False

    muc: "LegacyMUC"
    is_group: bool

    def _recipient_pk(self) -> int:
        """Return the stored primary key of the room (group) or of ``self``."""
        return self.muc.stored.id if self.is_group else self.stored.id  # type:ignore

    def _make_message(
        self,
        state: Optional[ChatState] = None,
        hints: Iterable[ProcessingHint] = (),
        legacy_msg_id: Optional[LegacyMessageType] = None,
        when: Optional[datetime] = None,
        reply_to: Optional[MessageReference] = None,
        carbon: bool = False,
        link_previews: Optional[Iterable[LinkPreview]] = None,
        **kwargs,
    ):
        """Build a message stanza from this entity.

        :param state: Chat state to attach; forced to ``"active"`` whenever a
            non-empty body is present.
        :param hints: Processing hints; each one is enabled on the stanza.
        :param legacy_msg_id: Legacy message ID, used to derive the stanza ID.
        :param when: Timestamp of the message, used for the delay element.
        :param reply_to: Message this one replies to, if any.
        :param carbon: Build the stanza as a carbon (only honoured if
            ``self._can_send_carbon`` is true).
        :param link_previews: Link previews to append to the stanza.
        :param kwargs: ``mbody``, ``mfrom`` (defaults to ``self.jid``),
            ``mto``, ``thread`` and ``mtype`` are consumed here; everything
            else is forwarded to the message constructor.
        :return: The constructed message stanza.
        """
        body = kwargs.pop("mbody", None)
        mfrom = kwargs.pop("mfrom", self.jid)
        mto = kwargs.pop("mto", None)
        thread = kwargs.pop("thread", None)
        if carbon and self._can_send_carbon:
            # the msg needs to have jabber:client as xmlns, so
            # we don't want to associate with the XML stream
            msg_cls = Message
        else:
            msg_cls = self.xmpp.Message  # type:ignore
        msg = msg_cls(
            sfrom=mfrom,
            stype=kwargs.pop("mtype", None) or self.mtype,
            sto=mto,
            **kwargs,
        )
        if body:
            msg["body"] = body
            # A message with a body always carries the "active" chat state,
            # regardless of what the caller passed.
            state = "active"
        if thread:
            with self.xmpp.store.session() as orm:
                thread_str = str(thread)
                # Prefer the thread ID recorded in the store; fall back to the
                # stringified legacy thread ID when the lookup yields nothing.
                msg["thread"] = (
                    self.xmpp.store.id_map.get_thread(
                        orm, self._recipient_pk(), thread_str, self.is_group
                    )
                    or thread_str
                )
        if state:
            msg["chat_state"] = state
        for hint in hints:
            msg.enable(hint)
        self._set_msg_id(msg, legacy_msg_id)
        self._add_delay(msg, when)
        if link_previews:
            self._add_link_previews(msg, link_previews)
        if reply_to:
            self._add_reply_to(msg, reply_to)
        return msg

    def _set_msg_id(
        self, msg: Message, legacy_msg_id: Optional[LegacyMessageType] = None
    ) -> None:
        """Set the stanza ID and, if ``USE_STANZA_ID``, the ``stanza_id`` element.

        With a legacy ID, the converted XMPP ID is used for both. Without one,
        the stanza's own ID is left untouched and a random UUID is used for the
        ``stanza_id`` element only.
        """
        if legacy_msg_id is not None:
            xmpp_id = self.session.legacy_to_xmpp_msg_id(legacy_msg_id)
            msg.set_id(xmpp_id)
        elif self.USE_STANZA_ID:
            xmpp_id = str(uuid4())
        else:
            return
        if self.USE_STANZA_ID:
            msg["stanza_id"]["id"] = xmpp_id
            msg["stanza_id"]["by"] = self.muc.jid  # type: ignore

    def _legacy_to_xmpp(self, legacy_id: LegacyMessageType) -> str:
        """Return the XMPP message ID matching ``legacy_id``.

        The last ID recorded in the store is preferred; when the store has
        none, the session's legacy-to-XMPP conversion is used instead.
        """
        with self.xmpp.store.session() as orm:
            # ``_recipient_pk()`` yields the room's key for group chats, so the
            # lookup must be told whether that key refers to a group, exactly
            # like the thread lookup in ``_make_message`` does.
            ids = self.xmpp.store.id_map.get_xmpp(
                orm, self._recipient_pk(), str(legacy_id), self.is_group
            )
        if ids:
            return ids[-1]
        return self.session.legacy_to_xmpp_msg_id(legacy_id)

    def _add_delay(self, msg: Message, when: Optional[datetime]) -> None:
        """Attach a delay element stamped with ``when`` to ``msg``.

        Nothing is added when ``when`` is falsy, or when ``STRIP_SHORT_DELAY``
        is set and the message is more recent than
        ``config.IGNORE_DELAY_THRESHOLD``.
        """
        if not when:
            return
        if when.tzinfo is None:
            # Naive datetimes are interpreted as local time by astimezone().
            when = when.astimezone(timezone.utc)
        if self.STRIP_SHORT_DELAY:
            delay = datetime.now(timezone.utc) - when
            if delay < config.IGNORE_DELAY_THRESHOLD:
                return
        msg["delay"].set_stamp(when)
        msg["delay"].set_from(self.xmpp.boundjid.bare)

    def _add_reply_to(self, msg: Message, reply_to: MessageReference) -> None:
        """Fill the reply element of ``msg`` from ``reply_to``.

        Sets the replied-to message ID, the ``to`` attribute when the author
        is known, and a quoted fallback when ``reply_to.body`` is non-empty.
        """
        xmpp_id = self._legacy_to_xmpp(reply_to.legacy_id)
        msg["reply"]["id"] = xmpp_id

        # Not every class using this mixin has a ``muc`` attribute.
        muc = getattr(self, "muc", None)

        if entity := reply_to.author:
            if entity == "user" or isinstance(entity, GatewayUser):
                if isinstance(entity, GatewayUser):
                    warnings.warn(
                        "Using a GatewayUser as the author of a "
                        "MessageReference is deprecated. Use the string 'user' "
                        "instead.",
                        DeprecationWarning,
                    )
                if muc:
                    # In a room, the user is addressed by their occupant JID.
                    jid = JID(muc.jid)
                    jid.resource = fallback_nick = muc.user_nick
                    msg["reply"]["to"] = jid
                else:
                    msg["reply"]["to"] = self.session.user_jid
                    # TODO: here we should use preferably use the PEP nick of the user
                    # (but it doesn't matter much)
                    fallback_nick = self.session.user_jid.user
            else:
                if muc:
                    if hasattr(entity, "muc"):
                        # TODO: accept a Contact here and use muc.get_participant_by_legacy_id()
                        # a bit of work because right now this is a sync function
                        entity = cast("LegacyParticipant", entity)
                        fallback_nick = entity.nickname
                    else:
                        warnings.warn(
                            "The author of a message reference in a MUC must be a"
                            " Participant instance, not a Contact"
                        )
                        fallback_nick = entity.name
                else:
                    fallback_nick = entity.name
                msg["reply"]["to"] = entity.jid
        else:
            fallback_nick = None

        if fallback := reply_to.body:
            msg["reply"].add_quoted_fallback(fallback, fallback_nick)

    @staticmethod
    def _add_link_previews(msg: Message, link_previews: Iterable[LinkPreview]) -> None:
        """Append one link preview stanza to ``msg`` per item of ``link_previews``.

        Fields whose value is falsy are left out of the stanza.
        """
        for preview in link_previews:
            element = LinkPreviewStanza()
            for name, val in zip(preview._fields, preview):
                if not val:
                    continue
                element[name] = val
            msg.append(element)