Coverage for slidge/util/test.py: 91%

211 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-07 05:11 +0000

1# type:ignore 

2import tempfile 

3import types 

4from pathlib import Path 

5from typing import Optional, Union 

6from xml.dom.minidom import parseString 

7 

8import xmldiff.main 

9from slixmpp import ( 

10 JID, 

11 ElementBase, 

12 Iq, 

13 MatcherId, 

14 MatchXMLMask, 

15 MatchXPath, 

16 Message, 

17 Presence, 

18 StanzaPath, 

19) 

20from slixmpp.stanza.error import Error 

21from slixmpp.test import SlixTest, TestTransport 

22from slixmpp.xmlstream import highlight, tostring 

23from slixmpp.xmlstream.matcher import MatchIDSender 

24from sqlalchemy import create_engine, delete 

25 

26from slidge import ( 

27 BaseGateway, 

28 BaseSession, 

29 LegacyBookmarks, 

30 LegacyContact, 

31 LegacyMUC, 

32 LegacyParticipant, 

33 LegacyRoster, 

34) 

35 

36from ..command import Command 

37from ..core import config 

38from ..core.config import _TimedeltaSeconds 

39from ..core.pubsub import PepAvatar, PepNick 

40from ..db import SlidgeStore 

41from ..db.avatar import avatar_cache 

42from ..db.meta import Base 

43from ..db.models import Contact 

44 

45 

