Coverage for slidge / util / test.py: 91%
234 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
1# type:ignore
2import os
3import tempfile
4import types
5from pathlib import Path
6from typing import ClassVar
7from xml.dom.minidom import parseString
9try:
10 import xmldiff.main
12 XML_DIFF_PRESENT = True
13except ImportError:
14 xmldiff = None
15 XML_DIFF_PRESENT = False
17from slixmpp import (
18 JID,
19 ElementBase,
20 Iq,
21 MatcherId,
22 MatchXMLMask,
23 MatchXPath,
24 Message,
25 Presence,
26 StanzaPath,
27)
28from slixmpp.stanza.error import Error
29from slixmpp.test import SlixTest, TestTransport
30from slixmpp.xmlstream import highlight, tostring
31from slixmpp.xmlstream.matcher import MatchIDSender
32from sqlalchemy import create_engine, delete
34from slidge import (
35 BaseGateway,
36 BaseSession,
37 LegacyBookmarks,
38 LegacyContact,
39 LegacyMUC,
40 LegacyParticipant,
41 LegacyRoster,
42)
44from ..command import Command
45from ..core import config
46from ..core.session import _sessions
47from ..db import SlidgeStore
48from ..db.avatar import avatar_cache
49from ..db.meta import Base
50from ..db.models import Contact, GatewayUser
51from ..util import SubclassableOnce
class SlixTestPlus(SlixTest):
    """
    ``SlixTest`` variant with a more verbose ``check()`` and a ``next_sent()``
    helper that builds stanzas in the ``jabber:component:accept`` namespace.
    """

    def setUp(self) -> None:
        """Run the parent ``setUp`` and set ``Error.namespace`` to the component namespace."""
        super().setUp()
        Error.namespace = "jabber:component:accept"

    def check(
        self,
        stanza,  # noqa
        criteria,  # noqa
        method: str = "exact",
        defaults=None,  # noqa
        use_values: bool = True,
    ) -> None:
        """
        Create and compare several stanza objects to a correct XML string.

        If use_values is False, tests using stanza.values will not be used.

        Some stanzas provide default values for some interfaces, but
        these defaults can be problematic for testing since they can easily
        be forgotten when supplying the XML string. A list of interfaces that
        use defaults may be provided and the generated stanzas will use the
        default values for those interfaces if needed.

        However, correcting the supplied XML is not possible for interfaces
        that add or remove XML elements. Only interfaces that map to XML
        attributes may be set using the defaults parameter. The supplied XML
        must take into account any extra elements that are included by default.

        When the comparison fails and ``xmldiff`` is installed, an XML diff is
        appended to the failure message.

        Arguments:
            stanza       -- The stanza object to test.
            criteria     -- An expression the stanza must match against.
            method       -- The type of matching to use; one of:
                            'exact', 'mask', 'id', 'idsender', 'xpath',
                            and 'stanzapath'. Defaults to 'exact'. If None
                            is passed explicitly, self.match_method is used
                            when that attribute exists.
            defaults     -- A list of stanza interfaces that have default
                            values. These interfaces will be set to their
                            defaults for the given and generated stanzas to
                            prevent unexpected test failures.
            use_values   -- Indicates if testing using stanza.values should
                            be used. Defaults to True.

        Raises:
            ValueError      -- If ``method`` is not a known matching method.
            AssertionError  -- If the stanza does not match the criteria.
        """
        if method is None and hasattr(self, "match_method"):
            method = getattr(self, "match_method")

        if method != "exact":
            matchers = {
                "stanzapath": StanzaPath,
                "xpath": MatchXPath,
                "mask": MatchXMLMask,
                "idsender": MatchIDSender,
                "id": MatcherId,
            }
            Matcher = matchers.get(method, None)
            if Matcher is None:
                raise ValueError("Unknown matching method.")
            test = Matcher(criteria)
            self.assertTrue(
                test.match(stanza),
                f"Stanza did not match using {method} method:\n"
                + f"Criteria:\n{criteria!s}\n"
                + f"Stanza:\n{stanza!s}",
            )
        else:
            stanza_class = stanza.__class__
            # Hack to preserve namespaces instead of having jabber:client
            # everywhere.
            old_ns = stanza_class.namespace
            stanza_class.namespace = stanza.namespace
            try:
                if not isinstance(criteria, ElementBase):
                    xml = self.parse_xml(criteria)
                else:
                    xml = criteria.xml

                # Ensure that top level namespaces are used, even if they
                # were not provided.
                self.fix_namespaces(stanza.xml)
                self.fix_namespaces(xml)

                stanza2 = stanza_class(xml=xml)

                if use_values:
                    # Using stanza.values will add XML for any interface that
                    # has a default value. We need to set those defaults on
                    # the existing stanzas and XML so that they will compare
                    # correctly.
                    default_stanza = stanza_class()
                    if defaults is None:
                        known_defaults = {Message: ["type"], Presence: ["priority"]}
                        defaults = known_defaults.get(stanza_class, [])
                    for interface in defaults:
                        stanza[interface] = stanza[interface]
                        stanza2[interface] = stanza2[interface]
                        # Can really only automatically add defaults for top
                        # level attribute values. Anything else must be accounted
                        # for in the provided XML string.
                        if interface not in xml.attrib:
                            if interface in default_stanza.xml.attrib:
                                value = default_stanza.xml.attrib[interface]
                                xml.attrib[interface] = value

                    values = stanza2.values
                    stanza3 = stanza_class()
                    stanza3.values = values

                    debug = "Three methods for creating stanzas do not match.\n"
                    debug += f"Given XML:\n{highlight(tostring(xml))}\n"
                    debug += f"Given stanza:\n{format_stanza(stanza)}\n"
                    debug += f"Generated stanza:\n{highlight(tostring(stanza2.xml))}\n"
                    debug += (
                        f"Second generated stanza:\n{highlight(tostring(stanza3.xml))}\n"
                    )
                    result = self.compare(xml, stanza.xml, stanza2.xml, stanza3.xml)
                else:
                    debug = "Two methods for creating stanzas do not match.\n"
                    debug += f"Given XML:\n{highlight(tostring(xml))}\n"
                    debug += f"Given stanza:\n{format_stanza(stanza)}\n"
                    debug += f"Generated stanza:\n{highlight(tostring(stanza2.xml))}\n"
                    result = self.compare(xml, stanza.xml, stanza2.xml)
            finally:
                # Always undo the class-level namespace override, even when
                # parsing or comparing raises, so that a failure here cannot
                # leak a modified namespace into subsequent tests.
                stanza_class.namespace = old_ns

            if XML_DIFF_PRESENT and not result:
                debug += str(
                    xmldiff.main.diff_texts(tostring(xml), tostring(stanza.xml))
                )
                if use_values:
                    debug += str(
                        xmldiff.main.diff_texts(tostring(xml), tostring(stanza2.xml))
                    )
            self.assertTrue(result, debug)

    def next_sent(self, timeout: float = 0.05) -> Message | Iq | Presence | None:
        """
        Return the next stanza written to the test socket, or None.

        Waits for the send queue, reads the next sent item from
        ``self.xmpp.socket`` using ``timeout``, and rebuilds it as a stanza
        object in the ``jabber:component:accept`` namespace.
        """
        self.wait_for_send_queue()
        sent = self.xmpp.socket.next_sent(timeout=timeout)
        if sent is None:
            return None
        xml = self.parse_xml(sent)
        self.fix_namespaces(xml, "jabber:component:accept")
        sent = self.xmpp._build_stanza(xml, "jabber:component:accept")
        return sent
