Coverage for slidge / util / test.py: 91%

234 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 05:07 +0000

1# type:ignore 

2import os 

3import tempfile 

4import types 

5from pathlib import Path 

6from typing import ClassVar 

7from xml.dom.minidom import parseString 

8 

9try: 

10 import xmldiff.main 

11 

12 XML_DIFF_PRESENT = True 

13except ImportError: 

14 xmldiff = None 

15 XML_DIFF_PRESENT = False 

16 

17from slixmpp import ( 

18 JID, 

19 ElementBase, 

20 Iq, 

21 MatcherId, 

22 MatchXMLMask, 

23 MatchXPath, 

24 Message, 

25 Presence, 

26 StanzaPath, 

27) 

28from slixmpp.stanza.error import Error 

29from slixmpp.test import SlixTest, TestTransport 

30from slixmpp.xmlstream import highlight, tostring 

31from slixmpp.xmlstream.matcher import MatchIDSender 

32from sqlalchemy import create_engine, delete 

33 

34from slidge import ( 

35 BaseGateway, 

36 BaseSession, 

37 LegacyBookmarks, 

38 LegacyContact, 

39 LegacyMUC, 

40 LegacyParticipant, 

41 LegacyRoster, 

42) 

43 

44from ..command import Command 

45from ..core import config 

46from ..core.session import _sessions 

47from ..db import SlidgeStore 

48from ..db.avatar import avatar_cache 

49from ..db.meta import Base 

50from ..db.models import Contact, GatewayUser 

51from ..util import SubclassableOnce 

52 

53 

