Coverage for slidge / util / test.py: 91%

233 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-20 19:56 +0000

1# type:ignore 

2import os 

3import tempfile 

4import types 

5from pathlib import Path 

6from typing import ClassVar 

7from xml.dom.minidom import parseString 

8 

9try: 

10 import xmldiff.main 

11 

12 XML_DIFF_PRESENT = True 

13except ImportError: 

14 xmldiff = None 

15 XML_DIFF_PRESENT = False 

16 

17from slixmpp import ( 

18 JID, 

19 ElementBase, 

20 Iq, 

21 MatcherId, 

22 MatchXMLMask, 

23 MatchXPath, 

24 Message, 

25 Presence, 

26 StanzaPath, 

27) 

28from slixmpp.stanza.error import Error 

29from slixmpp.test import SlixTest, TestTransport 

30from slixmpp.xmlstream import highlight, tostring 

31from slixmpp.xmlstream.matcher import MatchIDSender 

32from sqlalchemy import create_engine, delete 

33 

34from slidge import ( 

35 BaseGateway, 

36 BaseSession, 

37 LegacyBookmarks, 

38 LegacyContact, 

39 LegacyMUC, 

40 LegacyParticipant, 

41 LegacyRoster, 

42) 

43 

44from ..command import Command 

45from ..core import config 

46from ..core.session import _sessions 

47from ..db import SlidgeStore 

48from ..db.avatar import avatar_cache 

49from ..db.meta import Base 

50from ..db.models import Contact, GatewayUser 

51from ..util import SubclassableOnce 

52 

53 

54class SlixTestPlus(SlixTest): 

55 def setUp(self) -> None: 

56 super().setUp() 

57 Error.namespace = "jabber:component:accept" 

58 

59 def check( 

60 self, 

61 stanza, # noqa 

62 criteria, # noqa 

63 method: str = "exact", 

64 defaults=None, # noqa 

65 use_values: bool = True, 

66 ) -> None: 

67 """ 

68 Create and compare several stanza objects to a correct XML string. 

69 

70 If use_values is False, tests using stanza.values will not be used. 

71 

72 Some stanzas provide default values for some interfaces, but 

73 these defaults can be problematic for testing since they can easily 

74 be forgotten when supplying the XML string. A list of interfaces that 

75 use defaults may be provided and the generated stanzas will use the 

76 default values for those interfaces if needed. 

77 

78 However, correcting the supplied XML is not possible for interfaces 

79 that add or remove XML elements. Only interfaces that map to XML 

80 attributes may be set using the defaults parameter. The supplied XML 

81 must take into account any extra elements that are included by default. 

82 

83 Arguments: 

84 stanza -- The stanza object to test. 

85 criteria -- An expression the stanza must match against. 

86 method -- The type of matching to use; one of: 

87 'exact', 'mask', 'id', 'xpath', and 'stanzapath'. 

88 Defaults to the value of self.match_method. 

89 defaults -- A list of stanza interfaces that have default 

90 values. These interfaces will be set to their 

91 defaults for the given and generated stanzas to 

92 prevent unexpected test failures. 

93 use_values -- Indicates if testing using stanza.values should 

94 be used. Defaults to True. 

95 """ 

96 if method is None and hasattr(self, "match_method"): 

97 method = getattr(self, "match_method") 

98 

99 if method != "exact": 

100 matchers = { 

101 "stanzapath": StanzaPath, 

102 "xpath": MatchXPath, 

103 "mask": MatchXMLMask, 

104 "idsender": MatchIDSender, 

105 "id": MatcherId, 

106 } 

107 Matcher = matchers.get(method) 

108 if Matcher is None: 

109 raise ValueError("Unknown matching method.") 

110 test = Matcher(criteria) 

111 self.assertTrue( 

112 test.match(stanza), 

113 f"Stanza did not match using {method} method:\n" 

114 + f"Criteria:\n{criteria!s}\n" 

115 + f"Stanza:\n{stanza!s}", 

116 ) 

117 else: 

118 stanza_class = stanza.__class__ 

119 # Hack to preserve namespaces instead of having jabber:client 

120 # everywhere. 

121 old_ns = stanza_class.namespace 

122 stanza_class.namespace = stanza.namespace 

123 if not isinstance(criteria, ElementBase): 

124 xml = self.parse_xml(criteria) 

125 else: 

126 xml = criteria.xml 

127 

