Coverage for slidge/group/archive.py: 98%

87 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-26 19:34 +0000

1import logging 

2import uuid 

3import warnings 

4from copy import copy 

5from datetime import datetime, timezone 

6from typing import TYPE_CHECKING, Collection, Optional 

7 

8from slixmpp import Iq, Message 

9 

10from ..db.models import ArchivedMessage, ArchivedMessageSource, Room 

11from ..db.store import MAMStore 

12from ..util.archive_msg import HistoryMessage 

13from ..util.types import HoleBound 

14 

15if TYPE_CHECKING: 

16 from .participant import LegacyParticipant 

17 

18 

class MessageArchive:
    """
    Message archive (MAM) of a single group chat room.

    Persistence is delegated to a :class:`MAMStore`. Each public method opens
    its own store session and scopes its queries to ``self.room.id``.
    """

    def __init__(self, room: Room, store: MAMStore) -> None:
        self.room = room
        self.__store = store

    def add(
        self,
        msg: Message,
        participant: Optional["LegacyParticipant"] = None,
        archive_only: bool = False,
        legacy_msg_id=None,
    ) -> None:
        """
        Archive a copy of ``msg``, unless :func:`archivable` rejects it.

        :param msg: Message stanza to archive. A copy (``copy(msg)``) is
            annotated and stored.
        :param participant: Author of the message. When given and the room
            is not anonymous, the stored copy gets:

            - the author's MUC role (default ``"participant"``);
            - the author's affiliation (default ``"member"``);
            - a JID: ``contact.jid.bare``, ``user_jid.bare`` for the user,
              ``muc.jid`` for the system participant, or, after emitting a
              warning, a random ``<uuid4>@<xmpp.boundjid.bare>`` placeholder.
        :param archive_only: Forwarded unchanged to ``MAMStore.add_message``.
        :param legacy_msg_id: Legacy network ID of the message; passed to the
            store as ``str(legacy_msg_id)``, or ``None`` when absent.
        """
        if not archivable(msg):
            return

        stanza = copy(msg)
        if participant and not participant.muc.is_anonymous:
            stanza["muc"]["role"] = participant.role or "participant"
            stanza["muc"]["affiliation"] = participant.affiliation or "member"
            if participant.contact:
                real_jid = participant.contact.jid.bare
            elif participant.is_user:
                real_jid = participant.user_jid.bare
            elif participant.is_system:
                real_jid = participant.muc.jid
            else:
                warnings.warn(
                    f"No real JID for participant '{participant.nickname}' in '{self.room.name}'"
                )
                # Fallback: fabricate a unique JID so the stored copy still
                # carries one.
                real_jid = f"{uuid.uuid4()}@{participant.xmpp.boundjid.bare}"
            stanza["muc"]["jid"] = real_jid

        with self.__store.session() as orm:
            entry = HistoryMessage(stanza)
            legacy_id = None if legacy_msg_id is None else str(legacy_msg_id)
            self.__store.add_message(orm, self.room.id, entry, archive_only, legacy_id)
            orm.commit()

    def __iter__(self):
        # Iterating the archive is get_all() with no filters.
        return iter(self.get_all())

    @staticmethod
    def __to_bound(stored: ArchivedMessage):
        # Stored timestamps are naive; tag them as UTC for the hole bound.
        legacy_id = stored.legacy_id
        utc_timestamp = stored.timestamp.replace(tzinfo=timezone.utc)
        return HoleBound(legacy_id, utc_timestamp)  # type:ignore

    def get_hole_bounds(self) -> tuple[HoleBound | None, HoleBound | None]:
        """
        Compute the (after, before) bounds of the history gap to back-fill.

        :return:
            - ``(None, None)`` when no message with a legacy ID is archived.
            - ``(latest backfilled, None)`` when the newest such message comes
              from backfill.
            - ``(None, oldest live)`` when the room was never back-filled.
            - ``(latest backfilled, first message after it)`` otherwise.
        """
        with self.__store.session() as orm:
            room_pk = self.room.id
            latest = self.__store.get_most_recent_with_legacy_id(orm, room_pk)
            if latest is None:
                return None, None
            if latest.source == ArchivedMessageSource.BACKFILL:
                # Only backfill so far: fetch everything since the last backfill.
                return self.__to_bound(latest), None

            latest_backfill = self.__store.get_most_recent_with_legacy_id(
                orm, room_pk, ArchivedMessageSource.BACKFILL
            )
            if latest_backfill is None:
                # Never back-filled: fetch everything before the first live message.
                oldest_live = self.__store.get_first(orm, room_pk, True)
                assert oldest_live is not None
                return None, self.__to_bound(oldest_live)

            assert latest_backfill.legacy_id is not None
            first_after_backfill = self.__store.get_least_recent_with_legacy_id_after(
                orm, room_pk, latest_backfill.legacy_id
            )
            assert first_after_backfill is not None
            # A hole between backfill and live history, caused by slidge downtime.
            after = self.__to_bound(latest_backfill)
            before = self.__to_bound(first_after_backfill)
            return after, before

    def get_all(
        self,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        before_id: Optional[str] = None,
        after_id: Optional[str] = None,
        ids: Collection[str] = (),
        last_page_n: Optional[int] = None,
        sender: Optional[str] = None,
        flip: bool = False,
    ):
        """
        Lazily yield this room's archived messages matching the given filters.

        All filters are forwarded verbatim to ``MAMStore.get_messages``. The
        store session stays open while the generator is being consumed.
        """
        criteria = dict(
            before_id=before_id,
            after_id=after_id,
            ids=ids,
            last_page_n=last_page_n,
            sender=sender,
            start_date=start_date,
            end_date=end_date,
            flip=flip,
        )
        with self.__store.session() as orm:
            for archived in self.__store.get_messages(orm, self.room.id, **criteria):
                yield archived

    async def send_metadata(self, iq: Iq) -> None:
        """
        Answer a MAM metadata query with the archive's extent.

        The reply carries the ``id`` and UTC ``timestamp`` of the first and
        last archived messages. For an empty archive, it carries an empty
        ``mam_metadata`` element.

        :param iq: The incoming metadata query to reply to.
        """
        reply = iq.reply()
        with self.__store.session() as orm:
            bounds = self.__store.get_first_and_last(orm, self.room.id)
            if not bounds:
                reply.enable("mam_metadata")
            else:
                for edge, meta in zip(("start", "end"), (bounds[0], bounds[-1])):
                    reply["mam_metadata"][edge]["id"] = meta.id
                    reply["mam_metadata"][edge]["timestamp"] = meta.sent_on.replace(
                        tzinfo=timezone.utc
                    )
        reply.send()

152 

153 

def archivable(msg: Message) -> bool:
    """
    Tell whether ``msg`` carries information worth keeping in the archive.

    Explicit storage hints win: ``no-store`` or ``no-permanent-store`` reject
    the message, and ``store`` accepts it. Otherwise the message is kept when
    it has any of the following:

    - a body;
    - a retraction, reactions or a displayed marker;
    - both a thread and a subject.

    :param msg: The message stanza to inspect.
    :return: ``True`` if the message should be archived.
    """

    def has(plugin: str) -> bool:
        # check=True only tests for presence; it does not create the plugin.
        return bool(msg.get_plugin(plugin, check=True))

    if has("no-store") or has("no-permanent-store"):
        return False
    if has("store") or msg["body"]:
        return True
    if any(has(name) for name in ("retract", "reactions", "displayed")):
        return True
    return bool(msg["thread"] and msg["subject"])

188 

189 

# Logger named after this module.
log = logging.getLogger(name=__name__)