Coverage for slidge/core/mixins/message_maker.py: 92%

107 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-26 19:34 +0000

1import uuid 

2import warnings 

3from datetime import datetime, timezone 

4from typing import TYPE_CHECKING, Iterable, Optional, cast 

5 

6from slixmpp import JID, Message 

7from slixmpp.types import MessageTypes 

8 

9from ...db.models import GatewayUser 

10from ...slixfix.link_preview.stanza import LinkPreview as LinkPreviewStanza 

11from ...util.types import ( 

12 ChatState, 

13 LegacyMessageType, 

14 LinkPreview, 

15 MessageReference, 

16 ProcessingHint, 

17) 

18from .. import config 

19from .base import BaseSender 

20 

21if TYPE_CHECKING: 

22 from ...group import LegacyMUC, LegacyParticipant 

23 

24 

25class MessageMaker(BaseSender): 

26 mtype: MessageTypes = NotImplemented 

27 _can_send_carbon: bool = NotImplemented 

28 STRIP_SHORT_DELAY = False 

29 USE_STANZA_ID = False 

30 

31 muc: "LegacyMUC" 

32 

33 def _recipient_pk(self) -> int: 

34 return ( 

35 self.muc.stored.id if self.is_participant else self.stored.id # type:ignore 

36 ) 

37 

38 def _make_message( 

39 self, 

40 state: Optional[ChatState] = None, 

41 hints: Iterable[ProcessingHint] = (), 

42 legacy_msg_id: Optional[LegacyMessageType] = None, 

43 when: Optional[datetime] = None, 

44 reply_to: Optional[MessageReference] = None, 

45 carbon: bool = False, 

46 link_previews: Optional[Iterable[LinkPreview]] = None, 

47 **kwargs, 

48 ): 

49 body = kwargs.pop("mbody", None) 

50 mfrom = kwargs.pop("mfrom", self.jid) 

51 mto = kwargs.pop("mto", None) 

52 thread = kwargs.pop("thread", None) 

53 if carbon and self._can_send_carbon: 

54 # the msg needs to have jabber:client as xmlns, so 

55 # we don't want to associate with the XML stream 

56 msg_cls = Message 

57 else: 

58 msg_cls = self.xmpp.Message # type:ignore 

59 msg = msg_cls( 

60 sfrom=mfrom, 

61 stype=kwargs.pop("mtype", None) or self.mtype, 

62 sto=mto, 

63 **kwargs, 

64 ) 

65 if body: 

66 msg["body"] = body 

67 state = "active" 

68 if thread: 

69 with self.xmpp.store.session() as orm: 

70 thread_str = str(thread) 

71 msg["thread"] = ( 

72 self.xmpp.store.id_map.get_thread( 

73 orm, 

74 self._recipient_pk(), 

75 thread_str, 

76 self.is_participant, 

77 ) 

78 or thread_str 

79 ) 

80 if state: 

81 msg["chat_state"] = state 

82 for hint in hints: 

83 msg.enable(hint) 

84 self._set_msg_id(msg, legacy_msg_id) 

85 self._add_delay(msg, when) 

86 if link_previews: 

87 self._add_link_previews(msg, link_previews) 

88 if reply_to: 

89 self._add_reply_to(msg, reply_to) 

90 return msg 

91 

92 def _set_msg_id( 

93 self, msg: Message, legacy_msg_id: Optional[LegacyMessageType] = None 

94 ) -> None: 

95 if legacy_msg_id is not None: 

96 i = self.session.legacy_to_xmpp_msg_id(legacy_msg_id) 

97 msg.set_id(i) 

98 if self.USE_STANZA_ID: 

99 msg["stanza_id"]["id"] = i 

100 msg["stanza_id"]["by"] = self.muc.jid # type: ignore 

101 elif self.USE_STANZA_ID: 

102 msg["stanza_id"]["id"] = str(uuid.uuid4()) 

103 msg["stanza_id"]["by"] = self.muc.jid # type: ignore 

104 

105 def _legacy_to_xmpp(self, legacy_id: LegacyMessageType) -> list[str]: 

106 with self.xmpp.store.session() as orm: 

107 ids = self.xmpp.store.id_map.get_xmpp( 

108 orm, 

109 self._recipient_pk(), 

110 str(legacy_id), 

111 self.is_participant, 

112 ) 

113 if ids: 

114 return ids 

115 return [self.session.legacy_to_xmpp_msg_id(legacy_id)] 

116 

117 def _add_delay(self, msg: Message, when: Optional[datetime]) -> None: 

118 if when: 

119 if when.tzinfo is None: 

120 when = when.astimezone(timezone.utc) 

121 if self.STRIP_SHORT_DELAY: 

122 delay = (datetime.now().astimezone(timezone.utc) - when).seconds 

123 if delay < config.IGNORE_DELAY_THRESHOLD: 

124 return 

125 msg["delay"].set_stamp(when) 

126 msg["delay"].set_from(self.xmpp.boundjid.bare) 

127 

128 def _add_reply_to(self, msg: Message, reply_to: MessageReference) -> None: 

129 xmpp_id = self._legacy_to_xmpp(reply_to.legacy_id)[0] 

130 msg["reply"]["id"] = xmpp_id 

131 

132 muc = getattr(self, "muc", None) 

133 

134 if entity := reply_to.author: 

135 if entity == "user" or isinstance(entity, GatewayUser): 

136 if isinstance(entity, GatewayUser): 

137 warnings.warn( 

138 "Using a GatewayUser as the author of a " 

139 "MessageReference is deprecated. Use the string 'user' " 

140 "instead.", 

141 DeprecationWarning, 

142 ) 

143 if muc: 

144 jid = JID(muc.jid) 

145 jid.resource = fallback_nick = muc.user_nick 

146 msg["reply"]["to"] = jid 

147 else: 

148 msg["reply"]["to"] = self.session.user_jid 

149 # TODO: here we should use preferably use the PEP nick of the user 

150 # (but it doesn't matter much) 

151 fallback_nick = self.session.user_jid.user 

152 else: 

153 if muc: 

154 if hasattr(entity, "muc"): 

155 # TODO: accept a Contact here and use muc.get_participant_by_legacy_id() 

156 # a bit of work because right now this is a sync function 

157 entity = cast("LegacyParticipant", entity) 

158 fallback_nick = entity.nickname 

159 else: 

160 warnings.warn( 

161 "The author of a message reference in a MUC must be a" 

162 " Participant instance, not a Contact" 

163 ) 

164 fallback_nick = entity.name 

165 else: 

166 fallback_nick = entity.name 

167 msg["reply"]["to"] = entity.jid 

168 else: 

169 fallback_nick = None 

170 

171 if fallback := reply_to.body: 

172 msg["reply"].add_quoted_fallback(fallback, fallback_nick) 

173 

174 @staticmethod 

175 def _add_link_previews(msg: Message, link_previews: Iterable[LinkPreview]) -> None: 

176 for preview in link_previews: 

177 element = LinkPreviewStanza() 

178 for i, name in enumerate(preview._fields): 

179 val = preview[i] 

180 if not val: 

181 continue 

182 element[name] = val 

183 msg.append(element)