128 # Ensure that top level namespaces are used, even if they 

129 # were not provided. 

130 self.fix_namespaces(stanza.xml) 

131 self.fix_namespaces(xml) 

132 

133 stanza2 = stanza_class(xml=xml) 

134 

135 if use_values: 

136 # Using stanza.values will add XML for any interface that 

137 # has a default value. We need to set those defaults on 

138 # the existing stanzas and XML so that they will compare 

139 # correctly. 

140 default_stanza = stanza_class() 

141 if defaults is None: 

142 known_defaults = {Message: ["type"], Presence: ["priority"]} 

143 defaults = known_defaults.get(stanza_class, []) 

144 for interface in defaults: 

145 stanza[interface] = stanza[interface] 

146 stanza2[interface] = stanza2[interface] 

147 # Can really only automatically add defaults for top 

148 # level attribute values. Anything else must be accounted 

149 # for in the provided XML string. 

150 if ( 

151 interface not in xml.attrib 

152 and interface in default_stanza.xml.attrib 

153 ): 

154 value = default_stanza.xml.attrib[interface] 

155 xml.attrib[interface] = value 

156 

157 values = stanza2.values 

158 stanza3 = stanza_class() 

159 stanza3.values = values 

160 

161 debug = "Three methods for creating stanzas do not match.\n" 

162 debug += f"Given XML:\n{highlight(tostring(xml))}\n" 

163 debug += f"Given stanza:\n{format_stanza(stanza)}\n" 

164 debug += f"Generated stanza:\n{highlight(tostring(stanza2.xml))}\n" 

165 debug += ( 

166 f"Second generated stanza:\n{highlight(tostring(stanza3.xml))}\n" 

167 ) 

168 result = self.compare(xml, stanza.xml, stanza2.xml, stanza3.xml) 

169 else: 

170 debug = "Two methods for creating stanzas do not match.\n" 

171 debug += f"Given XML:\n{highlight(tostring(xml))}\n" 

172 debug += f"Given stanza:\n{format_stanza(stanza)}\n" 

173 debug += f"Generated stanza:\n{highlight(tostring(stanza2.xml))}\n" 

174 result = self.compare(xml, stanza.xml, stanza2.xml) 

175 stanza_class.namespace = old_ns 

176 

177 if XML_DIFF_PRESENT and not result: 

178 debug += str( 

179 xmldiff.main.diff_texts(tostring(xml), tostring(stanza.xml)) 

180 ) 

181 if use_values: 

182 debug += str( 

183 xmldiff.main.diff_texts(tostring(xml), tostring(stanza2.xml)) 

184 ) 

185 self.assertTrue(result, debug) 

186 

187 def next_sent(self, timeout: float = 0.05) -> Message | Iq | Presence | None: 

188 self.wait_for_send_queue() 

189 sent = self.xmpp.socket.next_sent(timeout=timeout) 

190 if sent is None: 

191 return None 

192 xml = self.parse_xml(sent) 

193 self.fix_namespaces(xml, "jabber:component:accept") 

194 sent = self.xmpp._build_stanza(xml, "jabber:component:accept") 

195 return sent 

196 

197 

198class SlidgeTest(SlixTestPlus): 

199 plugin: types.ModuleType | dict 

200 

201 class Config: 

202 jid = "aim.shakespeare.lit" 

203 secret = "test" 

204 server = "shakespeare.lit" 

205 port = 5222 

206 upload_service = "upload.test" 

207 home_dir = Path(tempfile.mkdtemp()) 

208 user_jid_validator = ".*" 

209 admins: ClassVar[list[str]] = [] 

210 upload_requester = None 

211 ignore_delay_threshold = 300 

212 

213 @classmethod 

214 def setUpClass(cls) -> None: 

215 for k, v in vars(cls.Config).items(): 

216 setattr(config, k.upper(), v) 

217 if hasattr(cls, "plugin"): 

218 subclasses = SubclassableOnce._SubclassableOnce__subclasses 

219 subclasses.update( 

220 { 

221 BaseGateway: find_subclass(cls.plugin, BaseGateway), 

222 BaseSession: find_subclass(cls.plugin, BaseSession), 

223 LegacyRoster: find_subclass(cls.plugin, LegacyRoster, base_ok=True), 

224 LegacyContact: find_subclass( 

225 cls.plugin, LegacyContact, base_ok=True 

226 ), 

227 LegacyMUC: find_subclass(cls.plugin, LegacyMUC, base_ok=True), 

228 LegacyBookmarks: find_subclass( 

229 cls.plugin, LegacyBookmarks, base_ok=True 

230 ), 

231 } 

232 ) 