class SlixTestPlus(SlixTest):
    """
    ``SlixTest`` extension working in the ``jabber:component:accept`` namespace.

    It overrides ``check()`` so that the namespace of the stanza under test is
    preserved during comparison, and adds ``next_sent()`` which returns the
    next sent payload as a stanza object.
    """

    def setUp(self):
        """Run the parent set-up, then switch ``Error`` to the component namespace."""
        super().setUp()
        Error.namespace = "jabber:component:accept"

    def check(self, stanza, criteria, method="exact", defaults=None, use_values=True):
        """
        Create and compare several stanza objects to a correct XML string.

        If use_values is False, tests using stanza.values will not be used.

        Some stanzas provide default values for some interfaces, but
        these defaults can be problematic for testing since they can easily
        be forgotten when supplying the XML string. A list of interfaces that
        use defaults may be provided and the generated stanzas will use the
        default values for those interfaces if needed.

        However, correcting the supplied XML is not possible for interfaces
        that add or remove XML elements. Only interfaces that map to XML
        attributes may be set using the defaults parameter. The supplied XML
        must take into account any extra elements that are included by default.

        Arguments:
            stanza     -- The stanza object to test.
            criteria   -- An expression the stanza must match against.
            method     -- The type of matching to use; one of:
                          'exact', 'mask', 'id', 'idsender', 'xpath',
                          and 'stanzapath'. Defaults to 'exact'; pass None
                          to use self.match_method when that attribute
                          exists.
            defaults   -- A list of stanza interfaces that have default
                          values. These interfaces will be set to their
                          defaults for the given and generated stanzas to
                          prevent unexpected test failures.
            use_values -- Indicates if testing using stanza.values should
                          be used. Defaults to True.

        Raises:
            ValueError -- If method is not one of the known matching methods.
        """
        if method is None and hasattr(self, "match_method"):
            method = getattr(self, "match_method")

        if method != "exact":
            matchers = {
                "stanzapath": StanzaPath,
                "xpath": MatchXPath,
                "mask": MatchXMLMask,
                "idsender": MatchIDSender,
                "id": MatcherId,
            }
            Matcher = matchers.get(method, None)
            if Matcher is None:
                raise ValueError("Unknown matching method.")
            test = Matcher(criteria)
            self.assertTrue(
                test.match(stanza),
                "Stanza did not match using %s method:\n" % method
                + "Criteria:\n%s\n" % str(criteria)
                + "Stanza:\n%s" % str(stanza),
            )
            return

        stanza_class = stanza.__class__
        # Hack to preserve namespaces instead of having jabber:client
        # everywhere.
        old_ns = stanza_class.namespace
        stanza_class.namespace = stanza.namespace
        try:
            if not isinstance(criteria, ElementBase):
                xml = self.parse_xml(criteria)
            else:
                xml = criteria.xml

            # Ensure that top level namespaces are used, even if they
            # were not provided.
            self.fix_namespaces(stanza.xml)
            self.fix_namespaces(xml)

            stanza2 = stanza_class(xml=xml)

            if use_values:
                # Using stanza.values will add XML for any interface that
                # has a default value. We need to set those defaults on
                # the existing stanzas and XML so that they will compare
                # correctly.
                default_stanza = stanza_class()
                if defaults is None:
                    known_defaults = {Message: ["type"], Presence: ["priority"]}
                    defaults = known_defaults.get(stanza_class, [])
                for interface in defaults:
                    stanza[interface] = stanza[interface]
                    stanza2[interface] = stanza2[interface]
                    # Can really only automatically add defaults for top
                    # level attribute values. Anything else must be accounted
                    # for in the provided XML string.
                    if interface not in xml.attrib:
                        if interface in default_stanza.xml.attrib:
                            value = default_stanza.xml.attrib[interface]
                            xml.attrib[interface] = value

                values = stanza2.values
                stanza3 = stanza_class()
                stanza3.values = values

                debug = "Three methods for creating stanzas do not match.\n"
                debug += "Given XML:\n%s\n" % highlight(tostring(xml))
                debug += "Given stanza:\n%s\n" % format_stanza(stanza)
                debug += "Generated stanza:\n%s\n" % highlight(tostring(stanza2.xml))
                debug += "Second generated stanza:\n%s\n" % highlight(
                    tostring(stanza3.xml)
                )
                result = self.compare(xml, stanza.xml, stanza2.xml, stanza3.xml)
            else:
                debug = "Two methods for creating stanzas do not match.\n"
                debug += "Given XML:\n%s\n" % highlight(tostring(xml))
                debug += "Given stanza:\n%s\n" % format_stanza(stanza)
                debug += "Generated stanza:\n%s\n" % highlight(tostring(stanza2.xml))
                result = self.compare(xml, stanza.xml, stanza2.xml)
        finally:
            # Always undo the class-level namespace override, even when
            # parsing or comparing raised, so that a failure here cannot
            # alter the stanza class for subsequent tests.
            stanza_class.namespace = old_ns

        if not result:
            debug += str(
                xmldiff.main.diff_texts(tostring(xml), tostring(stanza.xml))
            )
            if use_values:
                debug += str(
                    xmldiff.main.diff_texts(tostring(xml), tostring(stanza2.xml))
                )
        self.assertTrue(result, debug)

    def next_sent(self, timeout=0.05) -> Optional[Union[Message, Iq, Presence]]:
        """
        Return the next payload sent by ``self.xmpp`` as a stanza object.

        Arguments:
            timeout -- Passed through to ``self.xmpp.socket.next_sent()``.

        Returns None when the socket has nothing to return; otherwise the
        payload is parsed and built into a stanza using the
        ``jabber:component:accept`` namespace.
        """
        self.wait_for_send_queue()
        sent = self.xmpp.socket.next_sent(timeout=timeout)
        if sent is None:
            return None
        xml = self.parse_xml(sent)
        self.fix_namespaces(xml, "jabber:component:accept")
        sent = self.xmpp._build_stanza(xml, "jabber:component:accept")
        return sent

179 

180 