class SlixTestPlus(SlixTest):
    """
    ``SlixTest`` with stanza checks adapted to the component namespace.

    Error stanzas are switched to ``jabber:component:accept`` and sent
    stanzas are rebuilt in that same namespace.
    """

    def setUp(self) -> None:
        super().setUp()
        # Class-level change: every Error stanza built afterwards uses the
        # component namespace.
        Error.namespace = "jabber:component:accept"

    def check(
        self,
        stanza,  # noqa
        criteria,  # noqa
        method: str = "exact",
        defaults=None,  # noqa
        use_values: bool = True,
    ) -> None:
        """
        Create and compare several stanza objects to a correct XML string.

        If use_values is False, tests using stanza.values will not be used.

        Some stanzas provide default values for some interfaces, but
        these defaults can be problematic for testing since they can easily
        be forgotten when supplying the XML string. A list of interfaces that
        use defaults may be provided and the generated stanzas will use the
        default values for those interfaces if needed.

        However, correcting the supplied XML is not possible for interfaces
        that add or remove XML elements. Only interfaces that map to XML
        attributes may be set using the defaults parameter. The supplied XML
        must take into account any extra elements that are included by default.

        Arguments:
            stanza       -- The stanza object to test.
            criteria     -- An expression the stanza must match against.
            method       -- The type of matching to use; one of:
                            'exact', 'mask', 'id', 'idsender', 'xpath',
                            and 'stanzapath'. Defaults to 'exact'; pass
                            None to use self.match_method when the test
                            case defines it.
            defaults     -- A list of stanza interfaces that have default
                            values. These interfaces will be set to their
                            defaults for the given and generated stanzas to
                            prevent unexpected test failures.
            use_values   -- Indicates if testing using stanza.values should
                            be used. Defaults to True.

        Raises:
            ValueError     -- If method is not a known matching method.
            AssertionError -- If the stanza does not match the criteria.
        """
        if method is None and hasattr(self, "match_method"):
            method = self.match_method

        if method != "exact":
            matchers = {
                "stanzapath": StanzaPath,
                "xpath": MatchXPath,
                "mask": MatchXMLMask,
                "idsender": MatchIDSender,
                "id": MatcherId,
            }
            matcher_class = matchers.get(method)
            if matcher_class is None:
                raise ValueError("Unknown matching method.")
            test = matcher_class(criteria)
            self.assertTrue(
                test.match(stanza),
                f"Stanza did not match using {method} method:\n"
                + f"Criteria:\n{criteria!s}\n"
                + f"Stanza:\n{stanza!s}",
            )
            return

        stanza_class = stanza.__class__
        # Hack to preserve namespaces instead of having jabber:client
        # everywhere.
        old_ns = stanza_class.namespace
        stanza_class.namespace = stanza.namespace
        try:
            if isinstance(criteria, ElementBase):
                xml = criteria.xml
            else:
                xml = self.parse_xml(criteria)

            # Ensure that top level namespaces are used, even if they
            # were not provided.
            self.fix_namespaces(stanza.xml)
            self.fix_namespaces(xml)

            stanza2 = stanza_class(xml=xml)

            if use_values:
                # Using stanza.values will add XML for any interface that
                # has a default value. We need to set those defaults on
                # the existing stanzas and XML so that they will compare
                # correctly.
                default_stanza = stanza_class()
                if defaults is None:
                    known_defaults = {Message: ["type"], Presence: ["priority"]}
                    defaults = known_defaults.get(stanza_class, [])
                for interface in defaults:
                    stanza[interface] = stanza[interface]
                    stanza2[interface] = stanza2[interface]
                    # Can really only automatically add defaults for top
                    # level attribute values. Anything else must be accounted
                    # for in the provided XML string.
                    if interface not in xml.attrib:
                        if interface in default_stanza.xml.attrib:
                            value = default_stanza.xml.attrib[interface]
                            xml.attrib[interface] = value

                values = stanza2.values
                stanza3 = stanza_class()
                stanza3.values = values

                debug = "Three methods for creating stanzas do not match.\n"
                debug += f"Given XML:\n{highlight(tostring(xml))}\n"
                debug += f"Given stanza:\n{format_stanza(stanza)}\n"
                debug += f"Generated stanza:\n{highlight(tostring(stanza2.xml))}\n"
                debug += (
                    f"Second generated stanza:\n{highlight(tostring(stanza3.xml))}\n"
                )
                result = self.compare(xml, stanza.xml, stanza2.xml, stanza3.xml)
            else:
                debug = "Two methods for creating stanzas do not match.\n"
                debug += f"Given XML:\n{highlight(tostring(xml))}\n"
                debug += f"Given stanza:\n{format_stanza(stanza)}\n"
                debug += f"Generated stanza:\n{highlight(tostring(stanza2.xml))}\n"
                result = self.compare(xml, stanza.xml, stanza2.xml)
        finally:
            # Always undo the class-level namespace patch, even when parsing
            # or comparing raised, so it cannot leak into later tests.
            stanza_class.namespace = old_ns

        if XML_DIFF_PRESENT and not result:
            debug += str(
                xmldiff.main.diff_texts(tostring(xml), tostring(stanza.xml))
            )
            if use_values:
                debug += str(
                    xmldiff.main.diff_texts(tostring(xml), tostring(stanza2.xml))
                )
        self.assertTrue(result, debug)

    def next_sent(self, timeout: float = 0.05) -> Message | Iq | Presence | None:
        """
        Return the next stanza sent by ``self.xmpp``, or None if there is none.

        Waits for the send queue, then asks the test socket for its next sent
        item with the given timeout. The raw data is parsed and rebuilt as a
        stanza in the ``jabber:component:accept`` namespace.
        """
        self.wait_for_send_queue()
        sent = self.xmpp.socket.next_sent(timeout=timeout)
        if sent is None:
            return None
        xml = self.parse_xml(sent)
        self.fix_namespaces(xml, "jabber:component:accept")
        return self.xmpp._build_stanza(xml, "jabber:component:accept")

194 

195 

