Coverage for slidge/group/archive.py: 98%
84 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
1import logging
2import uuid
3from copy import copy
4from datetime import datetime, timezone
5from typing import TYPE_CHECKING, Collection, Optional
7from slixmpp import Iq, Message
9from ..db.models import ArchivedMessage, ArchivedMessageSource, Room
10from ..db.store import MAMStore
11from ..util.archive_msg import HistoryMessage
12from ..util.types import HoleBound
14if TYPE_CHECKING:
15 from .participant import LegacyParticipant
class MessageArchive:
    """
    Archive of the messages of a single room, persisted through a
    :class:`MAMStore`.

    Every query issued by this class is keyed by ``room.id`` and runs inside
    a session obtained from the store.
    """

    def __init__(self, room: Room, store: MAMStore) -> None:
        """
        :param room: Room whose ``id`` identifies the archive in the store
        :param store: Store providing the sessions and the queries
        """
        # Double underscore: the attribute is name-mangled to
        # ``_MessageArchive__store``.
        self.__store = store
        self.room = room

    def add(
        self,
        msg: Message,
        participant: Optional["LegacyParticipant"] = None,
        archive_only: bool = False,
        legacy_msg_id=None,
    ) -> None:
        """
        Store a copy of a message in the archive, provided it is archivable

        Messages rejected by :func:`archivable` are dropped without any
        error. All modifications are made on a copy of ``msg``.

        :param msg: Stanza to archive
        :param participant: When given and its MUC is not anonymous, its
            role, affiliation and a JID are written to the ``muc`` element
            of the stored copy
        :param archive_only: Handed over unchanged to the store
        :param legacy_msg_id: Passed to the store as a string, or as
            ``None`` when not provided
        """
        if not archivable(msg):
            return

        archived = copy(msg)
        if participant and not participant.muc.is_anonymous:
            archived["muc"]["role"] = participant.role or "participant"
            archived["muc"]["affiliation"] = participant.affiliation or "member"

            # Work out which JID to expose, from most to least specific.
            if participant.contact:
                real_jid = participant.contact.jid.bare
            elif participant.is_user:
                real_jid = participant.user_jid.bare
            elif participant.is_system:
                real_jid = participant.muc.jid
            else:
                # Nothing usable: make up a random JID on the component's
                # bound JID so that the field is still filled.
                log.warning("No real JID for participant in this group")
                real_jid = f"{uuid.uuid4()}@{participant.xmpp.boundjid.bare}"
            archived["muc"]["jid"] = real_jid

        with self.__store.session() as orm:
            self.__store.add_message(
                orm,
                self.room.id,
                HistoryMessage(archived),
                archive_only,
                str(legacy_msg_id) if legacy_msg_id is not None else None,
            )
            orm.commit()

    def __iter__(self):
        """
        Iterate over everything :meth:`get_all` yields without any filter
        """
        everything = self.get_all()
        return iter(everything)

    @staticmethod
    def __to_bound(stored: ArchivedMessage):
        """
        Build a :class:`HoleBound` from a stored message

        The stored timestamp gets its ``tzinfo`` replaced by UTC; its clock
        value is not converted.

        :param stored: Row to take the legacy ID and the timestamp from
        :return: The matching bound
        """
        utc_timestamp = stored.timestamp.replace(tzinfo=timezone.utc)
        return HoleBound(stored.legacy_id, utc_timestamp)  # type:ignore

    def get_hole_bounds(self) -> tuple[HoleBound | None, HoleBound | None]:
        """
        Work out the bounds of the part of the history missing from the
        archive

        :return: A ``(lower, upper)`` pair, where either side is ``None``
            when the hole is open on that side; ``(None, None)`` when the
            archive holds no message with a legacy ID
        """
        store = self.__store
        with store.session() as orm:
            room_id = self.room.id

            newest = store.get_most_recent_with_legacy_id(orm, room_id)
            if newest is None:
                return None, None
            if newest.source == ArchivedMessageSource.BACKFILL:
                # most recent = only backfill, fetch everything since last
                # backfill
                return self.__to_bound(newest), None

            newest_backfill = store.get_most_recent_with_legacy_id(
                orm, room_id, ArchivedMessageSource.BACKFILL
            )
            if newest_backfill is None:
                # group was never back-filled, fetch everything before first
                # live
                oldest_live = store.get_first(orm, room_id, True)
                assert oldest_live is not None
                return None, self.__to_bound(oldest_live)

            assert newest_backfill.legacy_id is not None
            oldest_live = store.get_least_recent_with_legacy_id_after(
                orm, room_id, newest_backfill.legacy_id
            )
            assert oldest_live is not None

            # this is a hole caused by slidge downtime
            lower = self.__to_bound(newest_backfill)
            upper = self.__to_bound(oldest_live)
            return lower, upper

    def get_all(
        self,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        before_id: Optional[str] = None,
        after_id: Optional[str] = None,
        ids: Collection[str] = (),
        last_page_n: Optional[int] = None,
        sender: Optional[str] = None,
        flip: bool = False,
    ):
        """
        Lazily yield the archived messages of the room

        Every argument is forwarded as-is, under the same name, to the
        store's ``get_messages``. The store session is opened on the first
        iteration and stays open while the generator is being consumed.

        :param start_date:
        :param end_date:
        :param before_id:
        :param after_id:
        :param ids:
        :param last_page_n:
        :param sender:
        :param flip:
        :return: A generator over whatever the store returns
        """
        filters = {
            "start_date": start_date,
            "end_date": end_date,
            "before_id": before_id,
            "after_id": after_id,
            "ids": ids,
            "last_page_n": last_page_n,
            "sender": sender,
            "flip": flip,
        }
        with self.__store.session() as orm:
            for archived in self.__store.get_messages(
                orm, self.room.id, **filters
            ):
                yield archived

    async def send_metadata(self, iq: Iq) -> None:
        """
        Answer a request for the extent of the archive

        The reply carries the ID and the timestamp (tagged as UTC) of the
        first and last entries returned by the store, or a bare
        ``mam_metadata`` element when the store returns nothing.

        :param iq: Request to reply to
        :return:
        """
        reply = iq.reply()
        with self.__store.session() as orm:
            extent = self.__store.get_first_and_last(orm, self.room.id)
            if not extent:
                # Nothing archived: only make sure the element is present.
                reply.enable("mam_metadata")
            else:
                # With a single entry, index 0 and index -1 are the same
                # item, so "start" and "end" get identical values.
                edges = (("start", extent[0]), ("end", extent[-1]))
                for tag, edge in edges:
                    reply["mam_metadata"][tag]["id"] = edge.id
                    reply["mam_metadata"][tag]["timestamp"] = edge.sent_on.replace(
                        tzinfo=timezone.utc
                    )
        reply.send()
def archivable(msg: Message) -> bool:
    """
    Tell whether a message stanza conveys enough to be worth archiving

    Rules, in order of precedence:

    - a ``no-store`` or ``no-permanent-store`` element vetoes archiving;
    - a ``store`` element or a non-empty body allows it;
    - otherwise a ``retract``, ``reactions`` or ``displayed`` element is
      required.

    :param msg: Stanza to inspect
    :return: ``True`` if the message should be archived
    """

    def carries(plugin: str) -> bool:
        # check=True is meant to only look the plugin up, without adding it
        # to the stanza when it is absent (see slixmpp's get_plugin).
        return bool(msg.get_plugin(plugin, check=True))

    if carries("no-store") or carries("no-permanent-store"):
        return False

    if carries("store") or msg["body"]:
        return True

    return any(
        carries(payload) for payload in ("retract", "reactions", "displayed")
    )
# Logger named after this module. Defining it at the very end of the file
# works because ``log`` is only looked up when ``MessageArchive.add`` runs,
# long after the module has been fully imported.
log: logging.Logger = logging.getLogger(__name__)