233 for subclass in subclasses.values(): 

234 if subclass is not None: 

235 subclass.__abstractmethods__ = set() 

236 

237 def setUp(self) -> None: 

238 # workaround for duplicate output of sql alchemy's log, cf 

239 # https://stackoverflow.com/a/76498428/5902284 

240 from sqlalchemy import log as sqlalchemy_log 

241 from sqlalchemy.pool import StaticPool 

242 

243 sqlalchemy_log._add_default_handler = lambda x: None 

244 db_url = os.getenv("SLIDGETEST_DB_URL", "sqlite+pysqlite:///:memory:") 

245 engine = self.db_engine = create_engine( 

246 db_url, poolclass=StaticPool if db_url.startswith("postgresql+") else None 

247 ) 

248 Base.metadata.create_all(engine) 

249 BaseGateway.store = SlidgeStore(engine) 

250 BaseGateway._test_mode = True 

251 

252 self.xmpp = BaseGateway.get_self_or_unique_subclass()() 

253 

254 self.xmpp.TEST_MODE = True 

255 avatar_cache.store = self.xmpp.store.avatars 

256 avatar_cache.set_dir(Path(tempfile.mkdtemp())) 

257 self.xmpp._always_send_everything = True 

258 engine.echo = True 

259 

260 self.xmpp.connection_made(TestTransport(self.xmpp)) 

261 self.xmpp.session_bind_event.set() 

262 # Remove unique ID prefix to make it easier to test 

263 self.xmpp._id_prefix = "" 

264 self.xmpp.default_lang = None 

265 self.xmpp.peer_default_lang = None 

266 

267 def new_id() -> str: 

268 self.xmpp._id += 1 

269 return str(self.xmpp._id) 

270 

271 self.xmpp._id = 0 

272 self.xmpp.new_id = new_id 

273 

274 # Must have the stream header ready for xmpp.process() to work. 

275 header = self.xmpp.stream_header 

276 

277 self.xmpp.data_received(header) 

278 self.wait_for_send_queue() 

279 

280 self.xmpp.socket.next_sent() 

281 self.xmpp.socket.next_sent() 

282 

283 # Some plugins require messages to have ID values. Set 

284 # this to True in tests related to those plugins. 

285 self.xmpp.use_message_ids = False 

286 self.xmpp.use_presence_ids = False 

287 Error.namespace = "jabber:component:accept" 

288 

289 def _add_slidge_user( 

290 self, 

291 jid: str | JID = "romeo@shakespeare.lit", 

292 legacy_module_data: dict | None = None, 

293 preferences: dict | None = None, 

294 ) -> None: 

295 with self.xmpp.store.session() as orm: 

296 user = GatewayUser( 

297 jid=JID(jid), 

298 legacy_module_data=legacy_module_data or {}, 

299 preferences=preferences 

300 or {"sync_avatar": False, "sync_presence": False}, 

301 ) 

302 orm.add(user) 

303 orm.commit() 

304 self.run_coro( 

305 self.xmpp._BaseGateway__dispatcher._on_user_register(Iq(sfrom=JID(jid))) 

306 ) 

307 welcome = self.next_sent() 

308 assert welcome["body"] 

309 stanza = self.next_sent() 

310 assert "logging in" in stanza["status"].lower(), stanza 

311 stanza = self.next_sent() 

312 assert "syncing contacts" in stanza["status"].lower(), stanza 

313 if self.xmpp.GROUPS: 

314 stanza = self.next_sent() 

315 assert "syncing groups" in stanza["status"].lower(), stanza 

316 probe = self.next_sent() 

317 assert probe.get_type() == "probe" 

318 stanza = self.next_sent() 

319 assert stanza["status"].lower() 

320 

321 def user_session(self, jid: str | JID = "romeo@shakespeare.lit") -> BaseSession: 

322 return BaseSession.get_self_or_unique_subclass().from_jid(JID(jid)) 

323 

324 def get_joined_muc( 

325 self, 

326 legacy_id: str | int, 

327 user_jid: str | JID = "romeo@shakespeare.lit", 

328 resource: str = "gajim", 

329 ) -> LegacyMUC: 

