Coverage for slidge / util / test.py: 90%

211 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-02-15 09:02 +0000

1# type:ignore 

2import os 

3import tempfile 

4import types 

5from pathlib import Path 

6from xml.dom.minidom import parseString 

7 

8try: 

9 import xmldiff.main 

10 

11 XML_DIFF_PRESENT = True 

12except ImportError: 

13 xmldiff = None 

14 XML_DIFF_PRESENT = False 

15 

16from slixmpp import ( 

17 JID, 

18 ElementBase, 

19 Iq, 

20 MatcherId, 

21 MatchXMLMask, 

22 MatchXPath, 

23 Message, 

24 Presence, 

25 StanzaPath, 

26) 

27from slixmpp.stanza.error import Error 

28from slixmpp.test import SlixTest, TestTransport 

29from slixmpp.xmlstream import highlight, tostring 

30from slixmpp.xmlstream.matcher import MatchIDSender 

31from sqlalchemy import create_engine, delete 

32 

33from slidge import ( 

34 BaseGateway, 

35 BaseSession, 

36 LegacyBookmarks, 

37 LegacyContact, 

38 LegacyMUC, 

39 LegacyParticipant, 

40 LegacyRoster, 

41) 

42 

43from ..command import Command 

44from ..core import config 

45from ..core.session import _sessions 

46from ..db import SlidgeStore 

47from ..db.avatar import avatar_cache 

48from ..db.meta import Base 

49from ..db.models import Contact, GatewayUser 

50 

51 