class SlidgeTest(SlixTestPlus):
    """
    Base test case that instantiates a gateway backed by an in-memory database.

    Subclasses may set ``plugin`` to a module or a dict; when present, its
    values are searched for the gateway/session/roster/contact/MUC/bookmarks
    subclasses to register before the gateway is created.
    """

    plugin: Union[types.ModuleType, dict]

    class Config:
        # Every attribute defined here is copied, upper-cased, onto the
        # ``config`` module by ``setUpClass``.
        jid = "aim.shakespeare.lit"
        secret = "test"
        server = "shakespeare.lit"
        port = 5222
        upload_service = "upload.test"
        home_dir = Path(tempfile.mkdtemp())
        user_jid_validator = ".*"
        admins: list[str] = []
        no_roster_push = False
        upload_requester = None
        ignore_delay_threshold = _TimedeltaSeconds("300")
        last_seen_fallback = True

    @classmethod
    def setUpClass(cls):
        """Copy each ``Config`` attribute onto ``config`` under its upper-cased name."""
        for k, v in vars(cls.Config).items():
            setattr(config, k.upper(), v)

    def setUp(self):
        """
        Build the gateway under test on top of a fresh in-memory SQLite engine.

        Registers the plugin's subclasses (if ``plugin`` is set), creates the
        database schema, instantiates the gateway, attaches a
        ``TestTransport``, makes stanza IDs sequential, and feeds the stream
        header to the gateway.
        """
        if hasattr(self, "plugin"):
            BaseGateway._subclass = find_subclass(self.plugin, BaseGateway)
            BaseSession._subclass = find_subclass(self.plugin, BaseSession)
            LegacyRoster._subclass = find_subclass(
                self.plugin, LegacyRoster, base_ok=True
            )
            LegacyContact._subclass = find_subclass(
                self.plugin, LegacyContact, base_ok=True
            )
            LegacyMUC._subclass = find_subclass(self.plugin, LegacyMUC, base_ok=True)
            LegacyBookmarks._subclass = find_subclass(
                self.plugin, LegacyBookmarks, base_ok=True
            )

        # workaround for duplicate output of sql alchemy's log, cf
        # https://stackoverflow.com/a/76498428/5902284
        from sqlalchemy import log as sqlalchemy_log

        sqlalchemy_log._add_default_handler = lambda x: None

        engine = self.db_engine = create_engine("sqlite+pysqlite:///:memory:")
        Base.metadata.create_all(engine)
        BaseGateway.store = SlidgeStore(engine)
        BaseGateway._test_mode = True
        # Any exception raised while building the gateway propagates as-is.
        self.xmpp = BaseGateway.get_self_or_unique_subclass()()
        self.xmpp.TEST_MODE = True
        PepNick.contact_store = self.xmpp.store.contacts
        PepAvatar.store = self.xmpp.store
        avatar_cache.store = self.xmpp.store.avatars
        avatar_cache.set_dir(Path(tempfile.mkdtemp()))
        self.xmpp._always_send_everything = True
        engine.echo = True

        self.xmpp.connection_made(TestTransport(self.xmpp))
        self.xmpp.session_bind_event.set()
        # Remove unique ID prefix to make it easier to test
        self.xmpp._id_prefix = ""
        self.xmpp.default_lang = None
        self.xmpp.peer_default_lang = None

        def new_id():
            # Sequential IDs ("1", "2", ...) instead of the default ones.
            self.xmpp._id += 1
            return str(self.xmpp._id)

        self.xmpp._id = 0
        self.xmpp.new_id = new_id

        # Must have the stream header ready for xmpp.process() to work.
        header = self.xmpp.stream_header

        self.xmpp.data_received(header)
        self.wait_for_send_queue()

        # Discard the first two payloads sent after the stream header.
        self.xmpp.socket.next_sent()
        self.xmpp.socket.next_sent()

        # Some plugins require messages to have ID values. Set
        # this to True in tests related to those plugins.
        self.xmpp.use_message_ids = False
        self.xmpp.use_presence_ids = False
        Error.namespace = "jabber:component:accept"

    def tearDown(self):
        """Silence SQL echo, run the parent tear-down, and drop all tables."""
        self.db_engine.echo = False
        super().tearDown()
        import slidge.db.store

        # Commit and discard the module-level session, if one exists, before
        # the tables are dropped.
        if slidge.db.store._session is not None:
            slidge.db.store._session.commit()
            slidge.db.store._session = None
        Base.metadata.drop_all(self.xmpp.store._engine)

    def setup_logged_session(self, n_contacts=0):
        """
        Register the user romeo and consume the stanzas sent during login.

        Arguments:
            n_contacts -- Number of presence probes expected after the
                          "syncing" status stanzas.

        On return, ``self.romeo``, ``self.juliet``, ``self.room`` and
        ``self.first_witch`` are set, and an avatar metadata pubsub request
        addressed to romeo has been sent.
        """
        user = self.xmpp.store.users.new(
            JID("romeo@montague.lit/gajim"), {"username": "romeo", "city": ""}
        )
        user.preferences = {"sync_avatar": True, "sync_presence": True}
        self.xmpp.store.users.update(user)

        # Start from an empty contact table.
        with self.xmpp.store.session() as session:
            session.execute(delete(Contact))
            session.commit()

        self.run_coro(
            self.xmpp._BaseGateway__dispatcher._on_user_register(
                Iq(sfrom="romeo@montague.lit/gajim")
            )
        )
        welcome = self.next_sent()
        assert welcome["body"], welcome
        stanza = self.next_sent()
        assert "logging in" in stanza["status"].lower(), stanza
        stanza = self.next_sent()
        assert "syncing contacts" in stanza["status"].lower(), stanza
        if BaseGateway.get_self_or_unique_subclass().GROUPS:
            stanza = self.next_sent()
            assert "syncing groups" in stanza["status"].lower(), stanza
        for _ in range(n_contacts):
            probe = self.next_sent()
            assert probe.get_type() == "probe"
        stanza = self.next_sent()
        assert "yup" in stanza["status"].lower(), stanza
        self.romeo: BaseSession = BaseSession.get_self_or_unique_subclass().from_jid(
            JID("romeo@montague.lit")
        )

        self.juliet: LegacyContact = self.run_coro(
            self.romeo.contacts.by_legacy_id("juliet")
        )
        self.room: LegacyMUC = self.run_coro(self.romeo.bookmarks.by_legacy_id("room"))
        self.first_witch: LegacyParticipant = self.run_coro(
            self.room.get_participant("firstwitch")
        )
        self.send(  # language=XML
            """
            <iq type="get"
                to="romeo@montague.lit"
                id="1"
                from="aim.shakespeare.lit">
              <pubsub xmlns="http://jabber.org/protocol/pubsub">
                <items node="urn:xmpp:avatar:metadata" />
              </pubsub>
            </iq>
            """
        )

    @classmethod
    def tearDownClass(cls):
        """Reset the registered subclasses once the test class is done."""
        reset_subclasses()