class SlidgeTest(SlixTestPlus):
    """
    Base test case that runs a slidge gateway on a test transport.

    ``setUpClass`` copies ``Config`` onto slidge's ``config`` module and, when
    a ``plugin`` attribute is defined, registers the plugin's subclasses.
    ``setUp`` creates a fresh database schema and gateway instance for every
    test; ``tearDown`` drops the schema again.
    """

    # Optional. When set, setUpClass looks up the gateway/session/... subclasses
    # among this module's (or mapping's) values.
    plugin: types.ModuleType | dict

    class Config:
        # Every attribute of this class is copied, upper-cased, onto slidge's
        # ``config`` module by setUpClass. Kept free of a docstring on purpose
        # so that no extra attribute value is copied along.
        jid = "aim.shakespeare.lit"
        secret = "test"
        server = "shakespeare.lit"
        port = 5222
        upload_service = "upload.test"
        # Evaluated once, when this class body is executed.
        home_dir = Path(tempfile.mkdtemp())
        user_jid_validator = ".*"
        admins: ClassVar[list[str]] = []
        upload_requester = None
        ignore_delay_threshold = 300

    @classmethod
    def setUpClass(cls) -> None:
        """Apply ``Config`` to slidge's config and register plugin subclasses."""
        for k, v in vars(cls.Config).items():
            setattr(config, k.upper(), v)
        if hasattr(cls, "plugin"):
            # Explicitly name-mangled access to SubclassableOnce's private
            # registry of subclasses.
            subclasses = SubclassableOnce._SubclassableOnce__subclasses
            subclasses.update(
                {
                    BaseGateway: find_subclass(cls.plugin, BaseGateway),
                    BaseSession: find_subclass(cls.plugin, BaseSession),
                    LegacyRoster: find_subclass(cls.plugin, LegacyRoster, base_ok=True),
                    LegacyContact: find_subclass(
                        cls.plugin, LegacyContact, base_ok=True
                    ),
                    LegacyMUC: find_subclass(cls.plugin, LegacyMUC, base_ok=True),
                    LegacyBookmarks: find_subclass(
                        cls.plugin, LegacyBookmarks, base_ok=True
                    ),
                }
            )
            # Clear the abstract-method sets so the registered classes can be
            # instantiated in tests.
            for subclass in subclasses.values():
                if subclass is not None:
                    subclass.__abstractmethods__ = set()

    def setUp(self) -> None:
        """Create the database schema and a connected gateway for one test."""
        # workaround for duplicate output of sql alchemy's log, cf
        # https://stackoverflow.com/a/76498428/5902284
        from sqlalchemy import log as sqlalchemy_log
        from sqlalchemy.pool import StaticPool

        sqlalchemy_log._add_default_handler = lambda x: None
        db_url = os.getenv("SLIDGETEST_DB_URL", "sqlite+pysqlite:///:memory:")
        engine = self.db_engine = create_engine(
            db_url, poolclass=StaticPool if db_url.startswith("postgresql+") else None
        )
        Base.metadata.create_all(engine)
        BaseGateway.store = SlidgeStore(engine)
        BaseGateway._test_mode = True

        self.xmpp = BaseGateway.get_self_or_unique_subclass()()

        self.xmpp.TEST_MODE = True
        avatar_cache.store = self.xmpp.store.avatars
        # NOTE(review): this temporary directory is not removed by tearDown.
        avatar_cache.set_dir(Path(tempfile.mkdtemp()))
        self.xmpp._always_send_everything = True
        engine.echo = True

        self.xmpp.connection_made(TestTransport(self.xmpp))
        self.xmpp.session_bind_event.set()
        # Remove unique ID prefix to make it easier to test
        self.xmpp._id_prefix = ""
        self.xmpp.default_lang = None
        self.xmpp.peer_default_lang = None

        def new_id() -> str:
            # Predictable stanza IDs: "1", "2", ...
            self.xmpp._id += 1
            return str(self.xmpp._id)

        self.xmpp._id = 0
        self.xmpp.new_id = new_id

        # Must have the stream header ready for xmpp.process() to work.
        header = self.xmpp.stream_header

        self.xmpp.data_received(header)
        self.wait_for_send_queue()

        # Discard the first two items of the send queue (presumably the
        # stream header and handshake -- TODO confirm).
        self.xmpp.socket.next_sent()
        self.xmpp.socket.next_sent()

        # Some plugins require messages to have ID values. Set
        # this to True in tests related to those plugins.
        self.xmpp.use_message_ids = False
        self.xmpp.use_presence_ids = False
        Error.namespace = "jabber:component:accept"

    def _expect_login_stanzas(
        self, groups: bool, final_status: str | None = None
    ) -> None:
        """
        Consume and check the stanzas sent after a user registration.

        Expected order: a welcome message with a body, a "logging in" status,
        a "syncing contacts" status, a "syncing groups" status (only when
        ``groups`` is true), a presence probe, and a final status.

        Arguments:
            groups       -- Whether a "syncing groups" status is expected.
            final_status -- Lower-case text the final status must contain;
                            when None, the final status only has to be
                            non-empty.
        """

        def next_or_fail(what: str):
            # Fail with a readable message instead of subscripting None.
            stanza = self.next_sent()
            assert stanza is not None, f"Nothing was sent while waiting for {what}"
            return stanza

        welcome = next_or_fail("the welcome message")
        assert welcome["body"], welcome

        expected_statuses = ["logging in", "syncing contacts"]
        if groups:
            expected_statuses.append("syncing groups")
        for expected in expected_statuses:
            stanza = next_or_fail(f"the '{expected}' status")
            assert expected in stanza["status"].lower(), stanza

        probe = next_or_fail("the presence probe")
        assert probe.get_type() == "probe"

        stanza = next_or_fail("the final status")
        status = stanza["status"].lower()
        if final_status is None:
            assert status, stanza
        else:
            assert final_status in status, stanza

    def _add_slidge_user(
        self,
        jid: str | JID = "romeo@shakespeare.lit",
        legacy_module_data: dict | None = None,
        preferences: dict | None = None,
    ) -> None:
        """
        Store a gateway user, trigger its registration and check the login stanzas.

        Arguments:
            jid                -- JID of the user to create.
            legacy_module_data -- Stored as-is; defaults to an empty dict.
            preferences        -- Stored as-is; defaults to avatar and
                                  presence sync both disabled.
        """
        with self.xmpp.store.session() as orm:
            user = GatewayUser(
                jid=JID(jid),
                legacy_module_data=legacy_module_data or {},
                preferences=preferences
                or {"sync_avatar": False, "sync_presence": False},
            )
            orm.add(user)
            orm.commit()
        self.run_coro(
            self.xmpp._BaseGateway__dispatcher._on_user_register(Iq(sfrom=JID(jid)))
        )
        self._expect_login_stanzas(groups=bool(self.xmpp.GROUPS))

    def user_session(self, jid: str | JID = "romeo@shakespeare.lit") -> BaseSession:
        """Return the session attached to ``jid``."""
        return BaseSession.get_self_or_unique_subclass().from_jid(JID(jid))

    def get_joined_muc(
        self,
        legacy_id: str | int,
        user_jid: str | JID = "romeo@shakespeare.lit",
        resource: str = "gajim",
    ) -> LegacyMUC:
        """
        Return the user's MUC for ``legacy_id`` with ``resource`` added to it.

        The MUC is fetched through the user's bookmarks, then
        ``add_user_resource(resource)`` is called on it.
        """
        muc: LegacyMUC = self.run_coro(
            self.user_session(user_jid).bookmarks.by_legacy_id(legacy_id)
        )
        muc.add_user_resource(resource)
        return muc

    def tearDown(self) -> None:
        """Drop the schema, dispose of the engine and forget all sessions."""
        self.db_engine.echo = False
        super().tearDown()
        Base.metadata.drop_all(self.xmpp.store._engine)
        self.db_engine.dispose()
        _sessions.clear()

    def setup_logged_session(self, n_contacts: int = 0) -> None:
        """
        Register romeo@montague.lit and populate common fixtures.

        Sets ``self.romeo`` (session), ``self.juliet`` (contact),
        ``self.room`` (MUC) and ``self.first_witch`` (participant), then sends
        an avatar metadata request to romeo.

        Arguments:
            n_contacts -- Not used by this method.
        """
        with self.xmpp.store.session() as orm:
            user = GatewayUser(
                jid=JID("romeo@montague.lit/gajim").bare,
                legacy_module_data={"username": "romeo", "city": ""},
                preferences={"sync_avatar": True, "sync_presence": True},
            )
            orm.add(user)
            orm.commit()

        # Start from an empty contact table.
        with self.xmpp.store.session() as session:
            session.execute(delete(Contact))
            session.commit()

        self.run_coro(
            self.xmpp._BaseGateway__dispatcher._on_user_register(
                Iq(sfrom="romeo@montague.lit/gajim")
            )
        )
        self._expect_login_stanzas(
            groups=bool(BaseGateway.get_self_or_unique_subclass().GROUPS),
            final_status="yup",
        )
        self.romeo: BaseSession = BaseSession.get_self_or_unique_subclass().from_jid(
            JID("romeo@montague.lit")
        )

        self.juliet: LegacyContact = self.run_coro(
            self.romeo.contacts.by_legacy_id("juliet")
        )
        self.room: LegacyMUC = self.run_coro(self.romeo.bookmarks.by_legacy_id("room"))
        self.first_witch: LegacyParticipant = self.run_coro(
            self.room.get_participant("firstwitch")
        )
        self.send(  # language=XML
            """
            <iq type="get"
                to="romeo@montague.lit"
                id="1"
                from="aim.shakespeare.lit">
              <pubsub xmlns="http://jabber.org/protocol/pubsub">
                <items node="urn:xmpp:avatar:metadata" />
              </pubsub>
            </iq>
            """
        )

    @classmethod
    def tearDownClass(cls) -> None:
        """Unregister the subclasses so the next test class can register its own."""
        reset_subclasses()

