Coverage for slidge / core / mixins / message_maker.py: 92%

108 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-06 15:18 +0000

1import uuid 

2import warnings 

3from datetime import datetime, timezone 

4from typing import TYPE_CHECKING, Iterable, Optional, cast 

5 

6from slixmpp import JID, Message 

7from slixmpp.types import MessageTypes 

8 

9from slidge.util import strip_illegal_chars 

10 

11from ...db.models import GatewayUser 

12from ...slixfix.link_preview.stanza import LinkPreview as LinkPreviewStanza 

13from ...util.types import ( 

14 ChatState, 

15 LegacyMessageType, 

16 LinkPreview, 

17 MessageReference, 

18 ProcessingHint, 

19) 

20from .. import config 

21from .base import BaseSender 

22 

23if TYPE_CHECKING: 

24 from ...group import LegacyMUC, LegacyParticipant 

25 

26 

class MessageMaker(BaseSender):
    """
    Mixin that builds outgoing ``Message`` stanzas.

    The entry point is :meth:`_make_message`; the other private methods each
    decorate the stanza with one piece of metadata (id, delay, reply,
    link previews).

    NOTE(review): ``self.xmpp``, ``self.session``, ``self.jid``,
    ``self.stored`` and ``self.is_participant`` are not defined in this class;
    presumably they are provided by ``BaseSender`` or the concrete subclass.
    """

    # Stanza type used by _make_message() when no "mtype" kwarg is passed.
    # Placeholder value; presumably overridden by subclasses.
    mtype: MessageTypes = NotImplemented
    # When falsy, the "carbon" argument of _make_message() has no effect.
    # Placeholder value; presumably overridden by subclasses.
    _can_send_carbon: bool = NotImplemented
    # When true, _add_delay() skips the delay element for stamps close to now.
    STRIP_SHORT_DELAY = False
    # When true, _set_msg_id() also fills msg["stanza_id"].
    USE_STANZA_ID = False

    muc: "LegacyMUC"

    def _recipient_pk(self) -> int:
        """
        Return the stored primary key used for id/thread lookups.

        This is the MUC's stored id when ``self.is_participant`` is truthy,
        and this entity's own stored id otherwise.
        """
        return (
            self.muc.stored.id if self.is_participant else self.stored.id  # type:ignore
        )

    def _make_message(
        self,
        state: Optional[ChatState] = None,
        hints: Iterable[ProcessingHint] = (),
        legacy_msg_id: Optional[LegacyMessageType] = None,
        when: Optional[datetime] = None,
        reply_to: Optional[MessageReference] = None,
        carbon: bool = False,
        link_previews: Optional[Iterable[LinkPreview]] = None,
        **kwargs,
    ):
        """
        Build a message stanza and attach the requested metadata.

        :param state: Chat state to set on the message. It is replaced by
            ``"active"`` whenever a non-empty body is given.
        :param hints: Names of the elements to enable on the message.
        :param legacy_msg_id: Legacy message id, forwarded to
            :meth:`_set_msg_id`.
        :param when: Timestamp of the message, forwarded to
            :meth:`_add_delay`.
        :param reply_to: Reference to the message being replied to, if any.
        :param carbon: Build the stanza from the bare ``Message`` class
            instead of ``self.xmpp.Message``. Only honoured when
            ``self._can_send_carbon`` is truthy.
        :param link_previews: Link previews to append to the message.
        :param kwargs: ``mbody``, ``mfrom`` (defaults to ``self.jid``),
            ``mto``, ``thread`` and ``mtype`` (defaults to ``self.mtype``) are
            consumed here; anything else is passed to the message constructor.
        :return: The message stanza that was built.
        """
        body = kwargs.pop("mbody", None)
        mfrom = kwargs.pop("mfrom", self.jid)
        mto = kwargs.pop("mto", None)
        thread = kwargs.pop("thread", None)
        if carbon and self._can_send_carbon:
            # the msg needs to have jabber:client as xmlns, so
            # we don't want to associate with the XML stream
            msg_cls = Message
        else:
            msg_cls = self.xmpp.Message  # type:ignore
        msg = msg_cls(
            sfrom=mfrom,
            stype=kwargs.pop("mtype", None) or self.mtype,
            sto=mto,
            **kwargs,
        )
        if body:
            msg["body"] = strip_illegal_chars(body, "�")
            # A message with a body always carries the "active" chat state,
            # whatever the caller passed in.
            state = "active"
        if thread:
            with self.xmpp.store.session() as orm:
                thread_str = str(thread)
                # Use the thread id found in the id map, falling back to the
                # stringified thread value when the lookup returns nothing.
                msg["thread"] = (
                    self.xmpp.store.id_map.get_thread(
                        orm,
                        self._recipient_pk(),
                        thread_str,
                        self.is_participant,
                    )
                    or thread_str
                )
        if state:
            msg["chat_state"] = state
        for hint in hints:
            msg.enable(hint)
        self._set_msg_id(msg, legacy_msg_id)
        self._add_delay(msg, when)
        if link_previews:
            self._add_link_previews(msg, link_previews)
        if reply_to:
            self._add_reply_to(msg, reply_to)
        return msg

    def _set_msg_id(
        self, msg: Message, legacy_msg_id: Optional[LegacyMessageType] = None
    ) -> None:
        """
        Set the message id and, if ``USE_STANZA_ID`` is set, the stanza id.

        With a legacy id, both ids are the value returned by
        ``self.session.legacy_to_xmpp_msg_id()``. Without one, the message id
        is left untouched and the stanza id (if enabled) is a random UUID4.

        :param msg: The message to update in place.
        :param legacy_msg_id: Legacy message id, or ``None``.
        """
        if legacy_msg_id is not None:
            xmpp_id = self.session.legacy_to_xmpp_msg_id(legacy_msg_id)
            msg.set_id(xmpp_id)
        elif self.USE_STANZA_ID:
            xmpp_id = str(uuid.uuid4())
        else:
            # No legacy id and no stanza id requested: nothing to set.
            return
        if self.USE_STANZA_ID:
            msg["stanza_id"]["id"] = xmpp_id
            msg["stanza_id"]["by"] = self.muc.jid  # type: ignore

    def _legacy_to_xmpp(self, legacy_id: LegacyMessageType) -> list[str]:
        """
        Return the XMPP message id(s) matching a legacy message id.

        The id map is queried first; if it returns nothing, the result is a
        single-item list holding ``self.session.legacy_to_xmpp_msg_id()``.

        :param legacy_id: The legacy message id to translate.
        :return: A non-empty list of XMPP message ids.
        """
        with self.xmpp.store.session() as orm:
            ids = self.xmpp.store.id_map.get_xmpp(
                orm,
                self._recipient_pk(),
                str(legacy_id),
                self.is_participant,
            )
        if ids:
            return ids
        return [self.session.legacy_to_xmpp_msg_id(legacy_id)]

    def _add_delay(self, msg: Message, when: Optional[datetime]) -> None:
        """
        Attach a delay stamp to the message, unless it is too close to now.

        A naive ``when`` is converted with ``astimezone(timezone.utc)``, which
        interprets it as system local time. When ``STRIP_SHORT_DELAY`` is set
        and ``when`` is less than ``config.IGNORE_DELAY_THRESHOLD`` seconds
        away from the current time, no delay is added.

        :param msg: The message to update in place.
        :param when: Timestamp of the message; nothing happens if falsy.
        """
        if not when:
            return
        if when.tzinfo is None:
            when = when.astimezone(timezone.utc)
        if self.STRIP_SHORT_DELAY:
            # total_seconds() is required here: timedelta.seconds is only the
            # seconds *component* (0..86399) and ignores whole days, so a
            # message older than a day could wrongly look like a short delay.
            # The absolute value treats a stamp slightly in the future (clock
            # skew) as "now" as well.
            elapsed = abs((datetime.now(timezone.utc) - when).total_seconds())
            if elapsed < config.IGNORE_DELAY_THRESHOLD:
                return
        msg["delay"].set_stamp(when)
        msg["delay"].set_from(self.xmpp.boundjid.bare)

    def _add_reply_to(self, msg: Message, reply_to: MessageReference) -> None:
        """
        Fill the reply element of the message from a message reference.

        Sets ``msg["reply"]["id"]`` to the first XMPP id matching
        ``reply_to.legacy_id`` and, when the reference has an author,
        ``msg["reply"]["to"]``. If the reference has a body, it is added as a
        quoted fallback together with a nickname derived from the author.

        :param msg: The message to update in place.
        :param reply_to: Reference to the message being replied to.
        """
        xmpp_id = self._legacy_to_xmpp(reply_to.legacy_id)[0]
        msg["reply"]["id"] = xmpp_id

        # Not every sender is attached to a MUC, hence the getattr().
        muc = getattr(self, "muc", None)

        if entity := reply_to.author:
            if entity == "user" or isinstance(entity, GatewayUser):
                if isinstance(entity, GatewayUser):
                    warnings.warn(
                        "Using a GatewayUser as the author of a "
                        "MessageReference is deprecated. Use the string 'user' "
                        "instead.",
                        DeprecationWarning,
                    )
                if muc:
                    # In a MUC, the user is addressed as room JID + user nick.
                    jid = JID(muc.jid)
                    jid.resource = fallback_nick = muc.user_nick
                    msg["reply"]["to"] = jid
                else:
                    msg["reply"]["to"] = self.session.user_jid
                    # TODO: here we should use preferably use the PEP nick of the user
                    # (but it doesn't matter much)
                    fallback_nick = self.session.user_jid.user
            else:
                if muc:
                    if hasattr(entity, "muc"):
                        # TODO: accept a Contact here and use muc.get_participant_by_legacy_id()
                        # a bit of work because right now this is a sync function
                        entity = cast("LegacyParticipant", entity)
                        fallback_nick = entity.nickname
                    else:
                        warnings.warn(
                            "The author of a message reference in a MUC must be a"
                            " Participant instance, not a Contact"
                        )
                        fallback_nick = entity.name
                else:
                    fallback_nick = entity.name
                msg["reply"]["to"] = entity.jid
        else:
            fallback_nick = None

        if fallback := reply_to.body:
            msg["reply"].add_quoted_fallback(fallback, fallback_nick)

    @staticmethod
    def _add_link_previews(msg: Message, link_previews: Iterable[LinkPreview]) -> None:
        """
        Append one link preview element per preview to the message.

        Only the truthy fields of each preview are copied to its element.

        :param msg: The message to update in place.
        :param link_previews: The previews to attach.
        """
        for preview in link_previews:
            element = LinkPreviewStanza()
            # Pair each field name with its value instead of indexing by position.
            for name, val in zip(preview._fields, preview):
                if not val:
                    continue
                element[name] = val
            msg.append(element)

185 msg.append(element)