336 

337 

def format_stanza(stanza):
    """
    Return the stanza's XML pretty-printed and passed through ``highlight``.

    Arguments:
        stanza -- An object whose ``xml`` attribute is serialised with
                  ``tostring``.
    """
    document = parseString(tostring(stanza.xml))
    pretty_lines = document.toprettyxml().split("\n")
    # The first line of the pretty-printed document is dropped.
    body = "\n".join(pretty_lines[1:])
    return highlight(body)

342 

343 

def find_subclass(o, parent, base_ok=False):
    """
    Return the first strict subclass of ``parent`` found among the values of ``o``.

    Arguments:
        o       -- An object supporting ``vars()`` (such as a module), or
                   otherwise an object with a ``values()`` method (such as
                   a dict). Its values are searched in iteration order.
        parent  -- The class whose subclass is looked for.
        base_ok -- If True, ``parent`` itself is returned when no strict
                   subclass is found. Defaults to False.

    Raises:
        RuntimeError -- If no strict subclass is found and base_ok is False.
    """
    try:
        candidates = vars(o).values()
    except TypeError:
        # vars() rejects objects without a __dict__ (e.g. a plain dict).
        candidates = o.values()

    for candidate in candidates:
        try:
            if issubclass(candidate, parent) and candidate is not parent:
                return candidate
        except TypeError:
            # issubclass() raises for values that are not classes; skip them.
            continue

    if base_ok:
        return parent
    parent_name = getattr(parent, "__name__", repr(parent))
    raise RuntimeError(f"No subclass of {parent_name} found")

360 

361 

def reset_subclasses():
    """
    Clear the subclass registered on each of the base classes below.

    This runs between test classes: these classes are meant to be subclassed
    only once and raise exceptions otherwise.
    """
    resettable = (
        BaseSession,
        BaseGateway,
        LegacyRoster,
        LegacyContact,
        LegacyMUC,
        LegacyBookmarks,
        LegacyParticipant,
    )
    for base_class in resettable:
        base_class.reset_subclass()
    # NOTE: reset_commands() is not called here (the call is disabled).

377 

378 

def reset_commands():
    """
    Prune ``Command.subclasses`` down to the classes defined under ``slidge.core``.

    A class is kept when its string form starts with ``<class 'slidge.core``.
    """
    core_prefix = "<class 'slidge.core"
    kept = []
    for command_class in Command.subclasses:
        if str(command_class).startswith(core_prefix):
            kept.append(command_class)
    Command.subclasses = kept

382 ]