330 muc: LegacyMUC = self.run_coro( 

331 self.user_session(user_jid).bookmarks.by_legacy_id(legacy_id) 

332 ) 

333 muc.add_user_resource(resource) 

334 return muc 

335 

336 def tearDown(self) -> None: 

337 self.db_engine.echo = False 

338 super().tearDown() 

339 Base.metadata.drop_all(self.xmpp.store._engine) 

340 self.db_engine.dispose() 

341 _sessions.clear() 

342 

343 def setup_logged_session(self, n_contacts: int = 0) -> None: 

344 with self.xmpp.store.session() as orm: 

345 user = GatewayUser( 

346 jid=JID("romeo@montague.lit/gajim").bare, 

347 legacy_module_data={"username": "romeo", "city": ""}, 

348 preferences={"sync_avatar": True, "sync_presence": True}, 

349 ) 

350 orm.add(user) 

351 orm.commit() 

352 

353 with self.xmpp.store.session() as session: 

354 session.execute(delete(Contact)) 

355 session.commit() 

356 

357 self.run_coro( 

358 self.xmpp._BaseGateway__dispatcher._on_user_register( 

359 Iq(sfrom="romeo@montague.lit/gajim") 

360 ) 

361 ) 

362 welcome = self.next_sent() 

363 assert welcome["body"], welcome 

364 stanza = self.next_sent() 

365 assert "logging in" in stanza["status"].lower(), stanza 

366 stanza = self.next_sent() 

367 assert "syncing contacts" in stanza["status"].lower(), stanza 

368 if BaseGateway.get_self_or_unique_subclass().GROUPS: 

369 stanza = self.next_sent() 

370 assert "syncing groups" in stanza["status"].lower(), stanza 

371 probe = self.next_sent() 

372 assert probe.get_type() == "probe" 

373 stanza = self.next_sent() 

374 assert "yup" in stanza["status"].lower(), stanza 

375 self.romeo: BaseSession = BaseSession.get_self_or_unique_subclass().from_jid( 

376 JID("romeo@montague.lit") 

377 ) 

378 

379 self.juliet: LegacyContact = self.run_coro( 

380 self.romeo.contacts.by_legacy_id("juliet") 

381 ) 

382 self.room: LegacyMUC = self.run_coro(self.romeo.bookmarks.by_legacy_id("room")) 

383 self.first_witch: LegacyParticipant = self.run_coro( 

384 self.room.get_participant("firstwitch") 

385 ) 

386 self.send( # language=XML 

387 """ 

388 <iq type="get" 

389 to="romeo@montague.lit" 

390 id="1" 

391 from="aim.shakespeare.lit"> 

392 <pubsub xmlns="http://jabber.org/protocol/pubsub"> 

393 <items node="urn:xmpp:avatar:metadata" /> 

394 </pubsub> 

395 </iq> 

396 """ 

397 ) 

398 

399 @classmethod 

400 def tearDownClass(cls) -> None: 

401 reset_subclasses() 

402 

403 

404def format_stanza(stanza: ElementBase) -> str: 

405 return highlight( 

406 "\n".join(parseString(tostring(stanza.xml)).toprettyxml().split("\n")[1:]) 

407 ) 

408 

409 

410def find_subclass(o, parent, base_ok: bool = False): # noqa 

411 try: 

412 vals = vars(o).values() 

413 except TypeError: 

414 vals = o.values() 

415 for x in vals: 

416 try: 

417 if issubclass(x, parent) and x is not parent: 

418 return x 

419 except TypeError: 

420 pass 

421 else: 

422 if base_ok: 

423 return parent 

424 else: 

425 raise RuntimeError 

426 

427 

428def reset_subclasses() -> None: 

429 """ 

430 Reset registered subclasses between test classes. 

431 

432 Needed because these classes are meant to only be subclassed once and raise 

433 exceptions otherwise. 

434 """ 

435 BaseSession.reset_subclass() 

436 BaseGateway.reset_subclass() 

437 LegacyRoster.reset_subclass() 

438 LegacyContact.reset_subclass() 

439 LegacyMUC.reset_subclass() 

440 LegacyBookmarks.reset_subclass() 

441 LegacyParticipant.reset_subclass() 

442 # reset_commands() 

443 

444 

445def reset_commands() -> None: 

446 Command.subclasses = [ 

447 c for c in Command.subclasses if str(c).startswith("<class 'slidge.core") 

448 ]