Coverage for slidge / group / archive.py: 98%
90 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-06 15:18 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-06 15:18 +0000
1import logging
2import uuid
3import warnings
4from copy import copy
5from datetime import datetime, timezone
6from typing import TYPE_CHECKING, Collection, Optional
8from slixmpp import Iq, Message
10from ..db.models import ArchivedMessage, ArchivedMessageSource, Room
11from ..db.store import SlidgeStore
12from ..util.archive_msg import HistoryMessage
13from ..util.types import HoleBound
15if TYPE_CHECKING:
16 from .participant import LegacyParticipant
class MessageArchive:
    """
    Message archive of a single room, persisted through the ``mam`` part of
    a :class:`SlidgeStore`.
    """

    def __init__(self, room: Room, store: SlidgeStore) -> None:
        """
        :param room: The room whose messages are archived
        :param store: Store exposing the ``mam`` and ``rooms`` sub-stores
        """
        self.__store = store.mam
        self.__rooms_store = store.rooms
        self.room = room

    def add(
        self,
        msg: Message,
        participant: Optional["LegacyParticipant"] = None,
        archive_only: bool = False,
        legacy_msg_id=None,
    ) -> None:
        """
        Archive a copy of a message, unless ``archivable()`` rejects it

        :param msg: The stanza to archive; it is copied, never modified
        :param participant: Sender of the message. When given and its MUC is
            not anonymous, its role, affiliation and a JID are written to
            the ``muc`` element of the archived copy.
        :param archive_only: Passed through to the store unchanged
        :param legacy_msg_id: Stored as a string, or as ``None`` when absent
        """
        if not archivable(msg):
            return

        archived = copy(msg)
        if participant and not participant.muc.is_anonymous:
            archived["muc"]["role"] = participant.role or "participant"
            archived["muc"]["affiliation"] = participant.affiliation or "member"
            if participant.contact:
                real_jid = participant.contact.jid.bare
            elif participant.is_user:
                real_jid = participant.user_jid.bare
            elif participant.is_system:
                real_jid = participant.muc.jid
            else:
                # Nothing to identify this participant with: warn, then fall
                # back to a random local part on the component's bare JID.
                warnings.warn(
                    f"No real JID for participant '{participant.nickname}' in '{self.room.name}'"
                )
                real_jid = f"{uuid.uuid4()}@{participant.xmpp.boundjid.bare}"
            archived["muc"]["jid"] = real_jid

        with self.__store.session(expire_on_commit=False) as orm:
            if self.room.id is None:
                # The room has no primary key yet: reload it from the rooms
                # store so that the archived row can reference it.
                self.room = self.__rooms_store.get(
                    orm, self.room.user_account_id, self.room.legacy_id
                )
            history_msg = HistoryMessage(archived)
            stored_legacy_id = None if legacy_msg_id is None else str(legacy_msg_id)
            self.__store.add_message(
                orm,
                self.room.id,
                history_msg,
                archive_only,
                stored_legacy_id,
            )
            orm.commit()

    def __iter__(self):
        """
        Iterate over every archived message of the room, unfiltered
        """
        everything = self.get_all()
        return iter(everything)

    @staticmethod
    def __to_bound(stored: ArchivedMessage):
        """
        Build a :class:`HoleBound` from a stored message

        :param stored: Archived message providing ``legacy_id`` and
            ``timestamp``
        :return: Bound holding the legacy ID and the timestamp with its
            tzinfo set to UTC
        """
        utc_timestamp = stored.timestamp.replace(tzinfo=timezone.utc)
        return HoleBound(stored.legacy_id, utc_timestamp)  # type:ignore

    def get_hole_bounds(self) -> tuple[HoleBound | None, HoleBound | None]:
        """
        Work out which part of the room history is missing from the archive

        :return: A ``(start, end)`` pair of bounds for the missing range;
            either side is ``None`` when the range is open on that side, and
            both are ``None`` when no message with a legacy ID is archived.
        """
        with self.__store.session() as orm:
            room_id = self.room.id
            newest = self.__store.get_most_recent_with_legacy_id(orm, room_id)
            if newest is None:
                return None, None
            if newest.source == ArchivedMessageSource.BACKFILL:
                # most recent = only backfill, fetch everything since last
                # backfill
                return self.__to_bound(newest), None

            newest_back_filled = self.__store.get_most_recent_with_legacy_id(
                orm, room_id, ArchivedMessageSource.BACKFILL
            )
            if newest_back_filled is None:
                # group was never back-filled, fetch everything before first
                # live
                oldest_live = self.__store.get_first(orm, room_id, True)
                assert oldest_live is not None
                return None, self.__to_bound(oldest_live)

            assert newest_back_filled.legacy_id is not None
            oldest_live = self.__store.get_least_recent_with_legacy_id_after(
                orm, room_id, newest_back_filled.legacy_id
            )
            assert oldest_live is not None
            # this is a hole caused by slidge downtime
            hole_start = self.__to_bound(newest_back_filled)
            hole_end = self.__to_bound(oldest_live)
            return hole_start, hole_end

    def get_all(
        self,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        before_id: Optional[str] = None,
        after_id: Optional[str] = None,
        ids: Collection[str] = (),
        last_page_n: Optional[int] = None,
        sender: Optional[str] = None,
        flip: bool = False,
    ):
        """
        Lazily yield the archived messages of the room

        Every argument is forwarded by keyword, untouched, to the store's
        ``get_messages()``. The store session is opened when iteration
        starts and stays open while messages are being yielded.

        :param start_date:
        :param end_date:
        :param before_id:
        :param after_id:
        :param ids:
        :param last_page_n:
        :param sender:
        :param flip:
        """
        filters = dict(
            before_id=before_id,
            after_id=after_id,
            ids=ids,
            last_page_n=last_page_n,
            sender=sender,
            start_date=start_date,
            end_date=end_date,
            flip=flip,
        )
        with self.__store.session() as orm:
            for stored in self.__store.get_messages(orm, self.room.id, **filters):
                yield stored

    async def send_metadata(self, iq: Iq) -> None:
        """
        Send archive extent, as per the spec

        The reply carries the ID and UTC timestamp of the first and last
        entries returned by the store, or an empty ``mam_metadata`` element
        when the store returns nothing.

        :param iq: The request to reply to
        :return:
        """
        reply = iq.reply()
        with self.__store.session() as orm:
            extent = self.__store.get_first_and_last(orm, self.room.id)

        if not extent:
            reply.enable("mam_metadata")
        else:
            edges = {"start": extent[0], "end": extent[-1]}
            for edge_name, edge in edges.items():
                reply["mam_metadata"][edge_name]["id"] = edge.id
                reply["mam_metadata"][edge_name]["timestamp"] = edge.sent_on.replace(
                    tzinfo=timezone.utc
                )
        reply.send()
def archivable(msg: Message) -> bool:
    """
    Tell whether a message stanza should be kept in the archive

    Checks run in this order, and the first one that matches decides:

    - a ``no-store`` or ``no-permanent-store`` element rejects the message;
    - a ``store`` element or a non-empty body accepts it;
    - a ``retract``, ``reactions`` or ``displayed`` element accepts it;
    - a thread together with a subject accepts it.

    Anything else is rejected.

    :param msg: The stanza to inspect
    :return: ``True`` if the message should be archived
    """

    def has(plugin_name: str) -> bool:
        # check=True only reports an existing element, it never creates one
        return bool(msg.get_plugin(plugin_name, check=True))

    if has("no-store") or has("no-permanent-store"):
        return False

    if has("store") or msg["body"]:
        return True

    if any(has(name) for name in ("retract", "reactions", "displayed")):
        return True

    return bool(msg["thread"] and msg["subject"])
# Module-level logger, named after this module's import path.
log: logging.Logger = logging.getLogger(__name__)