class SlidgeTest(SlixTestPlus):
    """
    Base test case that spins up a gateway instance backed by a fresh database.

    Subclasses may set ``plugin`` (a module or a dict of names to objects) so
    that ``setUpClass`` registers the gateway/session/roster/... subclasses
    found in it, and may override ``Config`` to change the values copied into
    the ``config`` module.
    """

    plugin: types.ModuleType | dict

    class Config:
        # Every public attribute here is copied, upper-cased, onto the
        # ``config`` module by setUpClass().
        jid = "aim.shakespeare.lit"
        secret = "test"
        server = "shakespeare.lit"
        port = 5222
        upload_service = "upload.test"
        # NOTE: evaluated once, when this class body is executed.
        home_dir = Path(tempfile.mkdtemp())
        user_jid_validator = ".*"
        admins: ClassVar[list[str]] = []
        upload_requester = None
        ignore_delay_threshold = 300

    @classmethod
    def setUpClass(cls) -> None:
        """
        Apply ``cls.Config`` to the ``config`` module and register plugin classes.

        If ``cls.plugin`` is set, the subclasses found in it are registered in
        the ``SubclassableOnce`` registry and their abstract methods are
        cleared so they can be instantiated.
        """
        for k, v in vars(cls.Config).items():
            # vars() of a class also contains dunder entries (__module__,
            # __dict__, __annotations__, ...); those are not settings and
            # must not be written to the config module as __MODULE__ etc.
            if k.startswith("__"):
                continue
            setattr(config, k.upper(), v)
        if hasattr(cls, "plugin"):
            subclasses = SubclassableOnce._SubclassableOnce__subclasses
            subclasses.update(
                {
                    BaseGateway: find_subclass(cls.plugin, BaseGateway),
                    BaseSession: find_subclass(cls.plugin, BaseSession),
                    LegacyRoster: find_subclass(cls.plugin, LegacyRoster, base_ok=True),
                    LegacyContact: find_subclass(
                        cls.plugin, LegacyContact, base_ok=True
                    ),
                    LegacyMUC: find_subclass(cls.plugin, LegacyMUC, base_ok=True),
                    LegacyBookmarks: find_subclass(
                        cls.plugin, LegacyBookmarks, base_ok=True
                    ),
                }
            )
            for subclass in subclasses.values():
                if subclass is not None:
                    # Allow instantiation even if abstract methods are not
                    # implemented by the plugin under test.
                    subclass.__abstractmethods__ = set()

    def setUp(self) -> None:
        """
        Create the database schema and a gateway instance wired to a test transport.

        The database URL is read from the ``SLIDGETEST_DB_URL`` environment
        variable and defaults to an in-memory SQLite database. Stanza IDs are
        made deterministic (a simple counter starting at 1, no prefix).
        """
        # workaround for duplicate output of sql alchemy's log, cf
        # https://stackoverflow.com/a/76498428/5902284
        from sqlalchemy import log as sqlalchemy_log
        from sqlalchemy.pool import StaticPool

        sqlalchemy_log._add_default_handler = lambda x: None
        db_url = os.getenv("SLIDGETEST_DB_URL", "sqlite+pysqlite:///:memory:")
        engine = self.db_engine = create_engine(
            db_url, poolclass=StaticPool if db_url.startswith("postgresql+") else None
        )
        Base.metadata.create_all(engine)
        BaseGateway.store = SlidgeStore(engine)
        BaseGateway._test_mode = True

        self.xmpp = BaseGateway.get_self_or_unique_subclass()()

        self.xmpp.TEST_MODE = True
        avatar_cache.store = self.xmpp.store.avatars
        avatar_cache.set_dir(Path(tempfile.mkdtemp()))
        self.xmpp._always_send_everything = True
        engine.echo = True

        self.xmpp.connection_made(TestTransport(self.xmpp))
        self.xmpp.session_bind_event.set()
        # Remove unique ID prefix to make it easier to test
        self.xmpp._id_prefix = ""
        self.xmpp.default_lang = None
        self.xmpp.peer_default_lang = None

        def new_id() -> str:
            self.xmpp._id += 1
            return str(self.xmpp._id)

        self.xmpp._id = 0
        self.xmpp.new_id = new_id

        # Must have the stream header ready for xmpp.process() to work.
        header = self.xmpp.stream_header

        self.xmpp.data_received(header)
        self.wait_for_send_queue()

        # Discard the first two queued outgoing items (presumably the stream
        # header and handshake -- TODO confirm).
        self.xmpp.socket.next_sent()
        self.xmpp.socket.next_sent()

        # Some plugins require messages to have ID values. Set
        # this to True in tests related to those plugins.
        self.xmpp.use_message_ids = False
        self.xmpp.use_presence_ids = False
        Error.namespace = "jabber:component:accept"

    def _add_slidge_user(
        self,
        jid: str | JID = "romeo@shakespeare.lit",
        legacy_module_data: dict | None = None,
        preferences: dict | None = None,
    ) -> None:
        """
        Store a ``GatewayUser`` and run the registration handler for it.

        After triggering the handler, consumes and asserts on the stanzas
        sent during login (welcome message, status updates, presence probe).

        Arguments:
            jid                -- JID of the user to create.
            legacy_module_data -- Stored on the user; defaults to ``{}``.
            preferences        -- Stored on the user; defaults to avatar and
                                  presence sync both disabled.
        """
        with self.xmpp.store.session() as orm:
            user = GatewayUser(
                jid=JID(jid),
                legacy_module_data=legacy_module_data or {},
                preferences=preferences
                or {"sync_avatar": False, "sync_presence": False},
            )
            orm.add(user)
            orm.commit()
        self.run_coro(
            self.xmpp._BaseGateway__dispatcher._on_user_register(Iq(sfrom=JID(jid)))
        )
        welcome = self.next_sent()
        assert welcome["body"]
        stanza = self.next_sent()
        assert "logging in" in stanza["status"].lower(), stanza
        stanza = self.next_sent()
        assert "syncing contacts" in stanza["status"].lower(), stanza
        if self.xmpp.GROUPS:
            stanza = self.next_sent()
            assert "syncing groups" in stanza["status"].lower(), stanza
        probe = self.next_sent()
        assert probe.get_type() == "probe"
        stanza = self.next_sent()
        assert stanza["status"].lower()

    def user_session(self, jid: str | JID = "romeo@shakespeare.lit") -> BaseSession:
        """Return the session obtained from ``from_jid`` for the given user JID."""
        return BaseSession.get_self_or_unique_subclass().from_jid(JID(jid))

    def get_joined_muc(
        self,
        legacy_id: str | int,
        user_jid: str | JID = "romeo@shakespeare.lit",
        resource: str = "gajim",
    ) -> LegacyMUC:
        """
        Return the user's MUC for ``legacy_id`` after adding ``resource`` to it.

        The MUC is looked up through the bookmarks of ``user_jid``'s session
        and ``resource`` is registered with ``add_user_resource``.
        """
        muc: LegacyMUC = self.run_coro(
            self.user_session(user_jid).bookmarks.by_legacy_id(legacy_id)
        )
        muc.add_user_resource(resource)
        return muc

    def tearDown(self) -> None:
        """Run the parent teardown, drop all tables, dispose the engine and clear sessions."""
        self.db_engine.echo = False
        super().tearDown()
        Base.metadata.drop_all(self.xmpp.store._engine)
        self.db_engine.dispose()
        _sessions.clear()

    def setup_logged_session(self, n_contacts: int = 0) -> None:
        """
        Register and log in ``romeo@montague.lit`` and expose common fixtures.

        Sets ``self.romeo`` (session), ``self.juliet`` (contact with legacy id
        "juliet"), ``self.room`` (MUC with legacy id "room") and
        ``self.first_witch`` (participant "firstwitch" of that room), then
        sends an avatar metadata pubsub request to romeo.

        Arguments:
            n_contacts -- Not used by this method body.
        """
        with self.xmpp.store.session() as orm:
            user = GatewayUser(
                jid=JID("romeo@montague.lit/gajim").bare,
                legacy_module_data={"username": "romeo", "city": ""},
                preferences={"sync_avatar": True, "sync_presence": True},
            )
            orm.add(user)
            orm.commit()

        # Start from an empty contacts table.
        with self.xmpp.store.session() as session:
            session.execute(delete(Contact))
            session.commit()

        self.run_coro(
            self.xmpp._BaseGateway__dispatcher._on_user_register(
                Iq(sfrom="romeo@montague.lit/gajim")
            )
        )
        welcome = self.next_sent()
        assert welcome["body"], welcome
        stanza = self.next_sent()
        assert "logging in" in stanza["status"].lower(), stanza
        stanza = self.next_sent()
        assert "syncing contacts" in stanza["status"].lower(), stanza
        if BaseGateway.get_self_or_unique_subclass().GROUPS:
            stanza = self.next_sent()
            assert "syncing groups" in stanza["status"].lower(), stanza
        probe = self.next_sent()
        assert probe.get_type() == "probe"
        stanza = self.next_sent()
        assert "yup" in stanza["status"].lower(), stanza
        self.romeo: BaseSession = BaseSession.get_self_or_unique_subclass().from_jid(
            JID("romeo@montague.lit")
        )

        self.juliet: LegacyContact = self.run_coro(
            self.romeo.contacts.by_legacy_id("juliet")
        )
        self.room: LegacyMUC = self.run_coro(self.romeo.bookmarks.by_legacy_id("room"))
        self.first_witch: LegacyParticipant = self.run_coro(
            self.room.get_participant("firstwitch")
        )
        self.send(  # language=XML
            """
            <iq type="get"
                to="romeo@montague.lit"
                id="1"
                from="aim.shakespeare.lit">
              <pubsub xmlns="http://jabber.org/protocol/pubsub">
                <items node="urn:xmpp:avatar:metadata" />
              </pubsub>
            </iq>
            """
        )

    @classmethod
    def tearDownClass(cls) -> None:
        """Reset the registered subclasses so the next test class can register its own."""
        reset_subclasses()
