Coverage for slidge / util / test.py: 90%
211 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-02-15 09:02 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-02-15 09:02 +0000
1# type:ignore
2import os
3import tempfile
4import types
5from pathlib import Path
6from xml.dom.minidom import parseString
8try:
9 import xmldiff.main
11 XML_DIFF_PRESENT = True
12except ImportError:
13 xmldiff = None
14 XML_DIFF_PRESENT = False
16from slixmpp import (
17 JID,
18 ElementBase,
19 Iq,
20 MatcherId,
21 MatchXMLMask,
22 MatchXPath,
23 Message,
24 Presence,
25 StanzaPath,
26)
27from slixmpp.stanza.error import Error
28from slixmpp.test import SlixTest, TestTransport
29from slixmpp.xmlstream import highlight, tostring
30from slixmpp.xmlstream.matcher import MatchIDSender
31from sqlalchemy import create_engine, delete
33from slidge import (
34 BaseGateway,
35 BaseSession,
36 LegacyBookmarks,
37 LegacyContact,
38 LegacyMUC,
39 LegacyParticipant,
40 LegacyRoster,
41)
43from ..command import Command
44from ..core import config
45from ..core.session import _sessions
46from ..db import SlidgeStore
47from ..db.avatar import avatar_cache
48from ..db.meta import Base
49from ..db.models import Contact, GatewayUser
52class SlixTestPlus(SlixTest):
53 def setUp(self) -> None:
54 super().setUp()
55 Error.namespace = "jabber:component:accept"
57 def check(
58 self,
59 stanza,
60 criteria,
61 method: str = "exact",
62 defaults=None,
63 use_values: bool = True,
64 ):
65 """
66 Create and compare several stanza objects to a correct XML string.
68 If use_values is False, tests using stanza.values will not be used.
70 Some stanzas provide default values for some interfaces, but
71 these defaults can be problematic for testing since they can easily
72 be forgotten when supplying the XML string. A list of interfaces that
73 use defaults may be provided and the generated stanzas will use the
74 default values for those interfaces if needed.
76 However, correcting the supplied XML is not possible for interfaces
77 that add or remove XML elements. Only interfaces that map to XML
78 attributes may be set using the defaults parameter. The supplied XML
79 must take into account any extra elements that are included by default.
81 Arguments:
82 stanza -- The stanza object to test.
83 criteria -- An expression the stanza must match against.
84 method -- The type of matching to use; one of:
85 'exact', 'mask', 'id', 'xpath', and 'stanzapath'.
86 Defaults to the value of self.match_method.
87 defaults -- A list of stanza interfaces that have default
88 values. These interfaces will be set to their
89 defaults for the given and generated stanzas to
90 prevent unexpected test failures.
91 use_values -- Indicates if testing using stanza.values should
92 be used. Defaults to True.
93 """
94 if method is None and hasattr(self, "match_method"):
95 method = getattr(self, "match_method")
97 if method != "exact":
98 matchers = {
99 "stanzapath": StanzaPath,
100 "xpath": MatchXPath,
101 "mask": MatchXMLMask,
102 "idsender": MatchIDSender,
103 "id": MatcherId,
104 }
105 Matcher = matchers.get(method, None)
106 if Matcher is None:
107 raise ValueError("Unknown matching method.")
108 test = Matcher(criteria)
109 self.assertTrue(
110 test.match(stanza),
111 f"Stanza did not match using {method} method:\n"
112 + f"Criteria:\n{str(criteria)}\n"
113 + f"Stanza:\n{str(stanza)}",
114 )
115 else:
116 stanza_class = stanza.__class__
117 # Hack to preserve namespaces instead of having jabber:client
118 # everywhere.
119 old_ns = stanza_class.namespace
120 stanza_class.namespace = stanza.namespace
121 if not isinstance(criteria, ElementBase):
122 xml = self.parse_xml(criteria)
123 else:
124 xml = criteria.xml
126 # Ensure that top level namespaces are used, even if they
127 # were not provided.
128 self.fix_namespaces(stanza.xml)
129 self.fix_namespaces(xml)
131 stanza2 = stanza_class(xml=xml)
133 if use_values:
134 # Using stanza.values will add XML for any interface that
135 # has a default value. We need to set those defaults on
136 # the existing stanzas and XML so that they will compare
137 # correctly.
138 default_stanza = stanza_class()
139 if defaults is None:
140 known_defaults = {Message: ["type"], Presence: ["priority"]}
141 defaults = known_defaults.get(stanza_class, [])
142 for interface in defaults:
143 stanza[interface] = stanza[interface]
144 stanza2[interface] = stanza2[interface]
145 # Can really only automatically add defaults for top
146 # level attribute values. Anything else must be accounted
147 # for in the provided XML string.
148 if interface not in xml.attrib:
149 if interface in default_stanza.xml.attrib:
150 value = default_stanza.xml.attrib[interface]
151 xml.attrib[interface] = value
153 values = stanza2.values
154 stanza3 = stanza_class()
155 stanza3.values = values
157 debug = "Three methods for creating stanzas do not match.\n"
158 debug += f"Given XML:\n{highlight(tostring(xml))}\n"
159 debug += f"Given stanza:\n{format_stanza(stanza)}\n"
160 debug += f"Generated stanza:\n{highlight(tostring(stanza2.xml))}\n"
161 debug += (
162 f"Second generated stanza:\n{highlight(tostring(stanza3.xml))}\n"
163 )
164 result = self.compare(xml, stanza.xml, stanza2.xml, stanza3.xml)
165 else:
166 debug = "Two methods for creating stanzas do not match.\n"
167 debug += f"Given XML:\n{highlight(tostring(xml))}\n"
168 debug += f"Given stanza:\n{format_stanza(stanza)}\n"
169 debug += f"Generated stanza:\n{highlight(tostring(stanza2.xml))}\n"
170 result = self.compare(xml, stanza.xml, stanza2.xml)
171 stanza_class.namespace = old_ns
173 if XML_DIFF_PRESENT and not result:
174 debug += str(
175 xmldiff.main.diff_texts(tostring(xml), tostring(stanza.xml))
176 )
177 if use_values:
178 debug += str(
179 xmldiff.main.diff_texts(tostring(xml), tostring(stanza2.xml))
180 )
181 self.assertTrue(result, debug)
183 def next_sent(self, timeout: float = 0.05) -> Message | Iq | Presence | None:
184 self.wait_for_send_queue()
185 sent = self.xmpp.socket.next_sent(timeout=timeout)
186 if sent is None:
187 return None
188 xml = self.parse_xml(sent)
189 self.fix_namespaces(xml, "jabber:component:accept")
190 sent = self.xmpp._build_stanza(xml, "jabber:component:accept")
191 return sent
194class SlidgeTest(SlixTestPlus):
195 plugin: types.ModuleType | dict
197 class Config:
198 jid = "aim.shakespeare.lit"
199 secret = "test"
200 server = "shakespeare.lit"
201 port = 5222
202 upload_service = "upload.test"
203 home_dir = Path(tempfile.mkdtemp())
204 user_jid_validator = ".*"
205 admins: list[str] = []
206 upload_requester = None
207 ignore_delay_threshold = 300
209 @classmethod
210 def setUpClass(cls) -> None:
211 for k, v in vars(cls.Config).items():
212 setattr(config, k.upper(), v)
214 def setUp(self):
215 if hasattr(self, "plugin"):
216 BaseGateway._subclass = find_subclass(self.plugin, BaseGateway)
217 BaseSession._subclass = find_subclass(self.plugin, BaseSession)
218 LegacyRoster._subclass = find_subclass(
219 self.plugin, LegacyRoster, base_ok=True
220 )
221 LegacyContact._subclass = find_subclass(
222 self.plugin, LegacyContact, base_ok=True
223 )
224 LegacyMUC._subclass = find_subclass(self.plugin, LegacyMUC, base_ok=True)
225 LegacyBookmarks._subclass = find_subclass(
226 self.plugin, LegacyBookmarks, base_ok=True
227 )
229 # workaround for duplicate output of sql alchemy's log, cf
230 # https://stackoverflow.com/a/76498428/5902284
231 from sqlalchemy import log as sqlalchemy_log
232 from sqlalchemy.pool import StaticPool
234 sqlalchemy_log._add_default_handler = lambda x: None
235 db_url = os.getenv("SLIDGETEST_DB_URL", "sqlite+pysqlite:///:memory:")
236 engine = self.db_engine = create_engine(
237 db_url, poolclass=StaticPool if db_url.startswith("postgresql+") else None
238 )
239 Base.metadata.create_all(engine)
240 BaseGateway.store = SlidgeStore(engine)
241 BaseGateway._test_mode = True
242 try:
243 self.xmpp = BaseGateway.get_self_or_unique_subclass()()
244 except Exception:
245 raise
246 self.xmpp.TEST_MODE = True
247 avatar_cache.store = self.xmpp.store.avatars
248 avatar_cache.set_dir(Path(tempfile.mkdtemp()))
249 self.xmpp._always_send_everything = True
250 engine.echo = True
252 self.xmpp.connection_made(TestTransport(self.xmpp))
253 self.xmpp.session_bind_event.set()
254 # Remove unique ID prefix to make it easier to test
255 self.xmpp._id_prefix = ""
256 self.xmpp.default_lang = None
257 self.xmpp.peer_default_lang = None
259 def new_id():
260 self.xmpp._id += 1
261 return str(self.xmpp._id)
263 self.xmpp._id = 0
264 self.xmpp.new_id = new_id
266 # Must have the stream header ready for xmpp.process() to work.
267 header = self.xmpp.stream_header
269 self.xmpp.data_received(header)
270 self.wait_for_send_queue()
272 self.xmpp.socket.next_sent()
273 self.xmpp.socket.next_sent()
275 # Some plugins require messages to have ID values. Set
276 # this to True in tests related to those plugins.
277 self.xmpp.use_message_ids = False
278 self.xmpp.use_presence_ids = False
279 Error.namespace = "jabber:component:accept"
281 def tearDown(self) -> None:
282 self.db_engine.echo = False
283 super().tearDown()
284 Base.metadata.drop_all(self.xmpp.store._engine)
285 self.db_engine.dispose()
286 _sessions.clear()
288 def setup_logged_session(self, n_contacts: int = 0) -> None:
289 with self.xmpp.store.session() as orm:
290 user = GatewayUser(
291 jid=JID("romeo@montague.lit/gajim").bare,
292 legacy_module_data={"username": "romeo", "city": ""},
293 preferences={"sync_avatar": True, "sync_presence": True},
294 )
295 orm.add(user)
296 orm.commit()
298 with self.xmpp.store.session() as session:
299 session.execute(delete(Contact))
300 session.commit()
302 self.run_coro(
303 self.xmpp._BaseGateway__dispatcher._on_user_register(
304 Iq(sfrom="romeo@montague.lit/gajim")
305 )
306 )
307 welcome = self.next_sent()
308 assert welcome["body"], welcome
309 stanza = self.next_sent()
310 assert "logging in" in stanza["status"].lower(), stanza
311 stanza = self.next_sent()
312 assert "syncing contacts" in stanza["status"].lower(), stanza
313 if BaseGateway.get_self_or_unique_subclass().GROUPS:
314 stanza = self.next_sent()
315 assert "syncing groups" in stanza["status"].lower(), stanza
316 probe = self.next_sent()
317 assert probe.get_type() == "probe"
318 stanza = self.next_sent()
319 assert "yup" in stanza["status"].lower(), stanza
320 self.romeo: BaseSession = BaseSession.get_self_or_unique_subclass().from_jid(
321 JID("romeo@montague.lit")
322 )
324 self.juliet: LegacyContact = self.run_coro(
325 self.romeo.contacts.by_legacy_id("juliet")
326 )
327 self.room: LegacyMUC = self.run_coro(self.romeo.bookmarks.by_legacy_id("room"))
328 self.first_witch: LegacyParticipant = self.run_coro(
329 self.room.get_participant("firstwitch")
330 )
331 self.send( # language=XML
332 """
333 <iq type="get"
334 to="romeo@montague.lit"
335 id="1"
336 from="aim.shakespeare.lit">
337 <pubsub xmlns="http://jabber.org/protocol/pubsub">
338 <items node="urn:xmpp:avatar:metadata" />
339 </pubsub>
340 </iq>
341 """
342 )
344 @classmethod
345 def tearDownClass(cls) -> None:
346 reset_subclasses()
349def format_stanza(stanza):
350 return highlight(
351 "\n".join(parseString(tostring(stanza.xml)).toprettyxml().split("\n")[1:])
352 )
355def find_subclass(o, parent, base_ok: bool = False):
356 try:
357 vals = vars(o).values()
358 except TypeError:
359 vals = o.values()
360 for x in vals:
361 try:
362 if issubclass(x, parent) and x is not parent:
363 return x
364 except TypeError:
365 pass
366 else:
367 if base_ok:
368 return parent
369 else:
370 raise RuntimeError
373def reset_subclasses() -> None:
374 """
375 Reset registered subclasses between test classes.
377 Needed because these classes are meant to only be subclassed once and raise
378 exceptions otherwise.
379 """
380 BaseSession.reset_subclass()
381 BaseGateway.reset_subclass()
382 LegacyRoster.reset_subclass()
383 LegacyContact.reset_subclass()
384 LegacyMUC.reset_subclass()
385 LegacyBookmarks.reset_subclass()
386 LegacyParticipant.reset_subclass()
387 # reset_commands()
390def reset_commands() -> None:
391 Command.subclasses = [
392 c for c in Command.subclasses if str(c).startswith("<class 'slidge.core")
393 ]