Coverage for slidge/util/test.py: 89%

207 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-04 08:17 +0000

1# type:ignore 

2import tempfile 

3import types 

4from pathlib import Path 

5from typing import Optional, Union 

6from xml.dom.minidom import parseString 

7 

8try: 

9 import xmldiff.main 

10 

11 XML_DIFF_PRESENT = True 

12except ImportError: 

13 xmldiff = None 

14 XML_DIFF_PRESENT = False 

15 

16from slixmpp import ( 

17 JID, 

18 ElementBase, 

19 Iq, 

20 MatcherId, 

21 MatchXMLMask, 

22 MatchXPath, 

23 Message, 

24 Presence, 

25 StanzaPath, 

26) 

27from slixmpp.stanza.error import Error 

28from slixmpp.test import SlixTest, TestTransport 

29from slixmpp.xmlstream import highlight, tostring 

30from slixmpp.xmlstream.matcher import MatchIDSender 

31from sqlalchemy import create_engine, delete 

32 

33from slidge import ( 

34 BaseGateway, 

35 BaseSession, 

36 LegacyBookmarks, 

37 LegacyContact, 

38 LegacyMUC, 

39 LegacyParticipant, 

40 LegacyRoster, 

41) 

42 

43from ..command import Command 

44from ..core import config 

45from ..core.config import _TimedeltaSeconds 

46from ..db import SlidgeStore 

47from ..db.avatar import avatar_cache 

48from ..db.meta import Base 

49from ..db.models import Contact, GatewayUser 

50 

51 

52class SlixTestPlus(SlixTest): 

53 def setUp(self) -> None: 

54 super().setUp() 

55 Error.namespace = "jabber:component:accept" 

56 

57 def check( 

58 self, 

59 stanza, 

60 criteria, 

61 method: str = "exact", 

62 defaults=None, 

63 use_values: bool = True, 

64 ): 

65 """ 

66 Create and compare several stanza objects to a correct XML string. 

67 

68 If use_values is False, tests using stanza.values will not be used. 

69 

70 Some stanzas provide default values for some interfaces, but 

71 these defaults can be problematic for testing since they can easily 

72 be forgotten when supplying the XML string. A list of interfaces that 

73 use defaults may be provided and the generated stanzas will use the 

74 default values for those interfaces if needed. 

75 

76 However, correcting the supplied XML is not possible for interfaces 

77 that add or remove XML elements. Only interfaces that map to XML 

78 attributes may be set using the defaults parameter. The supplied XML 

79 must take into account any extra elements that are included by default. 

80 

81 Arguments: 

82 stanza -- The stanza object to test. 

83 criteria -- An expression the stanza must match against. 

84 method -- The type of matching to use; one of: 

85 'exact', 'mask', 'id', 'xpath', and 'stanzapath'. 

86 Defaults to the value of self.match_method. 

87 defaults -- A list of stanza interfaces that have default 

88 values. These interfaces will be set to their 

89 defaults for the given and generated stanzas to 

90 prevent unexpected test failures. 

91 use_values -- Indicates if testing using stanza.values should 

92 be used. Defaults to True. 

93 """ 

94 if method is None and hasattr(self, "match_method"): 

95 method = getattr(self, "match_method") 

96 

97 if method != "exact": 

98 matchers = { 

99 "stanzapath": StanzaPath, 

100 "xpath": MatchXPath, 

101 "mask": MatchXMLMask, 

102 "idsender": MatchIDSender, 

103 "id": MatcherId, 

104 } 

105 Matcher = matchers.get(method, None) 

106 if Matcher is None: 

107 raise ValueError("Unknown matching method.") 

108 test = Matcher(criteria) 

109 self.assertTrue( 

110 test.match(stanza), 

111 "Stanza did not match using %s method:\n" % method 

112 + "Criteria:\n%s\n" % str(criteria) 

113 + "Stanza:\n%s" % str(stanza), 

114 ) 

115 else: 

116 stanza_class = stanza.__class__ 

117 # Hack to preserve namespaces instead of having jabber:client 

118 # everywhere. 

119 old_ns = stanza_class.namespace 

120 stanza_class.namespace = stanza.namespace 

121 if not isinstance(criteria, ElementBase): 

122 xml = self.parse_xml(criteria) 

123 else: 

124 xml = criteria.xml 

125 

126 # Ensure that top level namespaces are used, even if they 

127 # were not provided. 

128 self.fix_namespaces(stanza.xml) 

129 self.fix_namespaces(xml) 

130 

131 stanza2 = stanza_class(xml=xml) 

132 

133 if use_values: 

134 # Using stanza.values will add XML for any interface that 

135 # has a default value. We need to set those defaults on 

136 # the existing stanzas and XML so that they will compare 

137 # correctly. 

138 default_stanza = stanza_class() 

139 if defaults is None: 

140 known_defaults = {Message: ["type"], Presence: ["priority"]} 

141 defaults = known_defaults.get(stanza_class, []) 

142 for interface in defaults: 