def format_stanza(stanza: ElementBase) -> str:
    """
    Return the stanza's XML pretty-printed and highlighted.

    The XML is serialized with ``tostring``, re-indented with minidom's
    ``toprettyxml``, and the first line of that output (the XML declaration
    emitted by minidom) is dropped before highlighting.
    """
    pretty = parseString(tostring(stanza.xml)).toprettyxml()
    _first_line, *remaining_lines = pretty.split("\n")
    return highlight("\n".join(remaining_lines))
def find_subclass(o, parent, base_ok: bool = False):  # noqa
    """
    Return the first strict subclass of ``parent`` found among the values of ``o``.

    Arguments:
        o        -- An object with a ``__dict__`` (e.g. a module), or
                    otherwise a mapping exposing ``.values()``.
        parent   -- The class whose subclass is searched for. ``parent``
                    itself is never returned as a match.
        base_ok  -- If True, return ``parent`` when no subclass is found
                    instead of raising.

    Raises:
        RuntimeError -- If no subclass is found and ``base_ok`` is False.
    """
    try:
        vals = vars(o).values()
    except TypeError:
        # No __dict__: treat ``o`` as a mapping.
        vals = o.values()
    for x in vals:
        try:
            if issubclass(x, parent) and x is not parent:
                return x
        except TypeError:
            # ``x`` is not a class; skip it.
            pass
    if base_ok:
        return parent
    where = getattr(o, "__name__", type(o).__name__)
    raise RuntimeError(f"No subclass of {parent!r} found in {where!r}")
def reset_subclasses() -> None:
    """
    Clear the subclass registration of every "subclass once" base class.

    Must run between test classes: these base classes are designed to be
    subclassed a single time and raise if subclassed again.
    """
    subclass_once_bases = (
        BaseSession,
        BaseGateway,
        LegacyRoster,
        LegacyContact,
        LegacyMUC,
        LegacyBookmarks,
        LegacyParticipant,
    )
    for base in subclass_once_bases:
        base.reset_subclass()
    # reset_commands()
def reset_commands() -> None:
    """
    Drop every registered command class except those from ``slidge.core``.

    A class is kept when its string form starts with ``<class 'slidge.core``.
    """
    core_prefix = "<class 'slidge.core"
    Command.subclasses = list(
        filter(lambda klass: str(klass).startswith(core_prefix), Command.subclasses)
    )