Coverage for slidge / core / mixins / message_maker.py: 92%

109 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-02-15 09:02 +0000

1import uuid 

2import warnings 

3from collections.abc import Iterable 

4from datetime import UTC, datetime 

5from typing import TYPE_CHECKING, cast 

6 

7from slixmpp import JID, Message 

8from slixmpp.types import MessageTypes 

9 

10from slidge.util import strip_illegal_chars 

11 

12from ...db.models import GatewayUser 

13from ...slixfix.link_preview.stanza import LinkPreview as LinkPreviewStanza 

14from ...util.types import ( 

15 ChatState, 

16 LegacyMessageType, 

17 LinkPreview, 

18 MessageReference, 

19 ProcessingHint, 

20) 

21from .. import config 

22from .base import BaseSender 

23 

24if TYPE_CHECKING: 

25 from ...group import LegacyMUC, LegacyParticipant 

26 

27 

class MessageMaker(BaseSender):
    """Mixin that builds outgoing XMPP message stanzas.

    ``mtype`` and ``_can_send_carbon`` default to ``NotImplemented`` here and
    are read by :meth:`_make_message`; concrete classes are expected to set
    them.
    """

    mtype: MessageTypes = NotImplemented
    _can_send_carbon: bool = NotImplemented
    # When true, _add_delay() omits the delay element for timestamps that are
    # less than config.IGNORE_DELAY_THRESHOLD seconds old.
    STRIP_SHORT_DELAY = False
    # When true, _set_msg_id() also fills the message's stanza_id element.
    USE_STANZA_ID = False

    muc: "LegacyMUC"

    def _recipient_pk(self) -> int:
        """Return the primary key passed to the ID-map store lookups.

        This is ``self.muc.stored.id`` when ``self.is_participant`` is true,
        and ``self.stored.id`` otherwise.
        """
        return (
            self.muc.stored.id if self.is_participant else self.stored.id  # type:ignore
        )

    def _make_message(
        self,
        state: ChatState | None = None,
        hints: Iterable[ProcessingHint] = (),
        legacy_msg_id: LegacyMessageType | None = None,
        when: datetime | None = None,
        reply_to: MessageReference | None = None,
        carbon: bool = False,
        link_previews: Iterable[LinkPreview] | None = None,
        **kwargs,
    ) -> Message:
        """Build a message stanza and decorate it with the requested metadata.

        :param state: Chat state to attach; forced to ``"active"`` whenever a
            non-empty body is given.
        :param hints: Processing hints, each enabled on the stanza.
        :param legacy_msg_id: Legacy message ID, forwarded to
            :meth:`_set_msg_id`.
        :param when: Timestamp of the message, forwarded to
            :meth:`_add_delay`.
        :param reply_to: Message being replied to, if any.
        :param carbon: Whether the message is meant to be sent as a carbon.
        :param link_previews: Link previews to append to the stanza.
        :param kwargs: ``mbody``, ``mfrom`` (defaults to ``self.jid``),
            ``mto``, ``thread`` and ``mtype`` (defaults to ``self.mtype``) are
            consumed here; anything left is passed to the stanza constructor.
        :return: The constructed message stanza.
        """
        body = kwargs.pop("mbody", None)
        mfrom = kwargs.pop("mfrom", self.jid)
        mto = kwargs.pop("mto", None)
        thread = kwargs.pop("thread", None)
        stype = kwargs.pop("mtype", None) or self.mtype

        if carbon and self._can_send_carbon:
            # the msg needs to have jabber:client as xmlns, so
            # we don't want to associate with the XML stream
            msg_cls = Message
        else:
            msg_cls = self.xmpp.Message  # type:ignore
        msg = msg_cls(sfrom=mfrom, stype=stype, sto=mto, **kwargs)

        if body:
            msg["body"] = strip_illegal_chars(body, "�")
            # A message carrying a body always advertises the "active" state,
            # whatever the caller asked for.
            state = "active"
        if thread:
            thread_str = str(thread)
            with self.xmpp.store.session() as orm:
                # Prefer the thread ID known to the store; fall back to the
                # stringified legacy thread when there is no mapping.
                msg["thread"] = (
                    self.xmpp.store.id_map.get_thread(
                        orm,
                        self._recipient_pk(),
                        thread_str,
                        self.is_participant,
                    )
                    or thread_str
                )
        if state:
            msg["chat_state"] = state
        for hint in hints:
            msg.enable(hint)
        self._set_msg_id(msg, legacy_msg_id)
        self._add_delay(msg, when)
        if link_previews:
            self._add_link_previews(msg, link_previews)
        if reply_to:
            self._add_reply_to(msg, reply_to)
        return msg

    def _set_msg_id(
        self, msg: Message, legacy_msg_id: LegacyMessageType | None = None
    ) -> None:
        """Set the stanza ID and, if ``USE_STANZA_ID`` is set, the stanza_id.

        With a legacy ID, the XMPP ID derived from it by the session is used
        for both. Without one, the message ID is left untouched and a random
        UUID is used for the stanza_id element (only if ``USE_STANZA_ID``).
        """
        if legacy_msg_id is not None:
            xmpp_id = self.session.legacy_to_xmpp_msg_id(legacy_msg_id)
            msg.set_id(xmpp_id)
        elif self.USE_STANZA_ID:
            xmpp_id = str(uuid.uuid4())
        else:
            return
        if self.USE_STANZA_ID:
            msg["stanza_id"]["id"] = xmpp_id
            msg["stanza_id"]["by"] = self.muc.jid  # type: ignore

    def _legacy_to_xmpp(self, legacy_id: LegacyMessageType) -> list[str]:
        """Return the XMPP message ID(s) corresponding to a legacy message ID.

        The ID-map store is consulted first; when it has no entry, a
        single-element list with the session's conversion of ``legacy_id`` is
        returned, so the result is never empty.
        """
        with self.xmpp.store.session() as orm:
            ids = self.xmpp.store.id_map.get_xmpp(
                orm,
                self._recipient_pk(),
                str(legacy_id),
                self.is_participant,
            )
        if ids:
            return ids
        return [self.session.legacy_to_xmpp_msg_id(legacy_id)]

    def _add_delay(self, msg: Message, when: datetime | None) -> None:
        """Attach a delay element stamped with ``when`` to ``msg``.

        Nothing is added when ``when`` is ``None``, or when
        ``STRIP_SHORT_DELAY`` is set and ``when`` is less than
        ``config.IGNORE_DELAY_THRESHOLD`` seconds in the past (timestamps in
        the future count as not delayed).
        """
        if not when:
            return
        if when.tzinfo is None:
            # astimezone() on a naive datetime interprets it as local time.
            when = when.astimezone(UTC)
        if self.STRIP_SHORT_DELAY:
            # total_seconds() and not .seconds: the latter is only the
            # sub-day remainder, so a message older than a day would wrap
            # around and be mistaken for a recent one.
            elapsed = (datetime.now(UTC) - when).total_seconds()
            if elapsed < config.IGNORE_DELAY_THRESHOLD:
                return
        msg["delay"].set_stamp(when)
        msg["delay"].set_from(self.xmpp.boundjid.bare)

    def _add_reply_to(self, msg: Message, reply_to: MessageReference) -> None:
        """Fill the reply element of ``msg`` from a message reference.

        Sets the replied-to message ID, the JID of the author being replied
        to (when ``reply_to.author`` is set) and, when ``reply_to.body`` is
        set, a quoted fallback attributed to the author's nickname.
        """
        # Several XMPP IDs may map to one legacy ID; the first one is used.
        xmpp_id = self._legacy_to_xmpp(reply_to.legacy_id)[0]
        msg["reply"]["id"] = xmpp_id

        # Not every class using this mixin has a ``muc`` attribute.
        muc = getattr(self, "muc", None)

        if entity := reply_to.author:
            if entity == "user" or isinstance(entity, GatewayUser):
                if isinstance(entity, GatewayUser):
                    warnings.warn(
                        "Using a GatewayUser as the author of a "
                        "MessageReference is deprecated. Use the string 'user' "
                        "instead.",
                        DeprecationWarning,
                    )
                if muc:
                    # In a MUC, the user is addressed by their occupant JID.
                    jid = JID(muc.jid)
                    jid.resource = fallback_nick = muc.user_nick
                    msg["reply"]["to"] = jid
                else:
                    msg["reply"]["to"] = self.session.user_jid
                    # TODO: here we should use preferably use the PEP nick of the user
                    # (but it doesn't matter much)
                    fallback_nick = self.session.user_jid.user
            else:
                if muc:
                    if hasattr(entity, "muc"):
                        # TODO: accept a Contact here and use muc.get_participant_by_legacy_id()
                        # a bit of work because right now this is a sync function
                        entity = cast("LegacyParticipant", entity)
                        fallback_nick = entity.nickname
                    else:
                        warnings.warn(
                            "The author of a message reference in a MUC must be a"
                            " Participant instance, not a Contact"
                        )
                        fallback_nick = entity.name
                else:
                    fallback_nick = entity.name
                msg["reply"]["to"] = entity.jid
        else:
            fallback_nick = None

        if fallback := reply_to.body:
            msg["reply"].add_quoted_fallback(fallback, fallback_nick)

    @staticmethod
    def _add_link_previews(msg: Message, link_previews: Iterable[LinkPreview]) -> None:
        """Append one link preview element to ``msg`` per given preview.

        Only the truthy fields of each preview are copied to its element.
        """
        for preview in link_previews:
            element = LinkPreviewStanza()
            for name, val in zip(preview._fields, preview):
                if val:
                    element[name] = val
            msg.append(element)

186 msg.append(element)