Coverage for slidge/group/archive.py: 98%

84 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-04 08:17 +0000

1import logging 

2import uuid 

3from copy import copy 

4from datetime import datetime, timezone 

5from typing import TYPE_CHECKING, Collection, Optional 

6 

7from slixmpp import Iq, Message 

8 

9from ..db.models import ArchivedMessage, ArchivedMessageSource, Room 

10from ..db.store import MAMStore 

11from ..util.archive_msg import HistoryMessage 

12from ..util.types import HoleBound 

13 

14if TYPE_CHECKING: 

15 from .participant import LegacyParticipant 

16 

17 

class MessageArchive:
    """
    Long-term message storage for a single group chat, backed by the MAM
    store.
    """

    def __init__(self, room: Room, store: MAMStore) -> None:
        self.__store = store
        self.room = room

    def add(
        self,
        msg: Message,
        participant: Optional["LegacyParticipant"] = None,
        archive_only: bool = False,
        legacy_msg_id=None,
    ) -> None:
        """
        Store a message in the archive, unless it carries nothing worth
        archiving (as decided by :func:`archivable`).

        :param msg: the stanza to store
        :param participant: the sender, used to fill in MUC metadata
        :param archive_only: forwarded to the store
        :param legacy_msg_id: legacy network ID, persisted as a string
        """
        if not archivable(msg):
            return

        stored = copy(msg)
        # In non-anonymous rooms, expose the sender's role, affiliation
        # and real JID in the archived stanza.
        if participant and not participant.muc.is_anonymous:
            muc = stored["muc"]
            muc["role"] = participant.role or "participant"
            muc["affiliation"] = participant.affiliation or "member"
            if participant.contact:
                muc["jid"] = participant.contact.jid.bare
            elif participant.is_user:
                muc["jid"] = participant.user_jid.bare
            elif participant.is_system:
                muc["jid"] = participant.muc.jid
            else:
                log.warning("No real JID for participant in this group")
                # Fall back to a random, unroutable JID on our own domain.
                muc["jid"] = f"{uuid.uuid4()}@{participant.xmpp.boundjid.bare}"

        legacy_id = None if legacy_msg_id is None else str(legacy_msg_id)
        with self.__store.session() as orm:
            self.__store.add_message(
                orm,
                self.room.id,
                HistoryMessage(stored),
                archive_only,
                legacy_id,
            )
            orm.commit()

    def __iter__(self):
        # Iterating the archive is iterating over the full history.
        yield from self.get_all()

    @staticmethod
    def __to_bound(stored: ArchivedMessage):
        # DB timestamps are naive; they are interpreted as UTC.
        return HoleBound(
            stored.legacy_id,  # type:ignore
            stored.timestamp.replace(tzinfo=timezone.utc),
        )

    def get_hole_bounds(self) -> tuple[HoleBound | None, HoleBound | None]:
        """
        Return the bounds of the gap between back-filled history and live
        messages, as ``(start, end)``.

        ``None`` on either side means "unbounded" in that direction.
        """
        with self.__store.session() as orm:
            newest = self.__store.get_most_recent_with_legacy_id(orm, self.room.id)
            if newest is None:
                # Empty archive: nothing to anchor on.
                return None, None
            if newest.source == ArchivedMessageSource.BACKFILL:
                # Only backfill so far: fetch everything since the last one.
                return self.__to_bound(newest), None

            newest_backfill = self.__store.get_most_recent_with_legacy_id(
                orm, self.room.id, ArchivedMessageSource.BACKFILL
            )
            if newest_backfill is None:
                # Never back-filled: fetch everything before the first
                # live message.
                oldest_live = self.__store.get_first(orm, self.room.id, True)
                assert oldest_live is not None
                return None, self.__to_bound(oldest_live)

            assert newest_backfill.legacy_id is not None
            oldest_live = self.__store.get_least_recent_with_legacy_id_after(
                orm, self.room.id, newest_backfill.legacy_id
            )
            assert oldest_live is not None
            # This is a hole caused by slidge downtime: between the newest
            # back-filled message and the oldest live message after it.
            return self.__to_bound(newest_backfill), self.__to_bound(oldest_live)

    def get_all(
        self,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        before_id: Optional[str] = None,
        after_id: Optional[str] = None,
        ids: Collection[str] = (),
        last_page_n: Optional[int] = None,
        sender: Optional[str] = None,
        flip: bool = False,
    ):
        """
        Yield archived messages matching the given MAM filters.
        """
        with self.__store.session() as orm:
            yield from self.__store.get_messages(
                orm,
                self.room.id,
                before_id=before_id,
                after_id=after_id,
                ids=ids,
                last_page_n=last_page_n,
                sender=sender,
                start_date=start_date,
                end_date=end_date,
                flip=flip,
            )

    async def send_metadata(self, iq: Iq) -> None:
        """
        Send archive extent, as per the spec.

        :param iq: the metadata query to reply to
        """
        reply = iq.reply()
        with self.__store.session() as orm:
            extremes = self.__store.get_first_and_last(orm, self.room.id)
            if extremes:
                for key, message in (("start", extremes[0]), ("end", extremes[-1])):
                    reply["mam_metadata"][key]["id"] = message.id
                    reply["mam_metadata"][key]["timestamp"] = message.sent_on.replace(
                        tzinfo=timezone.utc
                    )
            else:
                # Empty archive: advertise the element with no bounds.
                reply.enable("mam_metadata")
        reply.send()

149 

150 

def archivable(msg: Message) -> bool:
    """
    Determine if a message stanza is worth archiving, ie, conveys
    meaningful info.

    :param msg: the stanza to inspect
    :return: whether the message should go to the archive
    """
    # Explicit storage hints take precedence, in this order: the two
    # "don't store" hints veto archiving, an explicit "store" forces it.
    for hint, verdict in (
        ("no-store", False),
        ("no-permanent-store", False),
        ("store", True),
    ):
        if msg.get_plugin(hint, check=True):
            return verdict

    # A body always makes the message worth keeping.
    if msg["body"]:
        return True

    # Retractions, reactions and displayed markers alter conversation
    # state, so they are meaningful even without a body.
    return any(
        msg.get_plugin(name, check=True)
        for name in ("retract", "reactions", "displayed")
    )

182 

183 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)