class SlixTestPlus(SlixTest):
    """
    ``SlixTest`` flavour for code that speaks the component protocol.

    Stanza errors are switched to the ``jabber:component:accept`` namespace
    in ``setUp``, and ``next_sent`` rebuilds sent stanzas in that namespace.
    """

    def setUp(self) -> None:
        """Run the parent set-up, then use the component namespace for errors."""
        super().setUp()
        Error.namespace = "jabber:component:accept"

    def check(
        self,
        stanza,
        criteria,
        method: str = "exact",
        defaults=None,
        use_values: bool = True,
    ) -> None:
        """
        Assert that a stanza matches the given criteria.

        With the 'exact' method, the stanza is compared against the XML of
        the criteria and against a second stanza built from that XML. If
        use_values is True, a third stanza built from the second one's
        ``values`` takes part in the comparison as well. With any other
        method, the criteria are wrapped in the corresponding matcher and
        the stanza only has to match it.

        Some stanzas provide default values for some interfaces, which are
        easily forgotten when writing the expected XML. A list of such
        interfaces may be given: they are re-assigned on the compared
        stanzas, and the matching top-level attribute is added to the
        expected XML when it is missing there. This only works for
        interfaces that map to XML attributes; extra elements that a stanza
        includes by default must be present in the supplied XML.

        Arguments:
            stanza       -- The stanza object to test.
            criteria     -- An expression the stanza must match against: for
                            'exact', an XML string or an ElementBase.
            method       -- The type of matching to use; one of:
                            'exact', 'mask', 'id', 'idsender', 'xpath' and
                            'stanzapath'. Defaults to 'exact'. If None is
                            passed, self.match_method is used when the test
                            case defines it.
            defaults     -- A list of stanza interfaces that have default
                            values. When None, ['type'] is used for Message
                            and ['priority'] for Presence.
            use_values   -- Indicates if testing using stanza.values should
                            be used. Defaults to True.

        Raises:
            ValueError -- if method is not a known matching method.
        """
        if method is None and hasattr(self, "match_method"):
            method = getattr(self, "match_method")

        if method != "exact":
            matchers = {
                "stanzapath": StanzaPath,
                "xpath": MatchXPath,
                "mask": MatchXMLMask,
                "idsender": MatchIDSender,
                "id": MatcherId,
            }
            Matcher = matchers.get(method, None)
            if Matcher is None:
                raise ValueError("Unknown matching method.")
            test = Matcher(criteria)
            self.assertTrue(
                test.match(stanza),
                f"Stanza did not match using {method} method:\n"
                + f"Criteria:\n{str(criteria)}\n"
                + f"Stanza:\n{str(stanza)}",
            )
            return

        stanza_class = stanza.__class__
        # Hack to preserve namespaces instead of having jabber:client
        # everywhere.
        old_ns = stanza_class.namespace
        stanza_class.namespace = stanza.namespace
        try:
            if not isinstance(criteria, ElementBase):
                xml = self.parse_xml(criteria)
            else:
                xml = criteria.xml

            # Ensure that top level namespaces are used, even if they
            # were not provided.
            self.fix_namespaces(stanza.xml)
            self.fix_namespaces(xml)

            stanza2 = stanza_class(xml=xml)

            if use_values:
                # Using stanza.values will add XML for any interface that
                # has a default value. We need to set those defaults on
                # the existing stanzas and XML so that they will compare
                # correctly.
                default_stanza = stanza_class()
                if defaults is None:
                    known_defaults = {Message: ["type"], Presence: ["priority"]}
                    defaults = known_defaults.get(stanza_class, [])
                for interface in defaults:
                    stanza[interface] = stanza[interface]
                    stanza2[interface] = stanza2[interface]
                    # Can really only automatically add defaults for top
                    # level attribute values. Anything else must be accounted
                    # for in the provided XML string.
                    if interface not in xml.attrib:
                        if interface in default_stanza.xml.attrib:
                            value = default_stanza.xml.attrib[interface]
                            xml.attrib[interface] = value

                values = stanza2.values
                stanza3 = stanza_class()
                stanza3.values = values

                debug = "Three methods for creating stanzas do not match.\n"
                debug += f"Given XML:\n{highlight(tostring(xml))}\n"
                debug += f"Given stanza:\n{format_stanza(stanza)}\n"
                debug += f"Generated stanza:\n{highlight(tostring(stanza2.xml))}\n"
                debug += (
                    f"Second generated stanza:\n{highlight(tostring(stanza3.xml))}\n"
                )
                result = self.compare(xml, stanza.xml, stanza2.xml, stanza3.xml)
            else:
                debug = "Two methods for creating stanzas do not match.\n"
                debug += f"Given XML:\n{highlight(tostring(xml))}\n"
                debug += f"Given stanza:\n{format_stanza(stanza)}\n"
                debug += f"Generated stanza:\n{highlight(tostring(stanza2.xml))}\n"
                result = self.compare(xml, stanza.xml, stanza2.xml)
        finally:
            # The namespace override is set on the stanza *class*: restore it
            # even when parsing or comparing raises, otherwise it would leak
            # into every later use of that class.
            stanza_class.namespace = old_ns

        if XML_DIFF_PRESENT and not result:
            # xmldiff is optional; when installed, append a structural diff
            # to the failure message.
            debug += str(
                xmldiff.main.diff_texts(tostring(xml), tostring(stanza.xml))
            )
            if use_values:
                debug += str(
                    xmldiff.main.diff_texts(tostring(xml), tostring(stanza2.xml))
                )
        self.assertTrue(result, debug)

    def next_sent(self, timeout: float = 0.05) -> Message | Iq | Presence | None:
        """
        Return the next sent stanza, rebuilt in the component namespace.

        The send queue is flushed first. Returns None when the test socket's
        ``next_sent(timeout=timeout)`` yields None.
        """
        self.wait_for_send_queue()
        sent = self.xmpp.socket.next_sent(timeout=timeout)
        if sent is None:
            return None
        xml = self.parse_xml(sent)
        self.fix_namespaces(xml, "jabber:component:accept")
        sent = self.xmpp._build_stanza(xml, "jabber:component:accept")
        return sent

192 

193 

