Coverage for slidge / group / archive.py: 98%

90 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-06 15:18 +0000

1import logging 

2import uuid 

3import warnings 

4from copy import copy 

5from datetime import datetime, timezone 

6from typing import TYPE_CHECKING, Collection, Optional 

7 

8from slixmpp import Iq, Message 

9 

10from ..db.models import ArchivedMessage, ArchivedMessageSource, Room 

11from ..db.store import SlidgeStore 

12from ..util.archive_msg import HistoryMessage 

13from ..util.types import HoleBound 

14 

15if TYPE_CHECKING: 

16 from .participant import LegacyParticipant 

17 

18 

class MessageArchive:
    """
    Message archive of a single group chat room.

    Messages are stored in and read from the MAM store of a
    :class:`SlidgeStore`. Its rooms store is only consulted to re-read
    :attr:`room` when that object has no ``id`` yet.
    """

    def __init__(self, room: Room, store: SlidgeStore) -> None:
        self.room = room
        self.__rooms_store = store.rooms
        self.__store = store.mam

    def add(
        self,
        msg: Message,
        participant: Optional["LegacyParticipant"] = None,
        archive_only: bool = False,
        legacy_msg_id=None,
    ) -> None:
        """
        Store a copy of *msg* in this room's archive, unless
        :func:`archivable` rejects it.

        The caller's stanza is never modified: the enrichment below is
        applied to a copy.

        :param msg: The message stanza to archive.
        :param participant: The sender. When given and the room is not
            anonymous, the archived copy's ``muc`` element receives the
            sender's role (default ``"participant"``), affiliation
            (default ``"member"``) and a real JID.
        :param archive_only: Forwarded unchanged to the MAM store's
            ``add_message``.
        :param legacy_msg_id: Legacy-network ID of the message. It is stored
            via ``str()``, or as ``None`` when not provided.
        """
        if not archivable(msg):
            return

        archived_copy = copy(msg)

        if participant and not participant.muc.is_anonymous:
            archived_copy["muc"]["role"] = participant.role or "participant"
            archived_copy["muc"]["affiliation"] = participant.affiliation or "member"

            # Pick the best "real JID" available for this participant.
            if participant.contact:
                real_jid = participant.contact.jid.bare
            elif participant.is_user:
                real_jid = participant.user_jid.bare
            elif participant.is_system:
                real_jid = participant.muc.jid
            else:
                warnings.warn(
                    f"No real JID for participant '{participant.nickname}' in '{self.room.name}'"
                )
                # Fall back to a random, unique JID on the component's domain.
                real_jid = f"{uuid.uuid4()}@{participant.xmpp.boundjid.bare}"
            archived_copy["muc"]["jid"] = real_jid

        # NOTE(review): expire_on_commit=False presumably keeps self.room
        # (possibly re-read below) usable after the commit; confirm.
        with self.__store.session(expire_on_commit=False) as session:
            if self.room.id is None:
                # NOTE(review): a Room without an id is re-read from the rooms
                # store by (user_account_id, legacy_id); presumably it was
                # never persisted/refreshed. Confirm.
                self.room = self.__rooms_store.get(
                    session, self.room.user_account_id, self.room.legacy_id
                )
            self.__store.add_message(
                session,
                self.room.id,
                HistoryMessage(archived_copy),
                archive_only,
                None if legacy_msg_id is None else str(legacy_msg_id),
            )
            session.commit()

    def __iter__(self):
        """Iterate over every archived message, i.e. ``get_all()`` unfiltered."""
        return iter(self.get_all())

    @staticmethod
    def __to_bound(stored: ArchivedMessage):
        """Build a :class:`HoleBound` from an archived row, tagging its timestamp as UTC."""
        legacy_id = stored.legacy_id
        when = stored.timestamp.replace(tzinfo=timezone.utc)
        return HoleBound(legacy_id, when)  # type:ignore

    def get_hole_bounds(self) -> tuple[HoleBound | None, HoleBound | None]:
        """
        Locate the gap in this room's history that still needs to be filled.

        Only archived messages that have a legacy ID are considered.

        :return: A ``(lower, upper)`` pair:

            - ``(None, None)`` if no message with a legacy ID is archived;
            - ``(latest, None)`` if the most recent such message came from
              backfill, meaning everything after it is missing;
            - ``(None, first)`` if the room was never backfilled, where
              *first* is the store's ``get_first`` result, meaning everything
              before it is missing;
            - otherwise ``(latest backfilled, first message with a legacy ID
              after it)``, which is a gap caused by slidge downtime.
        """
        with self.__store.session() as session:
            latest = self.__store.get_most_recent_with_legacy_id(session, self.room.id)
            if latest is None:
                return None, None

            if latest.source == ArchivedMessageSource.BACKFILL:
                # Only backfilled messages so far: fetch everything newer.
                return self.__to_bound(latest), None

            latest_backfill = self.__store.get_most_recent_with_legacy_id(
                session, self.room.id, ArchivedMessageSource.BACKFILL
            )
            if latest_backfill is None:
                # Never backfilled: fetch everything before the first live message.
                first_live = self.__store.get_first(session, self.room.id, True)
                assert first_live is not None
                return None, self.__to_bound(first_live)

            assert latest_backfill.legacy_id is not None
            first_live = self.__store.get_least_recent_with_legacy_id_after(
                session, self.room.id, latest_backfill.legacy_id
            )
            assert first_live is not None
            # Live messages resumed after the last backfill: a downtime hole.
            lower = self.__to_bound(latest_backfill)
            upper = self.__to_bound(first_live)
            return lower, upper

    def get_all(
        self,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        before_id: Optional[str] = None,
        after_id: Optional[str] = None,
        ids: Collection[str] = (),
        last_page_n: Optional[int] = None,
        sender: Optional[str] = None,
        flip: bool = False,
    ):
        """
        Lazily yield this room's archived messages.

        Every filter is forwarded as-is to the MAM store's ``get_messages``.
        The database session stays open while the generator is being
        consumed.
        """
        filters = dict(
            before_id=before_id,
            after_id=after_id,
            ids=ids,
            last_page_n=last_page_n,
            sender=sender,
            start_date=start_date,
            end_date=end_date,
            flip=flip,
        )
        with self.__store.session() as session:
            for archived in self.__store.get_messages(session, self.room.id, **filters):
                yield archived

    async def send_metadata(self, iq: Iq) -> None:
        """
        Answer *iq* with the archive's extent: the ID and UTC timestamp of its
        first and last messages.

        An empty archive gets an empty ``mam_metadata`` element.

        :param iq: The metadata request to reply to.
        """
        reply = iq.reply()
        with self.__store.session() as session:
            extent = self.__store.get_first_and_last(session, self.room.id)

        if not extent:
            reply.enable("mam_metadata")
        else:
            for edge, entry in zip(("start", "end"), (extent[0], extent[-1])):
                reply["mam_metadata"][edge]["id"] = entry.id
                reply["mam_metadata"][edge]["timestamp"] = entry.sent_on.replace(
                    tzinfo=timezone.utc
                )
        reply.send()

157 

158 

def archivable(msg: Message) -> bool:
    """
    Decide whether *msg* carries information worth keeping in the archive.

    The processing hints ``no-store`` and ``no-permanent-store`` veto
    archiving, and the ``store`` hint forces it. Without a hint, a message is
    kept if it has a body, a ``retract``, ``reactions`` or ``displayed``
    payload, or both a thread and a subject.

    :param msg: The message stanza to inspect.
    :return: ``True`` if the message should be archived.
    """

    def present(plugin: str) -> bool:
        # check=True: only look for an existing plugin, never create one.
        return bool(msg.get_plugin(plugin, check=True))

    if present("no-store") or present("no-permanent-store"):
        return False

    if present("store") or msg["body"]:
        return True

    if any(present(name) for name in ("retract", "reactions", "displayed")):
        return True

    return bool(msg["thread"] and msg["subject"])

193 

194 

# Module-level logger, named after this module.
log = logging.getLogger(name=__name__)