Coverage for slidge/slixfix/xep_0313/mam.py: 28%

130 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-07 05:11 +0000

1# Slixmpp: The Slick XMPP Library 

2# Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout 

3# This file is part of Slixmpp. 

4# See the file LICENSE for copying permission 

5import logging 

6from asyncio import Future 

7from collections.abc import AsyncGenerator 

8from datetime import datetime 

9from typing import Any, Awaitable, Callable, Dict, Optional, Tuple 

10 

11from slixmpp import JID 

12from slixmpp.plugins import BasePlugin 

13from slixmpp.plugins.xep_0004.stanza import Form 

14from slixmpp.stanza import Iq, Message 

15from slixmpp.xmlstream import register_stanza_plugin 

16from slixmpp.xmlstream.handler import Collector 

17from slixmpp.xmlstream.matcher import MatchXMLMask 

18 

19from . import stanza 

20 

21log = logging.getLogger(__name__) 

22 

23 

class XEP_0313(BasePlugin):
    """
    XEP-0313 Message Archive Management
    """

    name = "xep_0313"
    description = "XEP-0313: Message Archive Management"
    dependencies = {"xep_0004", "xep_0030", "xep_0050", "xep_0059", "xep_0297"}
    stanza = stanza

    def plugin_init(self):
        """Register the MAM stanza plugins on the stanzas that carry them."""
        register_stanza_plugin(stanza.MAM, Form)
        register_stanza_plugin(Iq, stanza.MAM)
        register_stanza_plugin(Message, stanza.Result)
        register_stanza_plugin(Iq, stanza.Fin)
        register_stanza_plugin(stanza.Result, self.xmpp["xep_0297"].stanza.Forwarded)
        register_stanza_plugin(stanza.MAM, self.xmpp["xep_0059"].stanza.Set)
        register_stanza_plugin(stanza.Fin, self.xmpp["xep_0059"].stanza.Set)
        register_stanza_plugin(Iq, stanza.Metadata)
        register_stanza_plugin(stanza.Metadata, stanza.Start)
        register_stanza_plugin(stanza.Metadata, stanza.End)

    def retrieve(
        self,
        jid: Optional[JID] = None,
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
        with_jid: Optional[JID] = None,
        ifrom: Optional[JID] = None,
        reverse: bool = False,
        timeout: Optional[int] = None,
        callback: Optional[Callable[[Iq], None]] = None,
        iterator: bool = False,
        rsm: Optional[Dict[str, Any]] = None,
    ) -> Awaitable:
        """
        Send a MAM query and retrieve the results.

        :param JID jid: Entity holding the MAM records
        :param datetime start,end: MAM query temporal boundaries
        :param JID with_jid: Filter results on this JID
        :param JID ifrom: To change the from address of the query
        :param bool reverse: Get the results in reverse order
        :param int timeout: IQ timeout
        :param func callback: Custom callback for handling results
        :param bool iterator: Use RSM and iterate over a paginated query
        :param dict rsm: RSM custom options
        :return: With ``iterator=True``, whatever ``xep_0059``'s ``iterate``
                 returns (``timeout`` and ``callback`` are not used on that
                 path); otherwise the result of ``iq.send``.
        """
        iq, stanza_mask = self._pre_mam_retrieve(jid, start, end, with_jid, ifrom)
        query_id = iq["id"]
        amount = self._apply_rsm(iq, rsm)

        if iterator:
            pre_cb, post_cb = self._paging_callbacks(stanza_mask, query_id)
            return self.xmpp["xep_0059"].iterate(
                iq,
                "mam",
                "results",
                amount=amount,
                reverse=reverse,
                recv_interface="mam_fin",
                pre_cb=pre_cb,
                post_cb=post_cb,
            )

        # Single-shot query: collect every message matching the mask until
        # the IQ response (or its timeout) arrives.
        collector = Collector(
            f"MAM_Results_{query_id}", MatchXMLMask(str(stanza_mask))
        )
        self.xmpp.register_handler(collector)

        def wrapped_cb(result: Iq) -> None:
            results = collector.stop()
            if result["type"] == "result":
                # Expose the collected messages on both interfaces, as the
                # paginated path does.
                result["mam"]["results"] = results
                result["mam_fin"]["results"] = results
            if callback:
                callback(result)

        def timeout_cb(_iq: Iq) -> None:
            # wrapped_cb never runs when the IQ times out; stop the collector
            # here so its handler does not stay registered.
            collector.stop()

        return iq.send(
            timeout=timeout, callback=wrapped_cb, timeout_callback=timeout_cb
        )

    async def iterate(
        self,
        jid: Optional[JID] = None,
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
        with_jid: Optional[JID] = None,
        ifrom: Optional[JID] = None,
        reverse: bool = False,
        rsm: Optional[Dict[str, Any]] = None,
        total: Optional[int] = None,
    ) -> AsyncGenerator:
        """
        Iterate over each message of MAM query.

        .. versionadded:: 1.8.0

        :param jid: Entity holding the MAM records
        :param start: MAM query start time
        :param end: MAM query end time
        :param with_jid: Filter results on this JID
        :param ifrom: To change the from address of the query
        :param reverse: Get the results in reverse order
        :param rsm: RSM custom options
        :param total: A number of messages received after which the query
                      should stop. A value of zero or less yields nothing.
        """
        if total is not None and total <= 0:
            # Nothing was asked for: do not build or send a query at all.
            return

        iq, stanza_mask = self._pre_mam_retrieve(jid, start, end, with_jid, ifrom)
        amount = self._apply_rsm(iq, rsm)
        pre_cb, post_cb = self._paging_callbacks(stanza_mask, iq["id"])

        iterator = self.xmpp["xep_0059"].iterate(
            iq,
            "mam",
            "results",
            amount=amount,
            reverse=reverse,
            recv_interface="mam_fin",
            pre_cb=pre_cb,
            post_cb=post_cb,
        )
        recv_count = 0

        async for page in iterator:
            messages = list(page["mam"]["results"])
            if reverse:
                # Flip each page so messages are yielded in the order implied
                # by the reversed query.
                messages.reverse()
            for message in messages:
                yield message
                recv_count += 1
                if total is not None and recv_count >= total:
                    return

    @staticmethod
    def _apply_rsm(iq: Iq, rsm: Optional[Dict[str, Any]]) -> Any:
        """Copy RSM options onto the query and return the page size to use.

        Every option is written to ``iq["mam"]["rsm"]`` as a string.

        :param iq: The MAM query being built.
        :param rsm: RSM custom options, or None.
        :return: The raw ``"max"`` option when given, else 10.
        """
        amount = 10
        if rsm:
            for key, value in rsm.items():
                iq["mam"]["rsm"][key] = str(value)
                if key == "max":
                    amount = value
        return amount

    def _paging_callbacks(
        self, stanza_mask: Message, query_id: str
    ) -> Tuple[Callable[[Iq], None], Callable[[Iq], None]]:
        """Build the ``pre_cb``/``post_cb`` pair handed to ``xep_0059``'s iterate.

        NOTE(review): presumably ``pre_cb`` runs before each page request is
        sent and ``post_cb`` on its response -- confirm against xep_0059.

        :param stanza_mask: Message mask used to match MAM result messages.
        :param query_id: Id of the initial query, used in the collector name.
        :return: The ``(pre_cb, post_cb)`` callables.
        """
        cb_data: Dict[str, Collector] = {}

        def pre_cb(query: Iq) -> None:
            # Align the mask and the outgoing query on the query's current id,
            # then start collecting the messages that match it.
            stanza_mask["mam_result"]["queryid"] = query["id"]
            xml_mask = str(stanza_mask)
            query["mam"]["queryid"] = query["id"]
            collector = Collector(f"MAM_Results_{query_id}", MatchXMLMask(xml_mask))
            self.xmpp.register_handler(collector)
            cb_data["collector"] = collector

        def post_cb(result: Iq) -> None:
            results = cb_data["collector"].stop()
            if result["type"] == "result":
                result["mam"]["results"] = results
                result["mam_fin"]["results"] = results

        return pre_cb, post_cb

    def _pre_mam_retrieve(
        self,
        jid: Optional[JID] = None,
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
        with_jid: Optional[JID] = None,
        ifrom: Optional[JID] = None,
    ) -> Tuple[Iq, Message]:
        """Build the IQ and stanza mask for MAM results

        :param jid: Entity holding the MAM records; also set as the mask's
                    ``from``.
        :param start: MAM query start time
        :param end: MAM query end time
        :param with_jid: Filter results on this JID
        :param ifrom: To change the from address of the query
        :return: The ``(iq, stanza_mask)`` pair.
        """
        iq = self.xmpp.make_iq_set(ito=jid, ifrom=ifrom)
        query_id = iq["id"]
        iq["mam"]["queryid"] = query_id
        iq["mam"]["start"] = start
        iq["mam"]["end"] = end
        iq["mam"]["with"] = with_jid

        stanza_mask = self.xmpp.Message()

        # Drop any origin-id child present on the fresh message (presumably
        # added automatically) so it is not part of the mask.
        auto_origin = stanza_mask.xml.find("{urn:xmpp:sid:0}origin-id")
        if auto_origin is not None:
            stanza_mask.xml.remove(auto_origin)
        del stanza_mask["id"]
        del stanza_mask["lang"]
        stanza_mask["from"] = jid
        stanza_mask["mam_result"]["queryid"] = query_id

        return (iq, stanza_mask)

    async def get_fields(self, jid: Optional[JID] = None, **iqkwargs) -> Form:
        """Get MAM query fields.

        .. versionadded:: 1.8.0

        :param jid: JID to retrieve the policy from.
        :param iqkwargs: ``ifrom`` is used as the query's from address; the
                         remaining keyword arguments are passed to ``iq.send``.
        :return: The Form of allowed options
        """
        ifrom = iqkwargs.pop("ifrom", None)
        iq = self.xmpp.make_iq_get(ito=jid, ifrom=ifrom)
        iq.enable("mam")
        result = await iq.send(**iqkwargs)
        return result["mam"]["form"]

    async def get_configuration_commands(
        self, jid: Optional[JID] = None, **discokwargs
    ) -> Any:
        """Get the list of MAM advanced configuration commands.

        .. versionchanged:: 1.8.0

        :param jid: JID to get the commands from. Defaults to the bare JID
                    of the bound session when None.
        :param discokwargs: Extra keyword arguments passed to
                            ``xep_0030``'s ``get_items``.
        :return: The awaited result of the ``get_items`` call.
        """
        if jid is None:
            jid = self.xmpp.boundjid.bare
        return await self.xmpp["xep_0030"].get_items(
            jid=jid, node="urn:xmpp:mam#configure", **discokwargs
        )

    def get_archive_metadata(self, jid: Optional[JID] = None, **iqkwargs) -> Future:
        """Get the archive metadata from a JID.

        :param jid: JID to get the metadata from.
        :param iqkwargs: ``ifrom`` is used as the query's from address; the
                         remaining keyword arguments are passed to ``iq.send``.
        :return: The result of ``iq.send`` for the metadata query.
        """
        ifrom = iqkwargs.pop("ifrom", None)
        iq = self.xmpp.make_iq_get(ito=jid, ifrom=ifrom)
        iq.enable("mam_metadata")
        return iq.send(**iqkwargs)