Coverage for slidge / util / test.py: 91%

233 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-03-13 22:59 +0000

1# type:ignore 

2import os 

3import tempfile 

4import types 

5from pathlib import Path 

6from xml.dom.minidom import parseString 

7 

8try: 

9 import xmldiff.main 

10 

11 XML_DIFF_PRESENT = True 

12except ImportError: 

13 xmldiff = None 

14 XML_DIFF_PRESENT = False 

15 

16from slixmpp import ( 

17 JID, 

18 ElementBase, 

19 Iq, 

20 MatcherId, 

21 MatchXMLMask, 

22 MatchXPath, 

23 Message, 

24 Presence, 

25 StanzaPath, 

26) 

27from slixmpp.stanza.error import Error 

28from slixmpp.test import SlixTest, TestTransport 

29from slixmpp.xmlstream import highlight, tostring 

30from slixmpp.xmlstream.matcher import MatchIDSender 

31from sqlalchemy import create_engine, delete 

32 

33from slidge import ( 

34 BaseGateway, 

35 BaseSession, 

36 LegacyBookmarks, 

37 LegacyContact, 

38 LegacyMUC, 

39 LegacyParticipant, 

40 LegacyRoster, 

41) 

42 

43from ..command import Command 

44from ..core import config 

45from ..core.session import _sessions 

46from ..db import SlidgeStore 

47from ..db.avatar import avatar_cache 

48from ..db.meta import Base 

49from ..db.models import Contact, GatewayUser 

50from ..util import SubclassableOnce 

51 

52 

53class SlixTestPlus(SlixTest): 

54 def setUp(self) -> None: 

55 super().setUp() 

56 Error.namespace = "jabber:component:accept" 

57 

58 def check( 

59 self, 

60 stanza, 

61 criteria, 

62 method: str = "exact", 

63 defaults=None, 

64 use_values: bool = True, 

65 ) -> None: 

66 """ 

67 Create and compare several stanza objects to a correct XML string. 

68 

69 If use_values is False, tests using stanza.values will not be used. 

70 

71 Some stanzas provide default values for some interfaces, but 

72 these defaults can be problematic for testing since they can easily 

73 be forgotten when supplying the XML string. A list of interfaces that 

74 use defaults may be provided and the generated stanzas will use the 

75 default values for those interfaces if needed. 

76 

77 However, correcting the supplied XML is not possible for interfaces 

78 that add or remove XML elements. Only interfaces that map to XML 

79 attributes may be set using the defaults parameter. The supplied XML 

80 must take into account any extra elements that are included by default. 

81 

82 Arguments: 

83 stanza -- The stanza object to test. 

84 criteria -- An expression the stanza must match against. 

85 method -- The type of matching to use; one of: 

86 'exact', 'mask', 'id', 'xpath', and 'stanzapath'. 

87 Defaults to the value of self.match_method. 

88 defaults -- A list of stanza interfaces that have default 

89 values. These interfaces will be set to their 

90 defaults for the given and generated stanzas to 

91 prevent unexpected test failures. 

92 use_values -- Indicates if testing using stanza.values should 

93 be used. Defaults to True. 

94 """ 

95 if method is None and hasattr(self, "match_method"): 

96 method = getattr(self, "match_method") 

97 

98 if method != "exact": 

99 matchers = { 

100 "stanzapath": StanzaPath, 

101 "xpath": MatchXPath, 

102 "mask": MatchXMLMask, 

103 "idsender": MatchIDSender, 

104 "id": MatcherId, 

105 } 

106 Matcher = matchers.get(method, None) 

107 if Matcher is None: 

108 raise ValueError("Unknown matching method.") 

109 test = Matcher(criteria) 

110 self.assertTrue( 

111 test.match(stanza), 

112 f"Stanza did not match using {method} method:\n" 

113 + f"Criteria:\n{str(criteria)}\n" 

114 + f"Stanza:\n{str(stanza)}", 

115 ) 

116 else: 

117 stanza_class = stanza.__class__ 

118 # Hack to preserve namespaces instead of having jabber:client 

119 # everywhere. 

120 old_ns = stanza_class.namespace 

121 stanza_class.namespace = stanza.namespace 

122 if not isinstance(criteria, ElementBase): 

123 xml = self.parse_xml(criteria) 

124 else: 

125 xml = criteria.xml 

126 

