Coverage for slidge/group/archive.py: 96%
73 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
1import logging
2import uuid
3from copy import copy
4from datetime import datetime, timezone
5from typing import TYPE_CHECKING, Collection, Optional
7from slixmpp import Iq, Message
9from ..db.models import ArchivedMessage, ArchivedMessageSource
10from ..db.store import MAMStore
11from ..util.archive_msg import HistoryMessage
12from ..util.types import HoleBound
14if TYPE_CHECKING:
15 from .participant import LegacyParticipant
class MessageArchive:
    """
    Archive of the messages of a single room, persisted through a MAMStore.

    Every store call made by this class is scoped to ``room_pk``, the key
    identifying the room this archive belongs to.
    """

    def __init__(self, room_pk: int, store: MAMStore):
        self.room_pk = room_pk
        self.__store = store

    def add(
        self,
        msg: Message,
        participant: Optional["LegacyParticipant"] = None,
        archive_only=False,
        legacy_msg_id=None,
    ):
        """
        Archive a copy of ``msg``, unless :func:`archivable` rejects it.

        ``msg`` itself is left untouched: a copy is annotated and stored.
        When ``participant`` is given and its room is not anonymous, the
        copy's MUC payload receives the participant's role, affiliation and
        a "real" JID.

        :param msg: the message stanza to archive
        :param participant: the room participant who sent ``msg``, if known
        :param archive_only: forwarded unchanged to ``MAMStore.add_message``
        :param legacy_msg_id: legacy ID of the message; forwarded as ``str``,
            or as ``None`` when not provided
        """
        if not archivable(msg):
            return

        stored_copy = copy(msg)
        if participant and not participant.muc.is_anonymous:
            # Identity annotations are only attached in non-anonymous rooms.
            muc_payload = stored_copy["muc"]
            muc_payload["role"] = participant.role
            muc_payload["affiliation"] = participant.affiliation
            muc_payload["jid"] = self.__real_jid_of(participant)

        history_msg = HistoryMessage(stored_copy)
        legacy_id = str(legacy_msg_id) if legacy_msg_id is not None else None
        self.__store.add_message(self.room_pk, history_msg, archive_only, legacy_id)

    @staticmethod
    def __real_jid_of(participant: "LegacyParticipant"):
        """
        Choose the JID exposed as the real JID of ``participant``.

        In order of preference: the contact's bare JID, the user's bare JID,
        the room JID for the system participant. Failing all of those, a
        warning is logged and a random UUID-based JID on the bare bound JID
        of ``participant.xmpp`` is returned.
        """
        if participant.contact:
            return participant.contact.jid.bare
        if participant.is_user:
            return participant.user_jid.bare
        if participant.is_system:
            return participant.muc.jid
        log.warning("No real JID for participant in this group")
        return f"{uuid.uuid4()}@{participant.xmpp.boundjid.bare}"

    def __iter__(self):
        """Iterate over all archived messages of the room, without filters."""
        return iter(self.get_all())

    @staticmethod
    def __to_bound(stored: ArchivedMessage):
        """
        Build a HoleBound from an archived row, with the stored timestamp
        labelled as UTC (``replace``, not a conversion).
        """
        legacy_id = stored.legacy_id
        utc_timestamp = stored.timestamp.replace(tzinfo=timezone.utc)
        return HoleBound(legacy_id, utc_timestamp)  # type:ignore

    def get_hole_bounds(self) -> tuple[HoleBound | None, HoleBound | None]:
        """
        Locate the gap in the archive that should be back-filled.

        Returns a ``(start, end)`` pair of bounds, where ``None`` means the
        gap is open on that side:

        - nothing with a legacy ID archived yet: ``(None, None)``;
        - most recent message came from backfill: ``(that message, None)``,
          i.e. fetch everything after it;
        - never back-filled: ``(None, first live message)``, i.e. fetch
          everything before it;
        - otherwise: ``(last backfilled, first live message after it)``,
          a hole caused by slidge downtime.
        """
        store = self.__store
        latest = store.get_most_recent_with_legacy_id(self.room_pk)
        if latest is None:
            return None, None

        if latest.source == ArchivedMessageSource.BACKFILL:
            return self.__to_bound(latest), None

        latest_backfill = store.get_most_recent_with_legacy_id(
            self.room_pk, ArchivedMessageSource.BACKFILL
        )
        if latest_backfill is None:
            first_live = store.get_first(self.room_pk, True)
            assert first_live is not None
            return None, self.__to_bound(first_live)

        assert latest_backfill.legacy_id is not None
        first_live = store.get_least_recent_with_legacy_id_after(
            self.room_pk, latest_backfill.legacy_id
        )
        assert first_live is not None
        return self.__to_bound(latest_backfill), self.__to_bound(first_live)

    def get_all(
        self,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        before_id: Optional[str] = None,
        after_id: Optional[str] = None,
        ids: Collection[str] = (),
        last_page_n: Optional[int] = None,
        sender: Optional[str] = None,
        flip=False,
    ):
        """
        Lazily yield the room's archived messages.

        Every argument is passed unchanged to ``MAMStore.get_messages``,
        which defines the exact filtering semantics; the store is only
        queried once iteration starts.
        """
        matches = self.__store.get_messages(
            self.room_pk,
            start_date=start_date,
            end_date=end_date,
            before_id=before_id,
            after_id=after_id,
            ids=ids,
            sender=sender,
            last_page_n=last_page_n,
            flip=flip,
        )
        for archived_msg in matches:
            yield archived_msg

    async def send_metadata(self, iq: Iq):
        """
        Answer ``iq`` with the extent of the room archive (MAM metadata).

        If the archive is non-empty, the ``start`` and ``end`` entries carry
        the ID and timestamp (labelled UTC) of the first and last message;
        otherwise an empty ``mam_metadata`` payload is enabled.

        :param iq: the metadata request to answer
        """
        reply = iq.reply()
        extent = self.__store.get_first_and_last(self.room_pk)
        if not extent:
            reply.enable("mam_metadata")
        else:
            bounds = (("start", extent[0]), ("end", extent[-1]))
            for side, boundary_msg in bounds:
                msg_id = boundary_msg.id
                entry = reply["mam_metadata"][side]
                entry["id"] = msg_id
                entry["timestamp"] = boundary_msg.sent_on.replace(
                    tzinfo=timezone.utc
                )
        reply.send()
def archivable(msg: Message):
    """
    Tell whether a message stanza carries something worth archiving.

    A stanza hinted "no-store" is never archived. Otherwise it is archivable
    when it has a non-empty body, an ``apply_to`` payload (retractions) or a
    ``reactions`` payload.

    :param msg: the message stanza to inspect
    :return: ``True`` if the message should be archived, ``False`` otherwise
    """
    if msg.get_plugin("hint", check=True) and msg["hint"] == "no-store":
        return False

    if msg["body"]:
        return True

    # Body-less stanzas still matter when they retract (apply_to) or react.
    return any(
        msg.get_plugin(plugin_name, check=True)
        for plugin_name in ("apply_to", "reactions")
    )
# Module-level logger; MessageArchive.add uses it to warn about participants
# for which no real JID is available.
log = logging.getLogger(name=__name__)