Coverage for slidge / group / archive.py: 98%
92 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-03-13 22:59 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-03-13 22:59 +0000
1import logging
2import uuid
3import warnings
4from collections.abc import Collection, Iterator
5from copy import copy
6from datetime import UTC, datetime
7from typing import TYPE_CHECKING, Optional
9from slixmpp import Iq, Message
11from ..db.models import ArchivedMessage, ArchivedMessageSource, Room
12from ..db.store import SlidgeStore
13from ..util.archive_msg import HistoryMessage
14from ..util.types import HoleBound
16if TYPE_CHECKING:
17 from .participant import LegacyParticipant
20class MessageArchive:
21 def __init__(self, room: Room, store: SlidgeStore) -> None:
22 self.room = room
23 self.__rooms_store = store.rooms
24 self.__store = store.mam
26 def add(
27 self,
28 msg: Message,
29 participant: Optional["LegacyParticipant"] = None,
30 archive_only: bool = False,
31 legacy_msg_id=None,
32 ) -> None:
33 """
34 Add a message to the archive if it is deemed archivable
36 :param msg:
37 :param participant:
38 :param archive_only:
39 :param legacy_msg_id:
40 """
41 if not archivable(msg):
42 return
43 new_msg = copy(msg)
44 if participant and not participant.muc.is_anonymous:
45 new_msg["muc"]["role"] = participant.role or "participant"
46 new_msg["muc"]["affiliation"] = participant.affiliation or "member"
47 if participant.contact:
48 new_msg["muc"]["jid"] = participant.contact.jid.bare
49 elif participant.is_user:
50 new_msg["muc"]["jid"] = participant.user_jid.bare
51 elif participant.is_system:
52 new_msg["muc"]["jid"] = participant.muc.jid
53 else:
54 warnings.warn(
55 f"No real JID for participant '{participant.nickname}' in '{self.room.name}'"
56 )
57 new_msg["muc"]["jid"] = (
58 f"{uuid.uuid4()}@{participant.xmpp.boundjid.bare}"
59 )
61 if "replace" in msg:
62 legacy_msg_id = None
64 with self.__store.session(expire_on_commit=False) as orm:
65 if self.room.id is None:
66 self.room = self.__rooms_store.get(
67 orm, self.room.user_account_id, self.room.legacy_id
68 )
69 self.__store.add_message(
70 orm,
71 self.room.id,
72 HistoryMessage(new_msg),
73 archive_only,
74 None if legacy_msg_id is None else str(legacy_msg_id),
75 )
76 orm.commit()
78 def __iter__(self) -> Iterator[HistoryMessage]:
79 return iter(self.get_all())
81 @staticmethod
82 def __to_bound(stored: ArchivedMessage) -> HoleBound:
83 return HoleBound(
84 stored.legacy_id, # type:ignore
85 stored.timestamp.replace(tzinfo=UTC),
86 )
88 def get_hole_bounds(self) -> tuple[HoleBound | None, HoleBound | None]:
89 with self.__store.session() as orm:
90 most_recent = self.__store.get_most_recent_with_legacy_id(orm, self.room.id)
91 if most_recent is None:
92 return None, None
93 if most_recent.source == ArchivedMessageSource.BACKFILL:
94 # most recent = only backfill, fetch everything since last backfill
95 return self.__to_bound(most_recent), None
97 most_recent_back_filled = self.__store.get_most_recent_with_legacy_id(
98 orm, self.room.id, ArchivedMessageSource.BACKFILL
99 )
100 if most_recent_back_filled is None:
101 # group was never back-filled, fetch everything before first live
102 least_recent_live = self.__store.get_first(orm, self.room.id, True)
103 assert least_recent_live is not None
104 return None, self.__to_bound(least_recent_live)
106 assert most_recent_back_filled.legacy_id is not None
107 least_recent_live = self.__store.get_least_recent_with_legacy_id_after(
108 orm, self.room.id, most_recent_back_filled.legacy_id
109 )
110 assert least_recent_live is not None
111 # this is a hole caused by slidge downtime
112 return self.__to_bound(most_recent_back_filled), self.__to_bound(
113 least_recent_live
114 )
116 def get_all(
117 self,
118 start_date: datetime | None = None,
119 end_date: datetime | None = None,
120 before_id: str | None = None,
121 after_id: str | None = None,
122 ids: Collection[str] = (),
123 last_page_n: int | None = None,
124 sender: str | None = None,
125 flip: bool = False,
126 ) -> Iterator[HistoryMessage]:
127 with self.__store.session() as orm:
128 yield from self.__store.get_messages(
129 orm,
130 self.room.id,
131 before_id=before_id,
132 after_id=after_id,
133 ids=ids,
134 last_page_n=last_page_n,
135 sender=sender,
136 start_date=start_date,
137 end_date=end_date,
138 flip=flip,
139 )
141 async def send_metadata(self, iq: Iq) -> None:
142 """
143 Send archive extent, as per the spec
145 :param iq:
146 :return:
147 """
148 reply = iq.reply()
149 with self.__store.session() as orm:
150 messages = self.__store.get_first_and_last(orm, self.room.id)
151 if messages:
152 for x, m in [("start", messages[0]), ("end", messages[-1])]:
153 reply["mam_metadata"][x]["id"] = m.id
154 reply["mam_metadata"][x]["timestamp"] = m.sent_on.replace(tzinfo=UTC)
155 else:
156 reply.enable("mam_metadata")
157 reply.send()
160def archivable(msg: Message) -> bool:
161 """
162 Determine if a message stanza is worth archiving, ie, convey meaningful
163 info
165 :param msg:
166 :return:
167 """
169 if msg.get_plugin("no-store", check=True):
170 return False
172 if msg.get_plugin("no-permanent-store", check=True):
173 return False
175 if msg.get_plugin("store", check=True):
176 return True
178 if msg["body"]:
179 return True
181 if msg.get_plugin("retract", check=True):
182 return True
184 if msg.get_plugin("reactions", check=True):
185 return True
187 if msg.get_plugin("displayed", check=True):
188 return True
190 if msg["thread"] and msg["subject"]:
191 return True
193 return False
196log = logging.getLogger(__name__)