127 # Ensure that top level namespaces are used, even if they 

128 # were not provided. 

129 self.fix_namespaces(stanza.xml) 

130 self.fix_namespaces(xml) 

131 

132 stanza2 = stanza_class(xml=xml) 

133 

134 if use_values: 

135 # Using stanza.values will add XML for any interface that 

136 # has a default value. We need to set those defaults on 

137 # the existing stanzas and XML so that they will compare 

138 # correctly. 

139 default_stanza = stanza_class() 

140 if defaults is None: 

141 known_defaults = {Message: ["type"], Presence: ["priority"]} 

142 defaults = known_defaults.get(stanza_class, []) 

143 for interface in defaults: 

144 stanza[interface] = stanza[interface] 

145 stanza2[interface] = stanza2[interface] 

146 # Can really only automatically add defaults for top 

147 # level attribute values. Anything else must be accounted 

148 # for in the provided XML string. 

149 if interface not in xml.attrib: 

150 if interface in default_stanza.xml.attrib: 

151 value = default_stanza.xml.attrib[interface] 

152 xml.attrib[interface] = value 

153 

154 values = stanza2.values 

155 stanza3 = stanza_class() 

156 stanza3.values = values 

157 

158 debug = "Three methods for creating stanzas do not match.\n" 

159 debug += f"Given XML:\n{highlight(tostring(xml))}\n" 

160 debug += f"Given stanza:\n{format_stanza(stanza)}\n" 

161 debug += f"Generated stanza:\n{highlight(tostring(stanza2.xml))}\n" 

162 debug += ( 

163 f"Second generated stanza:\n{highlight(tostring(stanza3.xml))}\n" 

164 ) 

165 result = self.compare(xml, stanza.xml, stanza2.xml, stanza3.xml) 

166 else: 

167 debug = "Two methods for creating stanzas do not match.\n" 

168 debug += f"Given XML:\n{highlight(tostring(xml))}\n" 

169 debug += f"Given stanza:\n{format_stanza(stanza)}\n" 

170 debug += f"Generated stanza:\n{highlight(tostring(stanza2.xml))}\n" 

171 result = self.compare(xml, stanza.xml, stanza2.xml) 

172 stanza_class.namespace = old_ns 

173 

174 if XML_DIFF_PRESENT and not result: 

175 debug += str( 

176 xmldiff.main.diff_texts(tostring(xml), tostring(stanza.xml)) 

177 ) 

178 if use_values: 

179 debug += str( 

180 xmldiff.main.diff_texts(tostring(xml), tostring(stanza2.xml)) 

181 ) 

182 self.assertTrue(result, debug) 

183 

184 def next_sent(self, timeout: float = 0.05) -> Message | Iq | Presence | None: 

185 self.wait_for_send_queue() 

186 sent = self.xmpp.socket.next_sent(timeout=timeout) 

187 if sent is None: 

188 return None 

189 xml = self.parse_xml(sent) 

190 self.fix_namespaces(xml, "jabber:component:accept") 

191 sent = self.xmpp._build_stanza(xml, "jabber:component:accept") 

192 return sent 

193 

194 

195class SlidgeTest(SlixTestPlus): 

196 plugin: types.ModuleType | dict 

197 

198 class Config: 

199 jid = "aim.shakespeare.lit" 

200 secret = "test" 

201 server = "shakespeare.lit" 

202 port = 5222 

203 upload_service = "upload.test" 

204 home_dir = Path(tempfile.mkdtemp()) 

205 user_jid_validator = ".*" 

206 admins: list[str] = [] 

207 upload_requester = None 

208 ignore_delay_threshold = 300 

209 

210 @classmethod 

211 def setUpClass(cls) -> None: 

212 for k, v in vars(cls.Config).items(): 

213 setattr(config, k.upper(), v) 

214 if hasattr(cls, "plugin"): 

215 subclasses = SubclassableOnce._SubclassableOnce__subclasses 

216 subclasses.update( 

217 { 

218 BaseGateway: find_subclass(cls.plugin, BaseGateway), 

219 BaseSession: find_subclass(cls.plugin, BaseSession), 

220 LegacyRoster: find_subclass(cls.plugin, LegacyRoster, base_ok=True), 

221 LegacyContact: find_subclass( 

222 cls.plugin, LegacyContact, base_ok=True 

223 ), 

224 LegacyMUC: find_subclass(cls.plugin, LegacyMUC, base_ok=True), 

225 LegacyBookmarks: find_subclass( 

226 cls.plugin, LegacyBookmarks, base_ok=True 

227 ), 

228 } 

229 ) 

