Coverage for slidge / core / mixins / message_maker.py: 89%

136 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-13 04:38 +0000

1import base64 

2import io 

3import logging 

4import uuid 

5import warnings 

6from collections.abc import Iterable 

7from datetime import UTC, datetime 

8from pathlib import Path 

9from typing import cast 

10 

11from PIL import Image 

12from slixmpp import Message 

13from slixmpp.plugins.xep_0511.stanza import LinkMetadata 

14from slixmpp.types import MessageTypes 

15 

16from slidge.util import strip_illegal_chars 

17 

18from ...db.models import GatewayUser 

19from ...util.types import ( 

20 AnyMUC, 

21 AnyParticipant, 

22 ChatState, 

23 LinkPreview, 

24 MessageReference, 

25 ProcessingHint, 

26) 

27from .. import config 

28from .base import BaseSender 

29 

30 

class MessageMaker(BaseSender):
    """
    Mixin that builds outgoing XMPP ``Message`` stanzas.

    It takes care of the message body, thread, chat state, processing hints,
    message/stanza ids, delay information, link previews and reply metadata.
    """

    # Default stanza type, used when no "mtype" keyword is given to
    # _make_message(). Subclasses are expected to override it.
    mtype: MessageTypes = NotImplemented
    # When true (and a carbon is requested), the stanza is built from the bare
    # slixmpp ``Message`` class instead of ``self.xmpp.Message``.
    _can_send_carbon: bool = NotImplemented
    # When true, _add_delay() omits the delay element for timestamps that are
    # closer to "now" than config.IGNORE_DELAY_THRESHOLD.
    STRIP_SHORT_DELAY = False
    # When true, _set_msg_id() also fills the stanza-id element, using
    # ``self.muc.jid`` as the "by" attribute.
    USE_STANZA_ID = False

    muc: AnyMUC

    def _recipient_pk(self) -> int:
        """
        Return the stored id used as the "recipient" key for id_map lookups.

        :return: ``self.muc.stored.id`` if this sender is a participant,
            ``self.stored.id`` otherwise.
        """
        return (
            self.muc.stored.id if self.is_participant else self.stored.id  # type:ignore
        )

    def _make_message(
        self,
        state: ChatState | None = None,
        hints: Iterable[ProcessingHint] = (),
        legacy_msg_id: str | None = None,
        when: datetime | None = None,
        reply_to: MessageReference | None = None,
        carbon: bool = False,
        link_previews: Iterable[LinkPreview] | None = None,
        **kwargs: object,
    ) -> Message:
        """
        Build a message stanza.

        :param state: Chat state to attach. Forced to "active" whenever a
            non-empty body is present.
        :param hints: Processing hints to enable on the stanza.
        :param legacy_msg_id: Legacy message id, see :meth:`_set_msg_id`.
        :param when: Timestamp of the message, see :meth:`_add_delay`.
        :param reply_to: Message this one replies to, if any.
        :param carbon: Whether this message is meant to be sent as a carbon.
        :param link_previews: Link previews to attach to the stanza.
        :param kwargs: "mbody", "mfrom", "mto", "thread" and "mtype" are
            consumed here; everything else is forwarded to the stanza
            constructor.
        :return: The stanza, ready to be sent.
        """
        body = kwargs.pop("mbody", None)
        mfrom = kwargs.pop("mfrom", self.jid)
        mto = kwargs.pop("mto", None)
        thread = kwargs.pop("thread", None)
        # the msg needs to have jabber:client as xmlns, so
        # we don't want to associate with the XML stream
        msg_cls = Message if carbon and self._can_send_carbon else self.xmpp.Message
        # "mtype" must be removed from kwargs before they are forwarded below.
        stype = kwargs.pop("mtype", None) or self.mtype
        msg = msg_cls(sfrom=mfrom, stype=stype, sto=mto, **kwargs)
        if body:
            assert isinstance(body, str)
            msg["body"] = strip_illegal_chars(body, "�")
            state = "active"
        if thread:
            thread_str = str(thread)
            with self.xmpp.store.session() as orm:
                # Fall back on the raw thread id if the store does not have a
                # mapping for it.
                msg["thread"] = (
                    self.xmpp.store.id_map.get_thread(
                        orm,
                        self._recipient_pk(),
                        thread_str,
                        self.is_participant,
                    )
                    or thread_str
                )
        if state:
            msg["chat_state"] = state
        for hint in hints:
            msg.enable(hint)
        self._set_msg_id(msg, legacy_msg_id)
        self._add_delay(msg, when)
        if link_previews:
            self._add_link_previews(msg, link_previews)
        if reply_to:
            self._add_reply_to(msg, reply_to)
        return msg

    def _set_msg_id(self, msg: Message, legacy_msg_id: str | None = None) -> None:
        """
        Set the message id and, if ``USE_STANZA_ID`` is true, the stanza-id.

        :param msg: The stanza to update.
        :param legacy_msg_id: If not None, used both as the message id and as
            the stanza-id. If None, the message id is left untouched and a
            random UUID is used for the stanza-id.
        """
        if legacy_msg_id is not None:
            msg.set_id(legacy_msg_id)
        if self.USE_STANZA_ID:
            msg["stanza_id"]["id"] = (
                legacy_msg_id if legacy_msg_id is not None else str(uuid.uuid4())
            )
            msg["stanza_id"]["by"] = self.muc.jid

    def _legacy_to_xmpp(self, legacy_id: str) -> list[str]:
        """
        Translate a legacy message id into the XMPP id(s) known by the store.

        :param legacy_id: The legacy message id.
        :return: The ids found in the store, or a single-element list
            containing ``legacy_id`` itself if the store has none.
        """
        with self.xmpp.store.session() as orm:
            ids = self.xmpp.store.id_map.get_xmpp(
                orm,
                self._recipient_pk(),
                str(legacy_id),
                self.is_participant,
            )
        if ids:
            return ids
        return [legacy_id]

    def _add_delay(self, msg: Message, when: datetime | None) -> None:
        """
        Attach delay information to the stanza.

        :param msg: The stanza to update.
        :param when: Timestamp of the message. Nothing is added if it is
            None. Naive datetimes are interpreted as local time (this is what
            ``datetime.astimezone()`` does) and converted to UTC.
        """
        if not when:
            return
        if when.tzinfo is None:
            when = when.astimezone(UTC)
        if self.STRIP_SHORT_DELAY:
            # total_seconds() and not .seconds: the latter ignores the "days"
            # part of the timedelta, so a message delayed by one day and two
            # seconds would be mistaken for a two seconds delay. abs() makes
            # timestamps slightly in the future (clock skew) count as "now"
            # too, instead of being wrapped to almost a full day.
            elapsed = (datetime.now(tz=UTC) - when).total_seconds()
            if abs(elapsed) < config.IGNORE_DELAY_THRESHOLD:
                return
        msg["delay"].set_stamp(when)
        msg["delay"].set_from(self.xmpp.boundjid.bare)

    def _add_reply_to(self, msg: Message, reply_to: MessageReference) -> None:
        """
        Attach reply metadata, and a quoted fallback if a body is available.

        :param msg: The stanza to update.
        :param reply_to: Reference to the message being replied to.
        """
        xmpp_id = self._legacy_to_xmpp(reply_to.legacy_id)[0]
        msg["reply"]["id"] = xmpp_id

        # Not every sender has a "muc" attribute.
        muc = getattr(self, "muc", None)

        if entity := reply_to.author:
            if entity == "user" or isinstance(entity, GatewayUser):
                if isinstance(entity, GatewayUser):
                    warnings.warn(
                        "Using a GatewayUser as the author of a "
                        "MessageReference is deprecated. Use the string 'user' "
                        "instead.",
                        DeprecationWarning,
                    )
                if muc:
                    msg["reply"]["to"] = muc.user_muc_jid
                    fallback_nick = muc.user_nick
                else:
                    msg["reply"]["to"] = self.session.user_jid
                    # TODO: here we should preferably use the PEP nick of the user
                    # (but it doesn't matter much)
                    fallback_nick = self.session.user_jid.user
            else:
                if muc:
                    if hasattr(entity, "muc"):
                        # TODO: accept a Contact here and use muc.get_participant_by_legacy_id()
                        # a bit of work because right now this is a sync function
                        entity = cast(AnyParticipant, entity)
                        fallback_nick = entity.nickname
                    else:
                        warnings.warn(
                            "The author of a message reference in a MUC must be a"
                            " Participant instance, not a Contact"
                        )
                        fallback_nick = entity.name
                else:
                    fallback_nick = entity.name
                msg["reply"]["to"] = entity.jid
        else:
            fallback_nick = None

        if fallback := reply_to.body:
            msg["reply"].add_quoted_fallback(fallback, fallback_nick)

    def _add_link_previews(
        self, msg: Message, link_previews: Iterable[LinkPreview]
    ) -> None:
        """
        Append one ``LinkMetadata`` element per non-empty link preview.

        :param msg: The stanza to update.
        :param link_previews: The previews to attach. ``Path`` fields are read
            from disk and ``bytes`` fields are turned into a JPEG data URI by
            :meth:`_process_link_preview_image`; falsy fields are skipped.
        """
        for preview in link_previews:
            if preview.is_empty:
                continue
            element = LinkMetadata()
            for i, name in enumerate(preview._fields):
                val = preview[i]
                if isinstance(val, Path):
                    val = val.read_bytes()
                if isinstance(val, bytes):
                    val = self._process_link_preview_image(val)
                if not val:
                    continue
                element[name] = val
            msg.append(element)

    @staticmethod
    def _process_link_preview_image(data: bytes) -> str | None:
        """
        Turn raw image bytes into a base64-encoded JPEG data URI.

        The image is re-encoded if it is not already a JPEG or if one of its
        sides exceeds ``MAX_LINK_PREVIEW_IMAGE_SIZE`` pixels (in which case it
        is also scaled down); otherwise the original bytes are used as is.

        :param data: The raw image.
        :return: The data URI, or None if the image could not be processed.
        """
        # this will block the main thread. if this proves to be an issue in practice,
        # this could be rewritten to use the thread pool we use to resize avatars.
        try:
            image = Image.open(io.BytesIO(data))
            oversized = any(side > MAX_LINK_PREVIEW_IMAGE_SIZE for side in image.size)
            if image.format != "JPEG" or oversized:
                # JPEG cannot store every pixel mode (eg RGBA or P, which are
                # what most PNG and GIF use): saving those raises.
                if image.mode not in ("L", "RGB", "CMYK"):
                    image = image.convert("RGB")
                if oversized:
                    image.thumbnail(
                        (MAX_LINK_PREVIEW_IMAGE_SIZE, MAX_LINK_PREVIEW_IMAGE_SIZE)
                    )
                with io.BytesIO() as buffer:
                    image.save(buffer, format="JPEG")
                    data = buffer.getvalue()
        except Exception:
            # Image.open() is lazy: corrupted data may only raise once the
            # pixels are actually decoded, hence the wide "try" block. A broken
            # preview image must not prevent the message from being sent.
            log.exception("Skipping link preview image")
            return None

        return "data:image/jpeg;base64," + base64.b64encode(data).decode("utf-8")

216 

217 

# Maximum width and height, in pixels, of the images embedded in link previews.
#
# Instead of having a hardcoded value for this, we would ideally use
# XEP-0478: Stream Limits Advertisement to know which size is authorized.
# However, this isn't possible until XEP-0225: Component Connections is a thing.
# Prosody defaults to 512kb for s2s connections and 10Mb for c2s connections.
# Some quick tests about JPEG images:
# median size of 50 base64-encoded JPEG random RGB images
# 128x128 pixels: 14kb ± 0.04
# 256x256 pixels: 53kb ± 0.06 # sounds like a good tradeoff
# 384x384 pixels: 119kb ± 0.11
# 512x512 pixels: 211kb ± 0.13
# 640x640 pixels: 329kb ± 0.15
# 768x768 pixels: 473kb ± 0.21
# 896x896 pixels: 644kb ± 0.27
# 1024x1024 pixels: 841kb ± 0.22
MAX_LINK_PREVIEW_IMAGE_SIZE = 256

# Module-level logger, named after this module.
log = logging.getLogger(__name__)