143 stanza[interface] = stanza[interface] 

144 stanza2[interface] = stanza2[interface] 

145 # Can really only automatically add defaults for top 

146 # level attribute values. Anything else must be accounted 

147 # for in the provided XML string. 

148 if interface not in xml.attrib: 

149 if interface in default_stanza.xml.attrib: 

150 value = default_stanza.xml.attrib[interface] 

151 xml.attrib[interface] = value 

152 

153 values = stanza2.values 

154 stanza3 = stanza_class() 

155 stanza3.values = values 

156 

157 debug = "Three methods for creating stanzas do not match.\n" 

158 debug += "Given XML:\n%s\n" % highlight(tostring(xml)) 

159 debug += "Given stanza:\n%s\n" % format_stanza(stanza) 

160 debug += "Generated stanza:\n%s\n" % highlight(tostring(stanza2.xml)) 

161 debug += "Second generated stanza:\n%s\n" % highlight( 

162 tostring(stanza3.xml) 

163 ) 

164 result = self.compare(xml, stanza.xml, stanza2.xml, stanza3.xml) 

165 else: 

166 debug = "Two methods for creating stanzas do not match.\n" 

167 debug += "Given XML:\n%s\n" % highlight(tostring(xml)) 

168 debug += "Given stanza:\n%s\n" % format_stanza(stanza) 

169 debug += "Generated stanza:\n%s\n" % highlight(tostring(stanza2.xml)) 

170 result = self.compare(xml, stanza.xml, stanza2.xml) 

171 stanza_class.namespace = old_ns 

172 

173 if XML_DIFF_PRESENT and not result: 

174 debug += str( 

175 xmldiff.main.diff_texts(tostring(xml), tostring(stanza.xml)) 

176 ) 

177 if use_values: 

178 debug += str( 

179 xmldiff.main.diff_texts(tostring(xml), tostring(stanza2.xml)) 

180 ) 

181 self.assertTrue(result, debug) 

182 

183 def next_sent( 

184 self, timeout: float = 0.05 

185 ) -> Optional[Union[Message, Iq, Presence]]: 

186 self.wait_for_send_queue() 

187 sent = self.xmpp.socket.next_sent(timeout=timeout) 

188 if sent is None: 

189 return None 

190 xml = self.parse_xml(sent) 

191 self.fix_namespaces(xml, "jabber:component:accept") 

192 sent = self.xmpp._build_stanza(xml, "jabber:component:accept") 

193 return sent 

194 

195 

196class SlidgeTest(SlixTestPlus): 

197 plugin: Union[types.ModuleType, dict] 

198 

199 class Config: 

200 jid = "aim.shakespeare.lit" 

201 secret = "test" 

202 server = "shakespeare.lit" 

203 port = 5222 

204 upload_service = "upload.test" 

205 home_dir = Path(tempfile.mkdtemp()) 

206 user_jid_validator = ".*" 

207 admins: list[str] = [] 

208 upload_requester = None 

209 ignore_delay_threshold = _TimedeltaSeconds("300") 

210 

211 @classmethod 

212 def setUpClass(cls) -> None: 

213 for k, v in vars(cls.Config).items(): 

214 setattr(config, k.upper(), v) 

215 

216 def setUp(self): 

217 if hasattr(self, "plugin"): 

218 BaseGateway._subclass = find_subclass(self.plugin, BaseGateway) 

219 BaseSession._subclass = find_subclass(self.plugin, BaseSession) 

220 LegacyRoster._subclass = find_subclass( 

221 self.plugin, LegacyRoster, base_ok=True 

222 ) 

223 LegacyContact._subclass = find_subclass( 

224 self.plugin, LegacyContact, base_ok=True 

225 ) 

226 LegacyMUC._subclass = find_subclass(self.plugin, LegacyMUC, base_ok=True) 

227 LegacyBookmarks._subclass = find_subclass( 

228 self.plugin, LegacyBookmarks, base_ok=True 

229 ) 

230 

231 # workaround for duplicate output of sql alchemy's log, cf 

232 # https://stackoverflow.com/a/76498428/5902284 

233 from sqlalchemy import log as sqlalchemy_log 

234 

235 sqlalchemy_log._add_default_handler = lambda x: None 

236 

237 engine = self.db_engine = create_engine("sqlite+pysqlite:///:memory:") 

238 Base.metadata.create_all(engine) 

239 BaseGateway.store = SlidgeStore(engine) 

240 BaseGateway._test_mode = True 

241 try: 

242 self.xmpp = BaseGateway.get_self_or_unique_subclass()() 

243 except Exception: 

244 raise 

245 self.xmpp.TEST_MODE = True 

246 avatar_cache.store = self.xmpp.store.avatars 

247 avatar_cache.set_dir(Path(tempfile.mkdtemp())) 

248 self.xmpp._always_send_everything = True 

249 engine.echo = True 

250 

251 self.xmpp.connection_made(TestTransport(self.xmpp)) 

252 self.xmpp.session_bind_event.set() 

253 # Remove unique ID prefix to make it easier to test 

