Coverage for slidge/core/mixins/message_maker.py: 93%

99 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-07 05:11 +0000

import warnings
from copy import copy
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Iterable, Optional, cast
from uuid import uuid4

from slixmpp import Message
from slixmpp.types import MessageTypes

from ...db.models import GatewayUser
from ...slixfix.link_preview.stanza import LinkPreview as LinkPreviewStanza
from ...util.types import (
    ChatState,
    LegacyMessageType,
    LinkPreview,
    MessageReference,
    ProcessingHint,
)
from .. import config
from .base import BaseSender

if TYPE_CHECKING:
    from ...group.participant import LegacyParticipant


class MessageMaker(BaseSender):
    """
    Mixin that builds outgoing :class:`slixmpp.Message` stanzas.

    The entry point is :meth:`_make_message`; the other methods each decorate
    an already-created stanza with one feature (id, delay, reply, link
    previews).
    """

    # Stanza type used when the caller does not pass an ``mtype`` kwarg.
    mtype: MessageTypes = NotImplemented
    # Carbons are only built (detached from the XML stream) when this is truthy.
    _can_send_carbon: bool = NotImplemented
    # When true, delays shorter than config.IGNORE_DELAY_THRESHOLD are dropped.
    STRIP_SHORT_DELAY = False
    # When true, a stanza-id element attributed to ``self.muc.jid`` is added.
    USE_STANZA_ID = False

    def _make_message(
        self,
        state: Optional[ChatState] = None,
        hints: Iterable[ProcessingHint] = (),
        legacy_msg_id: Optional[LegacyMessageType] = None,
        when: Optional[datetime] = None,
        reply_to: Optional[MessageReference] = None,
        carbon=False,
        link_previews: Optional[Iterable[LinkPreview]] = None,
        **kwargs,
    ):
        """
        Build a message stanza from this sender.

        :param state: Chat state to attach; forced to ``"active"`` whenever a
            body is present.
        :param hints: Processing hints, each enabled on the stanza.
        :param legacy_msg_id: Legacy network message id, used to derive the
            stanza id (see :meth:`_set_msg_id`).
        :param when: Timestamp of the message (see :meth:`_add_delay`).
        :param reply_to: Message this one replies to, if any.
        :param carbon: Request a carbon; honoured only if
            ``self._can_send_carbon`` is truthy.
        :param link_previews: Link previews to append to the stanza.
        :param kwargs: ``mbody``, ``mfrom``, ``mto``, ``thread`` and ``mtype``
            are consumed here; whatever remains is forwarded to the message
            constructor.
        :return: The populated message stanza.
        """
        body = kwargs.pop("mbody", None)
        mfrom = kwargs.pop("mfrom", self.jid)
        mto = kwargs.pop("mto", None)
        thread = kwargs.pop("thread", None)
        if carbon and self._can_send_carbon:
            # the msg needs to have jabber:client as xmlns, so
            # we don't want to associate with the XML stream
            msg_cls = Message
        else:
            msg_cls = self.xmpp.Message  # type:ignore
        msg = msg_cls(
            sfrom=mfrom,
            stype=kwargs.pop("mtype", None) or self.mtype,
            sto=mto,
            **kwargs,
        )
        if body:
            msg["body"] = body
            # A message carrying text always advertises the "active" state,
            # overriding whatever the caller passed.
            state = "active"
        if thread:
            # Prefer the thread id recorded in the store, falling back to the
            # stringified value that was passed in.
            msg["thread"] = self.xmpp.store.sent.get_legacy_thread(
                self.user_pk, str(thread)
            ) or str(thread)
        if state:
            msg["chat_state"] = state
        for hint in hints:
            msg.enable(hint)
        self._set_msg_id(msg, legacy_msg_id)
        self._add_delay(msg, when)
        if link_previews:
            self._add_link_previews(msg, link_previews)
        if reply_to:
            self._add_reply_to(msg, reply_to)
        return msg

    def _set_msg_id(
        self, msg: Message, legacy_msg_id: Optional[LegacyMessageType] = None
    ):
        """
        Set the stanza id of ``msg`` and, if enabled, its stanza-id element.

        With a legacy id, the XMPP id comes from :meth:`_legacy_to_xmpp` and is
        used for both the message id and the stanza-id. Without one, the
        message id is left untouched and, when ``USE_STANZA_ID`` is set, a
        random UUID is used as stanza-id.

        :param msg: Stanza to modify in place.
        :param legacy_msg_id: Legacy network message id, or ``None``.
        """
        if legacy_msg_id is None:
            if not self.USE_STANZA_ID:
                return
            stanza_id = str(uuid4())
        else:
            stanza_id = self._legacy_to_xmpp(legacy_msg_id)
            msg.set_id(stanza_id)
        if self.USE_STANZA_ID:
            msg["stanza_id"]["id"] = stanza_id
            msg["stanza_id"]["by"] = self.muc.jid  # type: ignore

    def _legacy_to_xmpp(self, legacy_id: LegacyMessageType):
        """
        Translate a legacy message id into an XMPP message id.

        Looks the id up in the sent-messages store first and falls back to the
        session's ``legacy_to_xmpp_msg_id`` conversion when the store returns
        nothing truthy.
        """
        return self.xmpp.store.sent.get_xmpp_id(
            self.session.user_pk, str(legacy_id)
        ) or self.session.legacy_to_xmpp_msg_id(legacy_id)

    def _add_delay(self, msg: Message, when: Optional[datetime]):
        """
        Attach a delay element stamped with ``when`` to ``msg``.

        Does nothing if ``when`` is falsy, or if ``STRIP_SHORT_DELAY`` is set
        and ``when`` is more recent than ``config.IGNORE_DELAY_THRESHOLD``.

        :param msg: Stanza to modify in place.
        :param when: Timestamp of the message; naive values are converted with
            ``astimezone`` (i.e. interpreted as local time).
        """
        if not when:
            return
        if when.tzinfo is None:
            when = when.astimezone(timezone.utc)
        if self.STRIP_SHORT_DELAY:
            delay = datetime.now(timezone.utc) - when
            if delay < config.IGNORE_DELAY_THRESHOLD:
                return
        msg["delay"].set_stamp(when)
        msg["delay"].set_from(self.xmpp.boundjid.bare)

    def _add_reply_to(self, msg: Message, reply_to: MessageReference):
        """
        Mark ``msg`` as a reply to the message described by ``reply_to``.

        Sets the reply id and, when an author is known, the reply target JID.
        If ``reply_to.body`` is set, a quoted fallback is added using a
        nickname derived from the author (``None`` when there is no author).

        :param msg: Stanza to modify in place.
        :param reply_to: Reference to the message being replied to.
        """
        xmpp_id = self._legacy_to_xmpp(reply_to.legacy_id)
        msg["reply"]["id"] = xmpp_id

        # Only senders living in a group chat have a ``muc`` attribute.
        muc = getattr(self, "muc", None)

        if entity := reply_to.author:
            if entity == "user" or isinstance(entity, GatewayUser):
                if isinstance(entity, GatewayUser):
                    warnings.warn(
                        "Using a GatewayUser as the author of a "
                        "MessageReference is deprecated. Use the string 'user' "
                        "instead.",
                        DeprecationWarning,
                    )
                if muc:
                    # In a MUC the user is addressed by their occupant JID.
                    jid = copy(muc.jid)
                    jid.resource = fallback_nick = muc.user_nick
                    msg["reply"]["to"] = jid
                else:
                    msg["reply"]["to"] = self.session.user_jid
                    # TODO: here we should use preferably use the PEP nick of the user
                    # (but it doesn't matter much)
                    fallback_nick = self.session.user_jid.local
            else:
                if muc:
                    if hasattr(entity, "muc"):
                        # TODO: accept a Contact here and use muc.get_participant_by_legacy_id()
                        # a bit of work because right now this is a sync function
                        entity = cast("LegacyParticipant", entity)
                        fallback_nick = entity.nickname
                    else:
                        warnings.warn(
                            "The author of a message reference in a MUC must be a"
                            " Participant instance, not a Contact"
                        )
                        fallback_nick = entity.name
                else:
                    fallback_nick = entity.name
                msg["reply"]["to"] = entity.jid
        else:
            fallback_nick = None

        if fallback := reply_to.body:
            msg["reply"].add_quoted_fallback(fallback, fallback_nick)

    @staticmethod
    def _add_link_previews(msg: Message, link_previews: Iterable[LinkPreview]):
        """
        Append one link preview element to ``msg`` per item of ``link_previews``.

        Only truthy fields of each preview are copied onto the element.

        :param msg: Stanza to modify in place.
        :param link_previews: Previews to serialise.
        """
        for preview in link_previews:
            element = LinkPreviewStanza()
            # Pair each field name with its value instead of indexing by position.
            for name, val in zip(preview._fields, preview):
                if val:
                    element[name] = val
            msg.append(element)

162 msg.append(element)