class SlidgeTest(SlixTestPlus):
    """
    Test case that runs a gateway against a test transport and a database.

    ``setUpClass`` copies ``Config`` onto the config module. ``setUp``
    creates the database schema, instantiates the gateway and connects it
    to a ``TestTransport``. ``tearDown`` drops the schema and clears the
    registered sessions. ``tearDownClass`` resets the registered subclasses.
    """

    # Optional module (or mapping) in which setUp() looks up the gateway
    # classes to use. When a subclass does not define it, the lookup is
    # skipped.
    plugin: types.ModuleType | dict

    class Config:
        # Each public attribute is copied, upper-cased, onto the config
        # module by setUpClass().
        jid = "aim.shakespeare.lit"
        secret = "test"
        server = "shakespeare.lit"
        port = 5222
        upload_service = "upload.test"
        # Evaluated once, when this class body is executed.
        home_dir = Path(tempfile.mkdtemp())
        user_jid_validator = ".*"
        admins: list[str] = []
        upload_requester = None
        ignore_delay_threshold = 300

    @classmethod
    def setUpClass(cls) -> None:
        """Copy the settings declared in ``Config`` onto the config module."""
        for k, v in vars(cls.Config).items():
            # vars() also exposes class machinery (__module__, __dict__,
            # __doc__, ...); those are not settings and must not end up on
            # the config module as __MODULE__ and friends.
            if k.startswith("__") and k.endswith("__"):
                continue
            setattr(config, k.upper(), v)

    def setUp(self):
        """
        Build a gateway wired to a test transport and a freshly created schema.

        The database URL is read from the ``SLIDGETEST_DB_URL`` environment
        variable and falls back to an in-memory SQLite database.
        """
        if hasattr(self, "plugin"):
            BaseGateway._subclass = find_subclass(self.plugin, BaseGateway)
            BaseSession._subclass = find_subclass(self.plugin, BaseSession)
            LegacyRoster._subclass = find_subclass(
                self.plugin, LegacyRoster, base_ok=True
            )
            LegacyContact._subclass = find_subclass(
                self.plugin, LegacyContact, base_ok=True
            )
            LegacyMUC._subclass = find_subclass(self.plugin, LegacyMUC, base_ok=True)
            LegacyBookmarks._subclass = find_subclass(
                self.plugin, LegacyBookmarks, base_ok=True
            )

        # workaround for duplicate output of sql alchemy's log, cf
        # https://stackoverflow.com/a/76498428/5902284
        from sqlalchemy import log as sqlalchemy_log
        from sqlalchemy.pool import StaticPool

        sqlalchemy_log._add_default_handler = lambda x: None
        db_url = os.getenv("SLIDGETEST_DB_URL", "sqlite+pysqlite:///:memory:")
        engine = self.db_engine = create_engine(
            db_url, poolclass=StaticPool if db_url.startswith("postgresql+") else None
        )
        Base.metadata.create_all(engine)
        BaseGateway.store = SlidgeStore(engine)
        BaseGateway._test_mode = True
        self.xmpp = BaseGateway.get_self_or_unique_subclass()()
        self.xmpp.TEST_MODE = True
        avatar_cache.store = self.xmpp.store.avatars
        avatar_cache.set_dir(Path(tempfile.mkdtemp()))
        self.xmpp._always_send_everything = True
        engine.echo = True

        self.xmpp.connection_made(TestTransport(self.xmpp))
        self.xmpp.session_bind_event.set()
        # Remove unique ID prefix to make it easier to test
        self.xmpp._id_prefix = ""
        self.xmpp.default_lang = None
        self.xmpp.peer_default_lang = None

        def new_id():
            # Plain incrementing counter: "1", "2", ...
            self.xmpp._id += 1
            return str(self.xmpp._id)

        self.xmpp._id = 0
        self.xmpp.new_id = new_id

        # Must have the stream header ready for xmpp.process() to work.
        header = self.xmpp.stream_header

        self.xmpp.data_received(header)
        self.wait_for_send_queue()

        # Discard the first two sent items so that tests start from a
        # clean send queue.
        self.xmpp.socket.next_sent()
        self.xmpp.socket.next_sent()

        # Some plugins require messages to have ID values. Set
        # this to True in tests related to those plugins.
        self.xmpp.use_message_ids = False
        self.xmpp.use_presence_ids = False
        Error.namespace = "jabber:component:accept"

    def tearDown(self) -> None:
        """Drop the schema, dispose of the engine and forget all sessions."""
        self.db_engine.echo = False
        super().tearDown()
        Base.metadata.drop_all(self.xmpp.store._engine)
        self.db_engine.dispose()
        _sessions.clear()

    def setup_logged_session(self, n_contacts: int = 0) -> None:
        """
        Register ``romeo@montague.lit`` and check the stanzas sent at login.

        Afterwards ``self.romeo``, ``self.juliet``, ``self.room`` and
        ``self.first_witch`` are available, and an avatar metadata request
        addressed to romeo has been fed to the gateway.

        ``n_contacts`` is accepted but not used by this method.
        """
        with self.xmpp.store.session() as orm:
            user = GatewayUser(
                jid=JID("romeo@montague.lit/gajim").bare,
                legacy_module_data={"username": "romeo", "city": ""},
                preferences={"sync_avatar": True, "sync_presence": True},
            )
            orm.add(user)
            orm.commit()

        with self.xmpp.store.session() as session:
            session.execute(delete(Contact))
            session.commit()

        self.run_coro(
            self.xmpp._BaseGateway__dispatcher._on_user_register(
                Iq(sfrom="romeo@montague.lit/gajim")
            )
        )
        # The stanzas below are expected in exactly this order.
        welcome = self.next_sent()
        assert welcome["body"], welcome
        stanza = self.next_sent()
        assert "logging in" in stanza["status"].lower(), stanza
        stanza = self.next_sent()
        assert "syncing contacts" in stanza["status"].lower(), stanza
        if BaseGateway.get_self_or_unique_subclass().GROUPS:
            stanza = self.next_sent()
            assert "syncing groups" in stanza["status"].lower(), stanza
        probe = self.next_sent()
        assert probe.get_type() == "probe"
        stanza = self.next_sent()
        assert "yup" in stanza["status"].lower(), stanza
        self.romeo: BaseSession = BaseSession.get_self_or_unique_subclass().from_jid(
            JID("romeo@montague.lit")
        )

        self.juliet: LegacyContact = self.run_coro(
            self.romeo.contacts.by_legacy_id("juliet")
        )
        self.room: LegacyMUC = self.run_coro(self.romeo.bookmarks.by_legacy_id("room"))
        self.first_witch: LegacyParticipant = self.run_coro(
            self.room.get_participant("firstwitch")
        )
        self.send(  # language=XML
            """
            <iq type="get"
                to="romeo@montague.lit"
                id="1"
                from="aim.shakespeare.lit">
              <pubsub xmlns="http://jabber.org/protocol/pubsub">
                <items node="urn:xmpp:avatar:metadata" />
              </pubsub>
            </iq>
            """
        )

    @classmethod
    def tearDownClass(cls) -> None:
        """Reset the registered subclasses once the test class is done."""
        reset_subclasses()

