Coverage for slidge/core/mixins/message_maker.py: 92%

108 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-04 08:17 +0000

1import warnings 

2from datetime import datetime, timezone 

3from typing import TYPE_CHECKING, Iterable, Optional, cast 

4from uuid import uuid4 

5 

6from slixmpp import JID, Message 

7from slixmpp.types import MessageTypes 

8 

9from ...db.models import GatewayUser 

10from ...slixfix.link_preview.stanza import LinkPreview as LinkPreviewStanza 

11from ...util.types import ( 

12 ChatState, 

13 LegacyMessageType, 

14 LinkPreview, 

15 MessageReference, 

16 ProcessingHint, 

17) 

18from .. import config 

19from .base import BaseSender 

20 

21if TYPE_CHECKING: 

22 from ...group import LegacyMUC, LegacyParticipant 

23 

24 

25class MessageMaker(BaseSender): 

26 mtype: MessageTypes = NotImplemented 

27 _can_send_carbon: bool = NotImplemented 

28 STRIP_SHORT_DELAY = False 

29 USE_STANZA_ID = False 

30 

31 muc: "LegacyMUC" 

32 is_group: bool 

33 

34 def _recipient_pk(self) -> int: 

35 return self.muc.stored.id if self.is_group else self.stored.id # type:ignore 

36 

37 def _make_message( 

38 self, 

39 state: Optional[ChatState] = None, 

40 hints: Iterable[ProcessingHint] = (), 

41 legacy_msg_id: Optional[LegacyMessageType] = None, 

42 when: Optional[datetime] = None, 

43 reply_to: Optional[MessageReference] = None, 

44 carbon: bool = False, 

45 link_previews: Optional[Iterable[LinkPreview]] = None, 

46 **kwargs, 

47 ): 

48 body = kwargs.pop("mbody", None) 

49 mfrom = kwargs.pop("mfrom", self.jid) 

50 mto = kwargs.pop("mto", None) 

51 thread = kwargs.pop("thread", None) 

52 if carbon and self._can_send_carbon: 

53 # the msg needs to have jabber:client as xmlns, so 

54 # we don't want to associate with the XML stream 

55 msg_cls = Message 

56 else: 

57 msg_cls = self.xmpp.Message # type:ignore 

58 msg = msg_cls( 

59 sfrom=mfrom, 

60 stype=kwargs.pop("mtype", None) or self.mtype, 

61 sto=mto, 

62 **kwargs, 

63 ) 

64 if body: 

65 msg["body"] = body 

66 state = "active" 

67 if thread: 

68 with self.xmpp.store.session() as orm: 

69 thread_str = str(thread) 

70 msg["thread"] = ( 

71 self.xmpp.store.id_map.get_thread( 

72 orm, self._recipient_pk(), thread_str, self.is_group 

73 ) 

74 or thread_str 

75 ) 

76 if state: 

77 msg["chat_state"] = state 

78 for hint in hints: 

79 msg.enable(hint) 

80 self._set_msg_id(msg, legacy_msg_id) 

81 self._add_delay(msg, when) 

82 if link_previews: 

83 self._add_link_previews(msg, link_previews) 

84 if reply_to: 

85 self._add_reply_to(msg, reply_to) 

86 return msg 

87 

88 def _set_msg_id( 

89 self, msg: Message, legacy_msg_id: Optional[LegacyMessageType] = None 

90 ) -> None: 

91 if legacy_msg_id is not None: 

92 i = self.session.legacy_to_xmpp_msg_id(legacy_msg_id) 

93 msg.set_id(i) 

94 if self.USE_STANZA_ID: 

95 msg["stanza_id"]["id"] = i 

96 msg["stanza_id"]["by"] = self.muc.jid # type: ignore 

97 elif self.USE_STANZA_ID: 

98 msg["stanza_id"]["id"] = str(uuid4()) 

99 msg["stanza_id"]["by"] = self.muc.jid # type: ignore 

100 

101 def _legacy_to_xmpp(self, legacy_id: LegacyMessageType) -> str: 

102 with self.xmpp.store.session() as orm: 

103 ids = self.xmpp.store.id_map.get_xmpp( 

104 orm, self._recipient_pk(), str(legacy_id), False 

105 ) 

106 if ids: 

107 return ids[-1] 

108 return self.session.legacy_to_xmpp_msg_id(legacy_id) 

109 

110 def _add_delay(self, msg: Message, when: Optional[datetime]) -> None: 

111 if when: 

112 if when.tzinfo is None: 

113 when = when.astimezone(timezone.utc) 

114 if self.STRIP_SHORT_DELAY: 

115 delay = datetime.now().astimezone(timezone.utc) - when 

116 if delay < config.IGNORE_DELAY_THRESHOLD: 

117 return 

118 msg["delay"].set_stamp(when) 

119 msg["delay"].set_from(self.xmpp.boundjid.bare) 

120 

121 def _add_reply_to(self, msg: Message, reply_to: MessageReference) -> None: 

122 xmpp_id = self._legacy_to_xmpp(reply_to.legacy_id) 

123 msg["reply"]["id"] = xmpp_id 

124 

125 muc = getattr(self, "muc", None) 

126 

127 if entity := reply_to.author: 

128 if entity == "user" or isinstance(entity, GatewayUser): 

129 if isinstance(entity, GatewayUser): 

130 warnings.warn( 

131 "Using a GatewayUser as the author of a " 

132 "MessageReference is deprecated. Use the string 'user' " 

133 "instead.", 

134 DeprecationWarning, 

135 ) 

136 if muc: 

137 jid = JID(muc.jid) 

138 jid.resource = fallback_nick = muc.user_nick 

139 msg["reply"]["to"] = jid 

140 else: 

141 msg["reply"]["to"] = self.session.user_jid 

142 # TODO: here we should use preferably use the PEP nick of the user 

143 # (but it doesn't matter much) 

144 fallback_nick = self.session.user_jid.user 

145 else: 

146 if muc: 

147 if hasattr(entity, "muc"): 

148 # TODO: accept a Contact here and use muc.get_participant_by_legacy_id() 

149 # a bit of work because right now this is a sync function 

150 entity = cast("LegacyParticipant", entity) 

151 fallback_nick = entity.nickname 

152 else: 

153 warnings.warn( 

154 "The author of a message reference in a MUC must be a" 

155 " Participant instance, not a Contact" 

156 ) 

157 fallback_nick = entity.name 

158 else: 

159 fallback_nick = entity.name 

160 msg["reply"]["to"] = entity.jid 

161 else: 

162 fallback_nick = None 

163 

164 if fallback := reply_to.body: 

165 msg["reply"].add_quoted_fallback(fallback, fallback_nick) 

166 

167 @staticmethod 

168 def _add_link_previews(msg: Message, link_previews: Iterable[LinkPreview]) -> None: 

169 for preview in link_previews: 

170 element = LinkPreviewStanza() 

171 for i, name in enumerate(preview._fields): 

172 val = preview[i] 

173 if not val: 

174 continue 

175 element[name] = val 

176 msg.append(element)