Coverage for slidge / group / archive.py: 98%

92 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-03-13 22:59 +0000

1import logging 

2import uuid 

3import warnings 

4from collections.abc import Collection, Iterator 

5from copy import copy 

6from datetime import UTC, datetime 

7from typing import TYPE_CHECKING, Optional 

8 

9from slixmpp import Iq, Message 

10 

11from ..db.models import ArchivedMessage, ArchivedMessageSource, Room 

12from ..db.store import SlidgeStore 

13from ..util.archive_msg import HistoryMessage 

14from ..util.types import HoleBound 

15 

16if TYPE_CHECKING: 

17 from .participant import LegacyParticipant 

18 

19 

20class MessageArchive: 

21 def __init__(self, room: Room, store: SlidgeStore) -> None: 

22 self.room = room 

23 self.__rooms_store = store.rooms 

24 self.__store = store.mam 

25 

26 def add( 

27 self, 

28 msg: Message, 

29 participant: Optional["LegacyParticipant"] = None, 

30 archive_only: bool = False, 

31 legacy_msg_id=None, 

32 ) -> None: 

33 """ 

34 Add a message to the archive if it is deemed archivable 

35 

36 :param msg: 

37 :param participant: 

38 :param archive_only: 

39 :param legacy_msg_id: 

40 """ 

41 if not archivable(msg): 

42 return 

43 new_msg = copy(msg) 

44 if participant and not participant.muc.is_anonymous: 

45 new_msg["muc"]["role"] = participant.role or "participant" 

46 new_msg["muc"]["affiliation"] = participant.affiliation or "member" 

47 if participant.contact: 

48 new_msg["muc"]["jid"] = participant.contact.jid.bare 

49 elif participant.is_user: 

50 new_msg["muc"]["jid"] = participant.user_jid.bare 

51 elif participant.is_system: 

52 new_msg["muc"]["jid"] = participant.muc.jid 

53 else: 

54 warnings.warn( 

55 f"No real JID for participant '{participant.nickname}' in '{self.room.name}'" 

56 ) 

57 new_msg["muc"]["jid"] = ( 

58 f"{uuid.uuid4()}@{participant.xmpp.boundjid.bare}" 

59 ) 

60 

61 if "replace" in msg: 

62 legacy_msg_id = None 

63 

64 with self.__store.session(expire_on_commit=False) as orm: 

65 if self.room.id is None: 

66 self.room = self.__rooms_store.get( 

67 orm, self.room.user_account_id, self.room.legacy_id 

68 ) 

69 self.__store.add_message( 

70 orm, 

71 self.room.id, 

72 HistoryMessage(new_msg), 

73 archive_only, 

74 None if legacy_msg_id is None else str(legacy_msg_id), 

75 ) 

76 orm.commit() 

77 

78 def __iter__(self) -> Iterator[HistoryMessage]: 

79 return iter(self.get_all()) 

80 

81 @staticmethod 

82 def __to_bound(stored: ArchivedMessage) -> HoleBound: 

83 return HoleBound( 

84 stored.legacy_id, # type:ignore 

85 stored.timestamp.replace(tzinfo=UTC), 

86 ) 

87 

88 def get_hole_bounds(self) -> tuple[HoleBound | None, HoleBound | None]: 

89 with self.__store.session() as orm: 

90 most_recent = self.__store.get_most_recent_with_legacy_id(orm, self.room.id) 

91 if most_recent is None: 

92 return None, None 

93 if most_recent.source == ArchivedMessageSource.BACKFILL: 

94 # most recent = only backfill, fetch everything since last backfill 

95 return self.__to_bound(most_recent), None 

96 

97 most_recent_back_filled = self.__store.get_most_recent_with_legacy_id( 

98 orm, self.room.id, ArchivedMessageSource.BACKFILL 

99 ) 

100 if most_recent_back_filled is None: 

101 # group was never back-filled, fetch everything before first live 

102 least_recent_live = self.__store.get_first(orm, self.room.id, True) 

103 assert least_recent_live is not None 

104 return None, self.__to_bound(least_recent_live) 

105 

106 assert most_recent_back_filled.legacy_id is not None 

107 least_recent_live = self.__store.get_least_recent_with_legacy_id_after( 

108 orm, self.room.id, most_recent_back_filled.legacy_id 

109 ) 

110 assert least_recent_live is not None 

111 # this is a hole caused by slidge downtime 

112 return self.__to_bound(most_recent_back_filled), self.__to_bound( 

113 least_recent_live 

114 ) 

115 

116 def get_all( 

117 self, 

118 start_date: datetime | None = None, 

119 end_date: datetime | None = None, 

120 before_id: str | None = None, 

121 after_id: str | None = None, 

122 ids: Collection[str] = (), 

123 last_page_n: int | None = None, 

124 sender: str | None = None, 

125 flip: bool = False, 

126 ) -> Iterator[HistoryMessage]: 

127 with self.__store.session() as orm: 

128 yield from self.__store.get_messages( 

129 orm, 

130 self.room.id, 

131 before_id=before_id, 

132 after_id=after_id, 

133 ids=ids, 

134 last_page_n=last_page_n, 

135 sender=sender, 

136 start_date=start_date, 

137 end_date=end_date, 

138 flip=flip, 

139 ) 

140 

141 async def send_metadata(self, iq: Iq) -> None: 

142 """ 

143 Send archive extent, as per the spec 

144 

145 :param iq: 

146 :return: 

147 """ 

148 reply = iq.reply() 

149 with self.__store.session() as orm: 

150 messages = self.__store.get_first_and_last(orm, self.room.id) 

151 if messages: 

152 for x, m in [("start", messages[0]), ("end", messages[-1])]: 

153 reply["mam_metadata"][x]["id"] = m.id 

154 reply["mam_metadata"][x]["timestamp"] = m.sent_on.replace(tzinfo=UTC) 

155 else: 

156 reply.enable("mam_metadata") 

157 reply.send() 

158 

159 

def archivable(msg: Message) -> bool:
    """
    Tell whether a message stanza should go into the archive.

    The checks run in this order, and the first one that matches decides:

    1. a ``no-store`` or ``no-permanent-store`` element: not archivable;
    2. a ``store`` element, or a non-empty body: archivable;
    3. a ``retract``, ``reactions`` or ``displayed`` element: archivable;
    4. both a thread and a subject: archivable.

    Anything else is not archivable.

    :param msg: The stanza to inspect
    :return: ``True`` if the stanza should be archived
    """

    def carries(plugin_name: str) -> bool:
        # check=True: only look for the element, as in every call here
        return bool(msg.get_plugin(plugin_name, check=True))

    for refusing in ("no-store", "no-permanent-store"):
        if carries(refusing):
            return False

    if carries("store") or msg["body"]:
        return True

    if any(carries(name) for name in ("retract", "reactions", "displayed")):
        return True

    return bool(msg["thread"] and msg["subject"])

194 

195 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)