400 

401 

def format_stanza(stanza: ElementBase) -> str:
    """Return the stanza's XML, pretty-printed and highlighted, without its first line."""
    pretty = parseString(tostring(stanza.xml)).toprettyxml()
    # Everything after the first newline, i.e. drop the leading XML
    # declaration line emitted by toprettyxml().
    _, _, without_first_line = pretty.partition("\n")
    return highlight(without_first_line)

406 

407 

def find_subclass(o, parent, base_ok: bool = False):  # noqa
    """
    Return the first strict subclass of ``parent`` found among the values of ``o``.

    Arguments:
        o       -- An object with a ``__dict__`` (e.g. a module) or, failing
                   that, a mapping; its values are searched in order.
        parent  -- The class whose subclass is looked for.
        base_ok -- If True, return ``parent`` itself when no subclass is
                   found instead of raising.

    Raises:
        RuntimeError -- If no subclass is found and ``base_ok`` is False.
    """
    try:
        candidates = vars(o).values()
    except TypeError:
        # vars() rejects objects without a __dict__, such as a plain dict.
        candidates = o.values()

    for candidate in candidates:
        try:
            if issubclass(candidate, parent) and candidate is not parent:
                return candidate
        except TypeError:
            # issubclass() refuses values that are not classes; skip them.
            continue

    if base_ok:
        return parent
    raise RuntimeError(f"No subclass of {parent!r} found in {o!r}")

424 

425 

def reset_subclasses() -> None:
    """
    Forget the subclasses registered for slidge's base classes.

    Called between test classes: these base classes are meant to be
    subclassed only once and raise exceptions otherwise.
    """
    resettable = (
        BaseSession,
        BaseGateway,
        LegacyRoster,
        LegacyContact,
        LegacyMUC,
        LegacyBookmarks,
        LegacyParticipant,
    )
    for base_class in resettable:
        base_class.reset_subclass()
    # Registered commands are left untouched: reset_commands() is not called.

441 

442 

def reset_commands() -> None:
    """Keep only the registered commands whose class repr starts with ``<class 'slidge.core``."""
    core_prefix = "<class 'slidge.core"
    Command.subclasses = list(
        filter(lambda command: str(command).startswith(core_prefix), Command.subclasses)
    )