Coverage for slidge / group / archive.py: 97%
89 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
1import logging
2import uuid
3import warnings
4from collections.abc import Collection, Iterator
5from copy import copy
6from datetime import UTC, datetime
8from slixmpp import Iq, Message
10from ..db.models import ArchivedMessage, ArchivedMessageSource, Room
11from ..db.store import SlidgeStore
12from ..util.archive_msg import HistoryMessage
13from ..util.types import AnyParticipant, HoleBound
class MessageArchive:
    """
    History of a single room, persisted through the slidge store.

    Provides the archive operations for one :class:`Room`:

    - storing archivable stanzas;
    - iterating over stored messages;
    - locating gaps ("holes") in the stored history;
    - answering archive metadata requests.
    """

    def __init__(self, room: Room, store: SlidgeStore) -> None:
        """
        :param room: Room whose history is managed by this archive.
        :param store: Top-level store; only its ``rooms`` and ``mam`` members
            are kept.
        """
        self.room = room
        self.__rooms_store = store.rooms
        self.__store = store.mam

    def add(
        self,
        msg: Message,
        participant: "AnyParticipant | None" = None,
        archive_only: bool = False,
        legacy_msg_id: str | None = None,
    ) -> None:
        """
        Store a copy of ``msg``, provided :func:`archivable` accepts it.

        If ``participant`` is given and the room is not anonymous, the copy
        (never ``msg`` itself) is annotated before storage with:

        - the participant's MUC role (default ``"participant"``);
        - the participant's affiliation (default ``"member"``);
        - a JID for the participant.

        :param msg: Stanza to archive.
        :param participant: Author of the message, if known.
        :param archive_only: Forwarded unchanged to the MAM store's
            ``add_message``.
        :param legacy_msg_id: Legacy-network ID to attach to the stored entry.
            It is dropped (stored as ``None``) when ``msg`` contains a
            ``replace`` element; otherwise it is converted with ``str()``.
        """
        if not archivable(msg):
            return

        stored = copy(msg)
        if participant and not participant.muc.is_anonymous:
            muc = stored["muc"]
            muc["role"] = participant.role or "participant"
            muc["affiliation"] = participant.affiliation or "member"

            # Choose the JID advertised for the author, most specific first.
            if participant.contact:
                real_jid = participant.contact.jid.bare
            elif participant.is_user:
                real_jid = participant.user_jid.bare
            elif participant.is_system:
                real_jid = participant.muc.jid
            else:
                warnings.warn(
                    f"No real JID for participant '{participant.nickname}' in '{self.room.name}'"
                )
                # Last resort: a random UUID local part at the bare JID
                # bound by participant.xmpp.
                real_jid = f"{uuid.uuid4()}@{participant.xmpp.boundjid.bare}"
            muc["jid"] = real_jid

        # Messages carrying a "replace" element are stored without legacy ID.
        if "replace" in msg:
            legacy_msg_id = None

        # expire_on_commit=False presumably keeps objects loaded in this
        # session (such as a re-fetched self.room) readable after commit.
        with self.__store.session(expire_on_commit=False) as orm:
            if self.room.id is None:
                # The cached room has no primary key: look it up again.
                self.room = self.__rooms_store.get(
                    orm, self.room.user_account_id, self.room.legacy_id
                )
            entry = HistoryMessage(stored)
            self.__store.add_message(
                orm,
                self.room.id,
                entry,
                archive_only,
                str(legacy_msg_id) if legacy_msg_id is not None else None,
            )
            orm.commit()

    def __iter__(self) -> Iterator[HistoryMessage]:
        """Iterate over every stored message of the room (no filtering)."""
        return iter(self.get_all())

    @staticmethod
    def __to_bound(stored: ArchivedMessage) -> HoleBound:
        """
        Turn a stored message into a :class:`HoleBound`.

        The timestamp is tagged as UTC. This assumes it is stored as naive
        UTC; TODO confirm.
        """
        timestamp = stored.timestamp.replace(tzinfo=UTC)
        return HoleBound(stored.legacy_id, timestamp)  # type:ignore

    def get_hole_bounds(self) -> tuple[HoleBound | None, HoleBound | None]:
        """
        Locate the gap in the stored history that should be back-filled.

        Only messages with a legacy ID are considered. The result is a
        ``(lower, upper)`` pair, where ``None`` means "unbounded" on that
        side:

        - no such message: ``(None, None)``;
        - the most recent one came from backfill: ``(that message, None)``;
        - the room was never back-filled: ``(None, first live message)``;
        - otherwise, the gap between the most recent back-filled message and
          the first message after it: ``(last backfill, first live after)``.
        """
        store = self.__store
        with store.session() as orm:
            room_pk = self.room.id

            newest = store.get_most_recent_with_legacy_id(orm, room_pk)
            if newest is None:
                return None, None
            if newest.source == ArchivedMessageSource.BACKFILL:
                # Only backfill so far: fetch everything after it.
                return self.__to_bound(newest), None

            newest_backfill = store.get_most_recent_with_legacy_id(
                orm, room_pk, ArchivedMessageSource.BACKFILL
            )
            if newest_backfill is None:
                # Never back-filled: fetch everything before the first
                # live message.
                oldest_live = store.get_first(orm, room_pk, True)
                assert oldest_live is not None
                return None, self.__to_bound(oldest_live)

            assert newest_backfill.legacy_id is not None
            oldest_live = store.get_least_recent_with_legacy_id_after(
                orm, room_pk, newest_backfill.legacy_id
            )
            assert oldest_live is not None
            # A gap left while slidge was down.
            lower = self.__to_bound(newest_backfill)
            upper = self.__to_bound(oldest_live)
            return lower, upper

    def get_all(
        self,
        start_date: datetime | None = None,
        end_date: datetime | None = None,
        before_id: str | None = None,
        after_id: str | None = None,
        ids: Collection[str] = (),
        last_page_n: int | None = None,
        sender: str | None = None,
        flip: bool = False,
    ) -> Iterator[HistoryMessage]:
        """
        Lazily iterate over this room's stored messages.

        Every argument is forwarded unchanged to the MAM store's
        ``get_messages``. The store session stays open while the returned
        iterator is being consumed.
        """
        criteria = {
            "before_id": before_id,
            "after_id": after_id,
            "ids": ids,
            "last_page_n": last_page_n,
            "sender": sender,
            "start_date": start_date,
            "end_date": end_date,
            "flip": flip,
        }
        with self.__store.session() as orm:
            yield from self.__store.get_messages(orm, self.room.id, **criteria)

    async def send_metadata(self, iq: Iq) -> None:
        """
        Answer ``iq`` with the extent of the archive.

        The reply carries the ID and timestamp (tagged as UTC) of the first
        and last archived messages. If the archive is empty, the reply
        carries an empty ``mam_metadata`` element. Nothing is awaited here;
        the reply is sent synchronously.

        :param iq: The metadata request to answer.
        """
        reply = iq.reply()
        with self.__store.session() as orm:
            edges = self.__store.get_first_and_last(orm, self.room.id)
            if not edges:
                reply.enable("mam_metadata")
            else:
                metadata = reply["mam_metadata"]
                for side, record in zip(("start", "end"), (edges[0], edges[-1])):
                    metadata[side]["id"] = record.id
                    metadata[side]["timestamp"] = record.sent_on.replace(tzinfo=UTC)
            reply.send()
def archivable(msg: Message) -> bool:
    """
    Tell whether a message stanza carries information worth archiving.

    Checks are applied in order and the first match decides:

    1. ``no-store`` or ``no-permanent-store`` present: not archivable.
    2. ``store`` present, or a non-empty body: archivable.
    3. A ``retract``, ``reactions`` or ``displayed`` element: archivable.
    4. Otherwise, archivable only if both thread and subject are non-empty.

    :param msg: The stanza to inspect.
    :return: ``True`` if the stanza should be stored.
    """
    # Processing hints (presumably XEP-0334) override the content checks.
    if "no-store" in msg or "no-permanent-store" in msg:
        return False
    if "store" in msg or msg["body"]:
        return True
    if any(element in msg for element in ("retract", "reactions", "displayed")):
        return True
    return bool(msg["thread"] and msg["subject"])
# Module-level logger, named after this module.
log = logging.getLogger(name=__name__)