347 

348 

def format_stanza(stanza):
    """
    Render a stanza as pretty-printed, highlighted XML.

    The stanza's XML is serialised and re-indented with minidom. The first
    line of minidom's output is dropped before highlighting.
    """
    serialized = tostring(stanza.xml)
    pretty_lines = parseString(serialized).toprettyxml().split("\n")
    body_lines = pretty_lines[1:]
    return highlight("\n".join(body_lines))

353 

354 

def find_subclass(o, parent, base_ok: bool = False):
    """
    Return the first strict subclass of ``parent`` found among ``o``'s values.

    :param o: An object accepted by ``vars()`` (a module, for instance), or
        a mapping whose values are searched instead.
    :param parent: The class whose subclass is looked for. ``parent``
        itself never counts as a match.
    :param base_ok: If True, fall back to ``parent`` when no subclass is
        found instead of raising.
    :return: The first matching class in iteration order, or ``parent``
        when nothing matches and ``base_ok`` is True.
    :raises RuntimeError: If nothing matches and ``base_ok`` is False.
    """
    try:
        candidates = vars(o).values()
    except TypeError:
        # ``o`` has no ``__dict__`` (a plain dict, for instance): search its
        # own values.
        candidates = o.values()
    for candidate in candidates:
        try:
            if issubclass(candidate, parent) and candidate is not parent:
                return candidate
        except TypeError:
            # issubclass() rejects values that are not classes; skip them.
            continue
    if base_ok:
        return parent
    raise RuntimeError(f"No subclass of {parent!r} could be found")

371 

372 

def reset_subclasses() -> None:
    """
    Forget the subclasses registered on the gateway base classes.

    Called between test classes: these base classes are meant to be
    subclassed only once and raise exceptions otherwise.
    """
    resettable = (
        BaseSession,
        BaseGateway,
        LegacyRoster,
        LegacyContact,
        LegacyMUC,
        LegacyBookmarks,
        LegacyParticipant,
    )
    for base_class in resettable:
        base_class.reset_subclass()
    # Registered commands are left untouched: reset_commands() is
    # deliberately not called here.

388 

389 

def reset_commands() -> None:
    """
    Keep only the registered commands that come from ``slidge.core``.

    A command class is kept when its string representation starts with
    ``<class 'slidge.core``; every other entry is dropped from
    ``Command.subclasses``.
    """
    core_commands = []
    for command_class in Command.subclasses:
        if str(command_class).startswith("<class 'slidge.core"):
            core_commands.append(command_class)
    Command.subclasses = core_commands