254 self.xmpp._id_prefix = "" 

255 self.xmpp.default_lang = None 

256 self.xmpp.peer_default_lang = None 

257 

258 def new_id(): 

259 self.xmpp._id += 1 

260 return str(self.xmpp._id) 

261 

262 self.xmpp._id = 0 

263 self.xmpp.new_id = new_id 

264 

265 # Must have the stream header ready for xmpp.process() to work. 

266 header = self.xmpp.stream_header 

267 

268 self.xmpp.data_received(header) 

269 self.wait_for_send_queue() 

270 

271 self.xmpp.socket.next_sent() 

272 self.xmpp.socket.next_sent() 

273 

274 # Some plugins require messages to have ID values. Set 

275 # this to True in tests related to those plugins. 

276 self.xmpp.use_message_ids = False 

277 self.xmpp.use_presence_ids = False 

278 Error.namespace = "jabber:component:accept" 

279 

280 def tearDown(self) -> None: 

281 self.db_engine.echo = False 

282 super().tearDown() 

283 Base.metadata.drop_all(self.xmpp.store._engine) 

284 

285 def setup_logged_session(self, n_contacts: int = 0) -> None: 

286 with self.xmpp.store.session() as orm: 

287 user = GatewayUser( 

288 jid=JID("romeo@montague.lit/gajim").bare, 

289 legacy_module_data={"username": "romeo", "city": ""}, 

290 preferences={"sync_avatar": True, "sync_presence": True}, 

291 ) 

292 orm.add(user) 

293 orm.commit() 

294 

295 with self.xmpp.store.session() as session: 

296 session.execute(delete(Contact)) 

297 session.commit() 

298 

299 self.run_coro( 

300 self.xmpp._BaseGateway__dispatcher._on_user_register( 

301 Iq(sfrom="romeo@montague.lit/gajim") 

302 ) 

303 ) 

304 welcome = self.next_sent() 

305 assert welcome["body"], welcome 

306 stanza = self.next_sent() 

307 assert "logging in" in stanza["status"].lower(), stanza 

308 stanza = self.next_sent() 

309 assert "syncing contacts" in stanza["status"].lower(), stanza 

310 if BaseGateway.get_self_or_unique_subclass().GROUPS: 

311 stanza = self.next_sent() 

312 assert "syncing groups" in stanza["status"].lower(), stanza 

313 probe = self.next_sent() 

314 assert probe.get_type() == "probe" 

315 stanza = self.next_sent() 

316 assert "yup" in stanza["status"].lower(), stanza 

317 self.romeo: BaseSession = BaseSession.get_self_or_unique_subclass().from_jid( 

318 JID("romeo@montague.lit") 

319 ) 

320 

321 self.juliet: LegacyContact = self.run_coro( 

322 self.romeo.contacts.by_legacy_id("juliet") 

323 ) 

324 self.room: LegacyMUC = self.run_coro(self.romeo.bookmarks.by_legacy_id("room")) 

325 self.first_witch: LegacyParticipant = self.run_coro( 

326 self.room.get_participant("firstwitch") 

327 ) 

328 self.send( # language=XML 

329 """ 

330 <iq type="get" 

331 to="romeo@montague.lit" 

332 id="1" 

333 from="aim.shakespeare.lit"> 

334 <pubsub xmlns="http://jabber.org/protocol/pubsub"> 

335 <items node="urn:xmpp:avatar:metadata" /> 

336 </pubsub> 

337 </iq> 

338 """ 

339 ) 

340 

341 @classmethod 

342 def tearDownClass(cls) -> None: 

343 reset_subclasses() 

344 

345 

346def format_stanza(stanza): 

347 return highlight( 

348 "\n".join(parseString(tostring(stanza.xml)).toprettyxml().split("\n")[1:]) 

349 ) 

350 

351 

352def find_subclass(o, parent, base_ok: bool = False): 

353 try: 

354 vals = vars(o).values() 

355 except TypeError: 

356 vals = o.values() 

357 for x in vals: 

358 try: 

359 if issubclass(x, parent) and x is not parent: 

360 return x 

361 except TypeError: 

362 pass 

363 else: 

364 if base_ok: 

365 return parent 

366 else: 

367 raise RuntimeError 

368 

369 

370def reset_subclasses() -> None: 

371 """ 

372 Reset registered subclasses between test classes. 

373 

374 Needed because these classes are meant to only be subclassed once and raise 

375 exceptions otherwise. 

376 """ 

377 BaseSession.reset_subclass() 

378 BaseGateway.reset_subclass() 

379 LegacyRoster.reset_subclass() 

380 LegacyContact.reset_subclass() 

381 LegacyMUC.reset_subclass() 

382 LegacyBookmarks.reset_subclass() 

383 LegacyParticipant.reset_subclass() 

384 # reset_commands() 

385 

386 

387def reset_commands() -> None: 

388 Command.subclasses = [ 

389 c for c in Command.subclasses if str(c).startswith("<class 'slidge.core") 

390 ]