Coverage for slidge/group/archive.py: 98%
87 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
1import logging
2import uuid
3import warnings
4from copy import copy
5from datetime import datetime, timezone
6from typing import TYPE_CHECKING, Collection, Optional
8from slixmpp import Iq, Message
10from ..db.models import ArchivedMessage, ArchivedMessageSource, Room
11from ..db.store import MAMStore
12from ..util.archive_msg import HistoryMessage
13from ..util.types import HoleBound
15if TYPE_CHECKING:
16 from .participant import LegacyParticipant
class MessageArchive:
    """
    Archive of the messages of one room, persisted through a ``MAMStore``.

    Every operation opens its own store session via ``store.session()``.
    """

    def __init__(self, room: Room, store: MAMStore) -> None:
        self.room = room
        self.__store = store

    def add(
        self,
        msg: Message,
        participant: Optional["LegacyParticipant"] = None,
        archive_only: bool = False,
        legacy_msg_id=None,
    ) -> None:
        """
        Store a copy of ``msg`` in this room's archive, unless
        :func:`archivable` rejects it.

        :param msg: Stanza to archive. A copy is stored; ``msg`` itself is
            not modified here.
        :param participant: Sender of the message. When given and the room
            is not anonymous, MUC role, affiliation and a JID are written
            into the archived copy.
        :param archive_only: Forwarded unchanged to ``MAMStore.add_message``.
        :param legacy_msg_id: Legacy ID of the message; converted with
            ``str()`` before storage unless it is ``None``.
        """
        if not archivable(msg):
            return

        archived = copy(msg)

        if participant and not participant.muc.is_anonymous:
            muc = archived["muc"]
            muc["role"] = participant.role or "participant"
            muc["affiliation"] = participant.affiliation or "member"

            if participant.contact:
                real_jid = participant.contact.jid.bare
            elif participant.is_user:
                real_jid = participant.user_jid.bare
            elif participant.is_system:
                real_jid = participant.muc.jid
            else:
                # No real JID is available for this participant: warn, then
                # use a random UUID local part on the bare boundjid.
                warnings.warn(
                    f"No real JID for participant '{participant.nickname}' in '{self.room.name}'"
                )
                real_jid = f"{uuid.uuid4()}@{participant.xmpp.boundjid.bare}"
            muc["jid"] = real_jid

        with self.__store.session() as orm:
            history = HistoryMessage(archived)
            stored_legacy_id = (
                str(legacy_msg_id) if legacy_msg_id is not None else None
            )
            self.__store.add_message(
                orm, self.room.id, history, archive_only, stored_legacy_id
            )
            orm.commit()

    def __iter__(self):
        # get_all() is a generator function; iter() simply hands it back.
        return iter(self.get_all())

    @staticmethod
    def __to_bound(stored: ArchivedMessage):
        """
        Build a ``HoleBound`` from a stored message.

        The stored timestamp gets UTC attached as its tzinfo; no time-zone
        conversion is performed.
        """
        legacy_id = stored.legacy_id
        utc_timestamp = stored.timestamp.replace(tzinfo=timezone.utc)
        return HoleBound(legacy_id, utc_timestamp)  # type:ignore

    def get_hole_bounds(self) -> tuple[HoleBound | None, HoleBound | None]:
        """
        Find the gap in this room's history that should be back-filled.

        Only messages carrying a legacy ID are considered. The result is an
        ``(older, newer)`` pair of bounds, either of which may be ``None``:

        - ``(None, None)`` when no such message is stored;
        - ``(last_backfilled, None)`` when the newest one came from backfill;
        - ``(None, first_live)`` when the room was never back-filled;
        - ``(last_backfilled, first_live_after_it)`` otherwise, i.e. a hole
          caused by slidge downtime.
        """
        store = self.__store
        with store.session() as orm:
            newest = store.get_most_recent_with_legacy_id(orm, self.room.id)
            if newest is None:
                return None, None

            if newest.source == ArchivedMessageSource.BACKFILL:
                # Newest message is from backfill: fetch everything after it.
                return self.__to_bound(newest), None

            last_backfilled = store.get_most_recent_with_legacy_id(
                orm, self.room.id, ArchivedMessageSource.BACKFILL
            )

            if last_backfilled is None:
                # Never back-filled: fetch everything before the first live one.
                first_live = store.get_first(orm, self.room.id, True)
                assert first_live is not None
                return None, self.__to_bound(first_live)

            assert last_backfilled.legacy_id is not None
            first_live = store.get_least_recent_with_legacy_id_after(
                orm, self.room.id, last_backfilled.legacy_id
            )
            assert first_live is not None
            older = self.__to_bound(last_backfilled)
            newer = self.__to_bound(first_live)
            return older, newer

    def get_all(
        self,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        before_id: Optional[str] = None,
        after_id: Optional[str] = None,
        ids: Collection[str] = (),
        last_page_n: Optional[int] = None,
        sender: Optional[str] = None,
        flip: bool = False,
    ):
        """
        Lazily yield this room's archived messages that match the filters.

        Every filter is forwarded as a keyword argument to
        ``MAMStore.get_messages``. The store session stays open while the
        generator is being consumed.
        """
        filters = dict(
            before_id=before_id,
            after_id=after_id,
            ids=ids,
            last_page_n=last_page_n,
            sender=sender,
            start_date=start_date,
            end_date=end_date,
            flip=flip,
        )
        with self.__store.session() as orm:
            for archived in self.__store.get_messages(orm, self.room.id, **filters):
                yield archived

    async def send_metadata(self, iq: Iq) -> None:
        """
        Answer ``iq`` with the archive extent.

        The reply's ``mam_metadata`` carries the ID and UTC timestamp of the
        first and last messages returned by ``MAMStore.get_first_and_last``.
        If that result is empty, an empty ``mam_metadata`` element is sent.

        :param iq: The incoming query to reply to.
        """
        response = iq.reply()
        with self.__store.session() as orm:
            extremes = self.__store.get_first_and_last(orm, self.room.id)
            if not extremes:
                response.enable("mam_metadata")
            else:
                edges = {"start": extremes[0], "end": extremes[-1]}
                for edge, stored in edges.items():
                    response["mam_metadata"][edge]["id"] = stored.id
                    response["mam_metadata"][edge]["timestamp"] = (
                        stored.sent_on.replace(tzinfo=timezone.utc)
                    )
        response.send()
def archivable(msg: Message) -> bool:
    """
    Tell whether ``msg`` carries information worth keeping in the archive.

    Storage hints are checked first: a ``no-store`` or ``no-permanent-store``
    payload always excludes the message, and a ``store`` payload always
    includes it. Otherwise the message is kept when it has a body, a
    ``retract``, ``reactions`` or ``displayed`` payload, or both a thread
    and a subject.

    :param msg: The message stanza to inspect.
    :return: ``True`` when the message should be archived.
    """

    def carries(plugin_name: str) -> bool:
        # check=True: look up the plugin without creating it if absent.
        return bool(msg.get_plugin(plugin_name, check=True))

    if carries("no-store") or carries("no-permanent-store"):
        return False

    if carries("store") or msg["body"]:
        return True

    if any(carries(name) for name in ("retract", "reactions", "displayed")):
        return True

    return bool(msg["thread"] and msg["subject"])
# Module-level logger named after this module.
# NOTE(review): nothing in the code shown for this module references `log`;
# confirm whether it is still needed.
log = logging.getLogger(__name__)