Coverage for slidge/slixfix/xep_0313/mam.py: 28%
130 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
1# Slixmpp: The Slick XMPP Library
2# Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout
3# This file is part of Slixmpp.
4# See the file LICENSE for copying permission
5import logging
6from asyncio import Future
7from collections.abc import AsyncGenerator
8from datetime import datetime
9from typing import Any, Awaitable, Callable, Dict, Optional, Tuple
11from slixmpp import JID
12from slixmpp.plugins import BasePlugin
13from slixmpp.plugins.xep_0004.stanza import Form
14from slixmpp.stanza import Iq, Message
15from slixmpp.xmlstream import register_stanza_plugin
16from slixmpp.xmlstream.handler import Collector
17from slixmpp.xmlstream.matcher import MatchXMLMask
19from . import stanza
21log = logging.getLogger(__name__)
24class XEP_0313(BasePlugin):
25 """
26 XEP-0313 Message Archive Management
27 """
29 name = "xep_0313"
30 description = "XEP-0313: Message Archive Management"
31 dependencies = {"xep_0004", "xep_0030", "xep_0050", "xep_0059", "xep_0297"}
32 stanza = stanza
34 def plugin_init(self):
35 register_stanza_plugin(stanza.MAM, Form)
36 register_stanza_plugin(Iq, stanza.MAM)
37 register_stanza_plugin(Message, stanza.Result)
38 register_stanza_plugin(Iq, stanza.Fin)
39 register_stanza_plugin(stanza.Result, self.xmpp["xep_0297"].stanza.Forwarded)
40 register_stanza_plugin(stanza.MAM, self.xmpp["xep_0059"].stanza.Set)
41 register_stanza_plugin(stanza.Fin, self.xmpp["xep_0059"].stanza.Set)
42 register_stanza_plugin(Iq, stanza.Metadata)
43 register_stanza_plugin(stanza.Metadata, stanza.Start)
44 register_stanza_plugin(stanza.Metadata, stanza.End)
46 def retrieve(
47 self,
48 jid: Optional[JID] = None,
49 start: Optional[datetime] = None,
50 end: Optional[datetime] = None,
51 with_jid: Optional[JID] = None,
52 ifrom: Optional[JID] = None,
53 reverse: bool = False,
54 timeout: int = None,
55 callback: Callable[[Iq], None] = None,
56 iterator: bool = False,
57 rsm: Optional[Dict[str, Any]] = None,
58 ) -> Awaitable:
59 """
60 Send a MAM query and retrieve the results.
62 :param JID jid: Entity holding the MAM records
63 :param datetime start,end: MAM query temporal boundaries
64 :param JID with_jid: Filter results on this JID
65 :param JID ifrom: To change the from address of the query
66 :param bool reverse: Get the results in reverse order
67 :param int timeout: IQ timeout
68 :param func callback: Custom callback for handling results
69 :param bool iterator: Use RSM and iterate over a paginated query
70 :param dict rsm: RSM custom options
71 """
72 iq, stanza_mask = self._pre_mam_retrieve(jid, start, end, with_jid, ifrom)
73 query_id = iq["id"]
74 amount = 10
75 if rsm:
76 for key, value in rsm.items():
77 iq["mam"]["rsm"][key] = str(value)
78 if key == "max":
79 amount = value
80 cb_data = {}
82 xml_mask = str(stanza_mask)
84 def pre_cb(query: Iq) -> None:
85 stanza_mask["mam_result"]["queryid"] = query["id"]
86 xml_mask = str(stanza_mask)
87 query["mam"]["queryid"] = query["id"]
88 collector = Collector("MAM_Results_%s" % query_id, MatchXMLMask(xml_mask))
89 self.xmpp.register_handler(collector)
90 cb_data["collector"] = collector
92 def post_cb(result: Iq) -> None:
93 results = cb_data["collector"].stop()
94 if result["type"] == "result":
95 result["mam"]["results"] = results
96 result["mam_fin"]["results"] = results
98 if iterator:
99 return self.xmpp["xep_0059"].iterate(
100 iq,
101 "mam",
102 "results",
103 amount=amount,
104 reverse=reverse,
105 recv_interface="mam_fin",
106 pre_cb=pre_cb,
107 post_cb=post_cb,
108 )
110 collector = Collector("MAM_Results_%s" % query_id, MatchXMLMask(xml_mask))
111 self.xmpp.register_handler(collector)
113 def wrapped_cb(iq: Iq) -> None:
114 results = collector.stop()
115 if iq["type"] == "result":
116 iq["mam"]["results"] = results
117 if callback:
118 callback(iq)
120 return iq.send(timeout=timeout, callback=wrapped_cb)
122 async def iterate(
123 self,
124 jid: Optional[JID] = None,
125 start: Optional[datetime] = None,
126 end: Optional[datetime] = None,
127 with_jid: Optional[JID] = None,
128 ifrom: Optional[JID] = None,
129 reverse: bool = False,
130 rsm: Optional[Dict[str, Any]] = None,
131 total: Optional[int] = None,
132 ) -> AsyncGenerator:
133 """
134 Iterate over each message of MAM query.
136 .. versionadded:: 1.8.0
138 :param jid: Entity holding the MAM records
139 :param start: MAM query start time
140 :param end: MAM query end time
141 :param with_jid: Filter results on this JID
142 :param ifrom: To change the from address of the query
143 :param reverse: Get the results in reverse order
144 :param rsm: RSM custom options
145 :param total: A number of messages received after which the query
146 should stop.
147 """
148 iq, stanza_mask = self._pre_mam_retrieve(jid, start, end, with_jid, ifrom)
149 query_id = iq["id"]
150 amount = 10
152 if rsm:
153 for key, value in rsm.items():
154 iq["mam"]["rsm"][key] = str(value)
155 if key == "max":
156 amount = value
157 cb_data = {}
159 def pre_cb(query: Iq) -> None:
160 stanza_mask["mam_result"]["queryid"] = query["id"]
161 xml_mask = str(stanza_mask)
162 query["mam"]["queryid"] = query["id"]
163 collector = Collector("MAM_Results_%s" % query_id, MatchXMLMask(xml_mask))
164 self.xmpp.register_handler(collector)
165 cb_data["collector"] = collector
167 def post_cb(result: Iq) -> None:
168 results = cb_data["collector"].stop()
169 if result["type"] == "result":
170 result["mam"]["results"] = results
171 result["mam_fin"]["results"] = results
173 iterator = self.xmpp["xep_0059"].iterate(
174 iq,
175 "mam",
176 "results",
177 amount=amount,
178 reverse=reverse,
179 recv_interface="mam_fin",
180 pre_cb=pre_cb,
181 post_cb=post_cb,
182 )
183 recv_count = 0
185 async for page in iterator:
186 messages = [message for message in page["mam"]["results"]]
187 if reverse:
188 messages.reverse()
189 for message in messages:
190 yield message
191 recv_count += 1
192 if total is not None and recv_count >= total:
193 break
194 if total is not None and recv_count >= total:
195 break
197 def _pre_mam_retrieve(
198 self,
199 jid: Optional[JID] = None,
200 start: Optional[datetime] = None,
201 end: Optional[datetime] = None,
202 with_jid: Optional[JID] = None,
203 ifrom: Optional[JID] = None,
204 ) -> Tuple[Iq, Message]:
205 """Build the IQ and stanza mask for MAM results"""
206 iq = self.xmpp.make_iq_set(ito=jid, ifrom=ifrom)
207 query_id = iq["id"]
208 iq["mam"]["queryid"] = query_id
209 iq["mam"]["start"] = start
210 iq["mam"]["end"] = end
211 iq["mam"]["with"] = with_jid
213 stanza_mask = self.xmpp.Message()
215 auto_origin = stanza_mask.xml.find("{urn:xmpp:sid:0}origin-id")
216 if auto_origin is not None:
217 stanza_mask.xml.remove(auto_origin)
218 del stanza_mask["id"]
219 del stanza_mask["lang"]
220 stanza_mask["from"] = jid
221 stanza_mask["mam_result"]["queryid"] = query_id
223 return (iq, stanza_mask)
225 async def get_fields(self, jid: Optional[JID] = None, **iqkwargs) -> Form:
226 """Get MAM query fields.
228 .. versionadded:: 1.8.0
230 :param jid: JID to retrieve the policy from.
231 :return: The Form of allowed options
232 """
233 ifrom = iqkwargs.pop("ifrom", None)
234 iq = self.xmpp.make_iq_get(ito=jid, ifrom=ifrom)
235 iq.enable("mam")
236 result = await iq.send(**iqkwargs)
237 return result["mam"]["form"]
239 async def get_configuration_commands(
240 self, jid: Optional[JID], **discokwargs
241 ) -> Future:
242 """Get the list of MAM advanced configuration commands.
244 .. versionchanged:: 1.8.0
246 :param jid: JID to get the commands from.
247 """
248 if jid is None:
249 jid = self.xmpp.boundjid.bare
250 return await self.xmpp["xep_0030"].get_items(
251 jid=jid, node="urn:xmpp:mam#configure", **discokwargs
252 )
254 def get_archive_metadata(self, jid: Optional[JID] = None, **iqkwargs) -> Future:
255 """Get the archive metadata from a JID.
257 :param jid: JID to get the metadata from.
258 """
259 ifrom = iqkwargs.pop("ifrom", None)
260 iq = self.xmpp.make_iq_get(ito=jid, ifrom=ifrom)
261 iq.enable("mam_metadata")
262 return iq.send(**iqkwargs)