230 for subclass in subclasses.values(): 

231 if subclass is not None: 

232 subclass.__abstractmethods__ = set() 

233 

234 def setUp(self) -> None: 

235 # workaround for duplicate output of sql alchemy's log, cf 

236 # https://stackoverflow.com/a/76498428/5902284 

237 from sqlalchemy import log as sqlalchemy_log 

238 from sqlalchemy.pool import StaticPool 

239 

240 sqlalchemy_log._add_default_handler = lambda x: None 

241 db_url = os.getenv("SLIDGETEST_DB_URL", "sqlite+pysqlite:///:memory:") 

242 engine = self.db_engine = create_engine( 

243 db_url, poolclass=StaticPool if db_url.startswith("postgresql+") else None 

244 ) 

245 Base.metadata.create_all(engine) 

246 BaseGateway.store = SlidgeStore(engine) 

247 BaseGateway._test_mode = True 

248 

249 self.xmpp = BaseGateway.get_self_or_unique_subclass()() 

250 

251 self.xmpp.TEST_MODE = True 

252 avatar_cache.store = self.xmpp.store.avatars 

253 avatar_cache.set_dir(Path(tempfile.mkdtemp())) 

254 self.xmpp._always_send_everything = True 

255 engine.echo = True 

256 

257 self.xmpp.connection_made(TestTransport(self.xmpp)) 

258 self.xmpp.session_bind_event.set() 

259 # Remove unique ID prefix to make it easier to test 

260 self.xmpp._id_prefix = "" 

261 self.xmpp.default_lang = None 

262 self.xmpp.peer_default_lang = None 

263 

264 def new_id(): 

265 self.xmpp._id += 1 

266 return str(self.xmpp._id) 

267 

268 self.xmpp._id = 0 

269 self.xmpp.new_id = new_id 

270 

271 # Must have the stream header ready for xmpp.process() to work. 

272 header = self.xmpp.stream_header 

273 

274 self.xmpp.data_received(header) 

275 self.wait_for_send_queue() 

276 

277 self.xmpp.socket.next_sent() 

278 self.xmpp.socket.next_sent() 

279 

280 # Some plugins require messages to have ID values. Set 

281 # this to True in tests related to those plugins. 

282 self.xmpp.use_message_ids = False 

283 self.xmpp.use_presence_ids = False 

284 Error.namespace = "jabber:component:accept" 

285 

286 def _add_slidge_user( 

287 self, 

288 jid: str | JID = "romeo@shakespeare.lit", 

289 legacy_module_data=None, 

290 preferences=None, 

291 ) -> None: 

292 with self.xmpp.store.session() as orm: 

293 user = GatewayUser( 

294 jid=JID(jid), 

295 legacy_module_data=legacy_module_data or {}, 

296 preferences=preferences 

297 or {"sync_avatar": False, "sync_presence": False}, 

298 ) 

299 orm.add(user) 

300 orm.commit() 

301 self.run_coro( 

302 self.xmpp._BaseGateway__dispatcher._on_user_register(Iq(sfrom=JID(jid))) 

303 ) 

304 welcome = self.next_sent() 

305 assert welcome["body"] 

306 stanza = self.next_sent() 

307 assert "logging in" in stanza["status"].lower(), stanza 

308 stanza = self.next_sent() 

309 assert "syncing contacts" in stanza["status"].lower(), stanza 

310 if self.xmpp.GROUPS: 

311 stanza = self.next_sent() 

312 assert "syncing groups" in stanza["status"].lower(), stanza 

313 probe = self.next_sent() 

314 assert probe.get_type() == "probe" 

315 stanza = self.next_sent() 

316 assert stanza["status"].lower() 

317 

318 def user_session(self, jid: str | JID = "romeo@shakespeare.lit") -> BaseSession: 

319 return BaseSession.get_self_or_unique_subclass().from_jid(JID(jid)) 

320 

321 def get_joined_muc( 

322 self, 

323 legacy_id: str | int, 

324 user_jid: str | JID = "romeo@shakespeare.lit", 

325 resource: str = "gajim", 

326 ) -> LegacyMUC: 

