Coverage for slidge/core/mixins/message_maker.py: 93%
99 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
1import warnings
2from copy import copy
3from datetime import datetime, timezone
4from typing import TYPE_CHECKING, Iterable, Optional, cast
5from uuid import uuid4
7from slixmpp import Message
8from slixmpp.types import MessageTypes
10from ...db.models import GatewayUser
11from ...slixfix.link_preview.stanza import LinkPreview as LinkPreviewStanza
12from ...util.types import (
13 ChatState,
14 LegacyMessageType,
15 LinkPreview,
16 MessageReference,
17 ProcessingHint,
18)
19from .. import config
20from .base import BaseSender
22if TYPE_CHECKING:
23 from ...group.participant import LegacyParticipant
class MessageMaker(BaseSender):
    """
    Mixin that assembles outgoing message stanzas.

    The entry point is :meth:`_make_message`; the other methods each decorate
    the stanza with one optional piece of metadata (id, delay, reply, link
    previews).
    """

    # Stanza type used when the caller does not pass an ``mtype`` keyword.
    mtype: MessageTypes = NotImplemented
    # When falsy, the ``carbon`` flag of ``_make_message`` has no effect on
    # which stanza class is instantiated.
    _can_send_carbon: bool = NotImplemented
    # When truthy, delays shorter than ``config.IGNORE_DELAY_THRESHOLD`` are
    # not attached to the stanza.
    STRIP_SHORT_DELAY = False
    # When truthy, a ``stanza_id`` element attributed to ``self.muc.jid`` is
    # filled in on every message.
    USE_STANZA_ID = False

    def _make_message(
        self,
        state: Optional[ChatState] = None,
        hints: Iterable[ProcessingHint] = (),
        legacy_msg_id: Optional[LegacyMessageType] = None,
        when: Optional[datetime] = None,
        reply_to: Optional[MessageReference] = None,
        carbon=False,
        link_previews: Optional[Iterable[LinkPreview]] = None,
        **kwargs,
    ):
        """
        Build a message stanza and attach the requested metadata to it.

        :param state: Chat state to set; replaced by ``"active"`` whenever a
            non-empty body is given.
        :param hints: Processing hints, each one enabled on the stanza.
        :param legacy_msg_id: Legacy identifier used to derive the stanza id.
        :param when: Timestamp for the delay element, if any.
        :param reply_to: Reference to the message being replied to, if any.
        :param carbon: Build the stanza as a carbon, when this sender
            supports it.
        :param link_previews: Link previews to append to the stanza.
        :param kwargs: ``mbody``, ``mfrom``, ``mto``, ``thread`` and ``mtype``
            are consumed here; anything left is forwarded to the stanza
            constructor.
        :return: The assembled message stanza.
        """
        text = kwargs.pop("mbody", None)
        sender = kwargs.pop("mfrom", self.jid)
        recipient = kwargs.pop("mto", None)
        thread = kwargs.pop("thread", None)

        # the msg needs to have jabber:client as xmlns, so
        # we don't want to associate with the XML stream
        stanza_cls = (
            Message
            if carbon and self._can_send_carbon
            else self.xmpp.Message  # type:ignore
        )
        stanza_type = kwargs.pop("mtype", None) or self.mtype
        msg = stanza_cls(sfrom=sender, stype=stanza_type, sto=recipient, **kwargs)

        if body := text:
            msg["body"] = body
            # A message carrying text always advertises the "active" state.
            state = "active"
        if thread:
            # Prefer the thread recorded in the store, falling back to the
            # stringified value that was passed in.
            known_thread = self.xmpp.store.sent.get_legacy_thread(
                self.user_pk, str(thread)
            )
            msg["thread"] = known_thread or str(thread)
        if state:
            msg["chat_state"] = state
        for processing_hint in hints:
            msg.enable(processing_hint)

        self._set_msg_id(msg, legacy_msg_id)
        self._add_delay(msg, when)
        if link_previews:
            self._add_link_previews(msg, link_previews)
        if reply_to:
            self._add_reply_to(msg, reply_to)
        return msg

    def _set_msg_id(
        self, msg: Message, legacy_msg_id: Optional[LegacyMessageType] = None
    ):
        """
        Set the stanza id and, if ``USE_STANZA_ID`` is on, the ``stanza_id``.

        With a legacy id, both are derived from it. Without one, the stanza
        id is left untouched and ``stanza_id`` gets a random UUID.
        """
        if legacy_msg_id is None:
            if not self.USE_STANZA_ID:
                return
            identifier = str(uuid4())
        else:
            identifier = self._legacy_to_xmpp(legacy_msg_id)
            msg.set_id(identifier)
            if not self.USE_STANZA_ID:
                return
        msg["stanza_id"]["id"] = identifier
        msg["stanza_id"]["by"] = self.muc.jid  # type: ignore

    def _legacy_to_xmpp(self, legacy_id: LegacyMessageType):
        """
        Translate a legacy message id into an XMPP message id.

        The id recorded in the store wins; otherwise the session converts the
        legacy id.
        """
        stored_id = self.xmpp.store.sent.get_xmpp_id(
            self.session.user_pk, str(legacy_id)
        )
        if stored_id:
            return stored_id
        return self.session.legacy_to_xmpp_msg_id(legacy_id)

    def _add_delay(self, msg: Message, when: Optional[datetime]):
        """
        Attach a delay element stamped with ``when`` to the stanza.

        Nothing is added when ``when`` is falsy, or when ``STRIP_SHORT_DELAY``
        is on and the elapsed time is below ``config.IGNORE_DELAY_THRESHOLD``.
        """
        if not when:
            return
        if when.tzinfo is None:
            when = when.astimezone(timezone.utc)
        if self.STRIP_SHORT_DELAY:
            elapsed = datetime.now().astimezone(timezone.utc) - when
            if elapsed < config.IGNORE_DELAY_THRESHOLD:
                return
        msg["delay"].set_stamp(when)
        msg["delay"].set_from(self.xmpp.boundjid.bare)

    def _add_reply_to(self, msg: Message, reply_to: MessageReference):
        """
        Fill in the reply element of the stanza from a message reference.

        Sets the replied-to id, the ``to`` address when the reference has an
        author, and a quoted fallback when the reference has a body.
        """
        msg["reply"]["id"] = self._legacy_to_xmpp(reply_to.legacy_id)

        muc = getattr(self, "muc", None)
        author = reply_to.author

        if not author:
            fallback_nick = None
        elif author == "user" or isinstance(author, GatewayUser):
            # The referenced message was written by the gateway user.
            if isinstance(author, GatewayUser):
                warnings.warn(
                    "Using a GatewayUser as the author of a "
                    "MessageReference is deprecated. Use the string 'user' "
                    "instead.",
                    DeprecationWarning,
                )
            if muc:
                occupant_jid = copy(muc.jid)
                fallback_nick = muc.user_nick
                occupant_jid.resource = fallback_nick
                msg["reply"]["to"] = occupant_jid
            else:
                msg["reply"]["to"] = self.session.user_jid
                # TODO: here we should use preferably use the PEP nick of the user
                # (but it doesn't matter much)
                fallback_nick = self.session.user_jid.local
        else:
            if not muc:
                fallback_nick = author.name
            elif hasattr(author, "muc"):
                # TODO: accept a Contact here and use muc.get_participant_by_legacy_id()
                # a bit of work because right now this is a sync function
                participant = cast("LegacyParticipant", author)
                fallback_nick = participant.nickname
            else:
                warnings.warn(
                    "The author of a message reference in a MUC must be a"
                    " Participant instance, not a Contact"
                )
                fallback_nick = author.name
            msg["reply"]["to"] = author.jid

        quoted_text = reply_to.body
        if quoted_text:
            msg["reply"].add_quoted_fallback(quoted_text, fallback_nick)

    @staticmethod
    def _add_link_previews(msg: Message, link_previews: Iterable[LinkPreview]):
        """
        Append one link preview element per preview, skipping falsy fields.
        """
        for preview in link_previews:
            stanza = LinkPreviewStanza()
            for field_name, field_value in zip(preview._fields, preview):
                if field_value:
                    stanza[field_name] = field_value
            msg.append(stanza)