327 muc: LegacyMUC = self.run_coro( 

328 self.user_session(user_jid).bookmarks.by_legacy_id(legacy_id) 

329 ) 

330 muc.add_user_resource(resource) 

331 return muc 

332 

333 def tearDown(self) -> None: 

334 self.db_engine.echo = False 

335 super().tearDown() 

336 Base.metadata.drop_all(self.xmpp.store._engine) 

337 self.db_engine.dispose() 

338 _sessions.clear() 

339 

340 def setup_logged_session(self, n_contacts: int = 0) -> None: 

341 with self.xmpp.store.session() as orm: 

342 user = GatewayUser( 

343 jid=JID("romeo@montague.lit/gajim").bare, 

344 legacy_module_data={"username": "romeo", "city": ""}, 

345 preferences={"sync_avatar": True, "sync_presence": True}, 

346 ) 

347 orm.add(user) 

348 orm.commit() 

349 

350 with self.xmpp.store.session() as session: 

351 session.execute(delete(Contact)) 

352 session.commit() 

353 

354 self.run_coro( 

355 self.xmpp._BaseGateway__dispatcher._on_user_register( 

356 Iq(sfrom="romeo@montague.lit/gajim") 

357 ) 

358 ) 

359 welcome = self.next_sent() 

360 assert welcome["body"], welcome 

361 stanza = self.next_sent() 

362 assert "logging in" in stanza["status"].lower(), stanza 

363 stanza = self.next_sent() 

364 assert "syncing contacts" in stanza["status"].lower(), stanza 

365 if BaseGateway.get_self_or_unique_subclass().GROUPS: 

366 stanza = self.next_sent() 

367 assert "syncing groups" in stanza["status"].lower(), stanza 

368 probe = self.next_sent() 

369 assert probe.get_type() == "probe" 

370 stanza = self.next_sent() 

371 assert "yup" in stanza["status"].lower(), stanza 

372 self.romeo: BaseSession = BaseSession.get_self_or_unique_subclass().from_jid( 

373 JID("romeo@montague.lit") 

374 ) 

375 

376 self.juliet: LegacyContact = self.run_coro( 

377 self.romeo.contacts.by_legacy_id("juliet") 

378 ) 

379 self.room: LegacyMUC = self.run_coro(self.romeo.bookmarks.by_legacy_id("room")) 

380 self.first_witch: LegacyParticipant = self.run_coro( 

381 self.room.get_participant("firstwitch") 

382 ) 

383 self.send( # language=XML 

384 """ 

385 <iq type="get" 

386 to="romeo@montague.lit" 

387 id="1" 

388 from="aim.shakespeare.lit"> 

389 <pubsub xmlns="http://jabber.org/protocol/pubsub"> 

390 <items node="urn:xmpp:avatar:metadata" /> 

391 </pubsub> 

392 </iq> 

393 """ 

394 ) 

395 

396 @classmethod 

397 def tearDownClass(cls) -> None: 

398 reset_subclasses() 

399 

400 

401def format_stanza(stanza): 

402 return highlight( 

403 "\n".join(parseString(tostring(stanza.xml)).toprettyxml().split("\n")[1:]) 

404 ) 

405 

406 

407def find_subclass(o, parent, base_ok: bool = False): 

408 try: 

409 vals = vars(o).values() 

410 except TypeError: 

411 vals = o.values() 

412 for x in vals: 

413 try: 

414 if issubclass(x, parent) and x is not parent: 

415 return x 

416 except TypeError: 

417 pass 

418 else: 

419 if base_ok: 

420 return parent 

421 else: 

422 raise RuntimeError 

423 

424 

425def reset_subclasses() -> None: 

426 """ 

427 Reset registered subclasses between test classes. 

428 

429 Needed because these classes are meant to only be subclassed once and raise 

430 exceptions otherwise. 

431 """ 

432 BaseSession.reset_subclass() 

433 BaseGateway.reset_subclass() 

434 LegacyRoster.reset_subclass() 

435 LegacyContact.reset_subclass() 

436 LegacyMUC.reset_subclass() 

437 LegacyBookmarks.reset_subclass() 

438 LegacyParticipant.reset_subclass() 

439 # reset_commands() 

440 

441 

442def reset_commands() -> None: 

443 Command.subclasses = [ 

444 c for c in Command.subclasses if str(c).startswith("<class 'slidge.core") 

445 ]