Coverage for slidge / util / test.py: 91%
233 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-03-13 22:59 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-03-13 22:59 +0000
1# type:ignore
2import os
3import tempfile
4import types
5from pathlib import Path
6from xml.dom.minidom import parseString
8try:
9 import xmldiff.main
11 XML_DIFF_PRESENT = True
12except ImportError:
13 xmldiff = None
14 XML_DIFF_PRESENT = False
16from slixmpp import (
17 JID,
18 ElementBase,
19 Iq,
20 MatcherId,
21 MatchXMLMask,
22 MatchXPath,
23 Message,
24 Presence,
25 StanzaPath,
26)
27from slixmpp.stanza.error import Error
28from slixmpp.test import SlixTest, TestTransport
29from slixmpp.xmlstream import highlight, tostring
30from slixmpp.xmlstream.matcher import MatchIDSender
31from sqlalchemy import create_engine, delete
33from slidge import (
34 BaseGateway,
35 BaseSession,
36 LegacyBookmarks,
37 LegacyContact,
38 LegacyMUC,
39 LegacyParticipant,
40 LegacyRoster,
41)
43from ..command import Command
44from ..core import config
45from ..core.session import _sessions
46from ..db import SlidgeStore
47from ..db.avatar import avatar_cache
48from ..db.meta import Base
49from ..db.models import Contact, GatewayUser
50from ..util import SubclassableOnce
class SlixTestPlus(SlixTest):
    """
    SlixTest extension used by the slidge test helpers.

    It overrides ``check()`` so that stanzas are compared using the namespace
    of the stanza under test, and adds ``next_sent()`` which returns the next
    outgoing stanza rebuilt in the ``jabber:component:accept`` namespace.
    """

    def setUp(self) -> None:
        """Run the parent set-up, then set ``Error.namespace`` to the component namespace."""
        super().setUp()
        Error.namespace = "jabber:component:accept"

    def check(
        self,
        stanza,
        criteria,
        method: str = "exact",
        defaults=None,
        use_values: bool = True,
    ) -> None:
        """
        Create and compare several stanza objects to a correct XML string.

        If use_values is False, tests using stanza.values will not be used.

        Some stanzas provide default values for some interfaces, but
        these defaults can be problematic for testing since they can easily
        be forgotten when supplying the XML string. A list of interfaces that
        use defaults may be provided and the generated stanzas will use the
        default values for those interfaces if needed.

        However, correcting the supplied XML is not possible for interfaces
        that add or remove XML elements. Only interfaces that map to XML
        attributes may be set using the defaults parameter. The supplied XML
        must take into account any extra elements that are included by default.

        Arguments:
            stanza     -- The stanza object to test.
            criteria   -- An expression the stanza must match against.
            method     -- The type of matching to use; one of:
                          'exact', 'mask', 'id', 'idsender', 'xpath',
                          and 'stanzapath'. Defaults to 'exact'. If None is
                          passed explicitly and self.match_method exists,
                          that value is used instead.
            defaults   -- A list of stanza interfaces that have default
                          values. These interfaces will be set to their
                          defaults for the given and generated stanzas to
                          prevent unexpected test failures. When None,
                          ["type"] is used for Message and ["priority"]
                          for Presence.
            use_values -- Indicates if testing using stanza.values should
                          be used. Defaults to True.

        Raises:
            ValueError -- If method is not one of the known matching methods.
        """
        if method is None and hasattr(self, "match_method"):
            method = getattr(self, "match_method")

        if method != "exact":
            matchers = {
                "stanzapath": StanzaPath,
                "xpath": MatchXPath,
                "mask": MatchXMLMask,
                "idsender": MatchIDSender,
                "id": MatcherId,
            }
            Matcher = matchers.get(method, None)
            if Matcher is None:
                raise ValueError("Unknown matching method.")
            test = Matcher(criteria)
            self.assertTrue(
                test.match(stanza),
                f"Stanza did not match using {method} method:\n"
                + f"Criteria:\n{str(criteria)}\n"
                + f"Stanza:\n{str(stanza)}",
            )
            return

        stanza_class = stanza.__class__
        # Hack to preserve namespaces instead of having jabber:client
        # everywhere.
        old_ns = stanza_class.namespace
        stanza_class.namespace = stanza.namespace
        try:
            if not isinstance(criteria, ElementBase):
                xml = self.parse_xml(criteria)
            else:
                xml = criteria.xml

            # Ensure that top level namespaces are used, even if they
            # were not provided.
            self.fix_namespaces(stanza.xml)
            self.fix_namespaces(xml)

            stanza2 = stanza_class(xml=xml)

            if use_values:
                # Using stanza.values will add XML for any interface that
                # has a default value. We need to set those defaults on
                # the existing stanzas and XML so that they will compare
                # correctly.
                default_stanza = stanza_class()
                if defaults is None:
                    known_defaults = {Message: ["type"], Presence: ["priority"]}
                    defaults = known_defaults.get(stanza_class, [])
                for interface in defaults:
                    stanza[interface] = stanza[interface]
                    stanza2[interface] = stanza2[interface]
                    # Can really only automatically add defaults for top
                    # level attribute values. Anything else must be accounted
                    # for in the provided XML string.
                    if interface not in xml.attrib:
                        if interface in default_stanza.xml.attrib:
                            value = default_stanza.xml.attrib[interface]
                            xml.attrib[interface] = value

                values = stanza2.values
                stanza3 = stanza_class()
                stanza3.values = values

                debug = "Three methods for creating stanzas do not match.\n"
                debug += f"Given XML:\n{highlight(tostring(xml))}\n"
                debug += f"Given stanza:\n{format_stanza(stanza)}\n"
                debug += f"Generated stanza:\n{highlight(tostring(stanza2.xml))}\n"
                debug += (
                    f"Second generated stanza:\n{highlight(tostring(stanza3.xml))}\n"
                )
                result = self.compare(xml, stanza.xml, stanza2.xml, stanza3.xml)
            else:
                debug = "Two methods for creating stanzas do not match.\n"
                debug += f"Given XML:\n{highlight(tostring(xml))}\n"
                debug += f"Given stanza:\n{format_stanza(stanza)}\n"
                debug += f"Generated stanza:\n{highlight(tostring(stanza2.xml))}\n"
                result = self.compare(xml, stanza.xml, stanza2.xml)
        finally:
            # The namespace is patched on the *class*, so it must be restored
            # even when parsing or comparing raises; otherwise the patched
            # value would leak into every later use of this stanza class.
            stanza_class.namespace = old_ns

        if XML_DIFF_PRESENT and not result:
            # Append textual diffs to the failure message when xmldiff is
            # installed.
            debug += str(
                xmldiff.main.diff_texts(tostring(xml), tostring(stanza.xml))
            )
            if use_values:
                debug += str(
                    xmldiff.main.diff_texts(tostring(xml), tostring(stanza2.xml))
                )
        self.assertTrue(result, debug)

    def next_sent(self, timeout: float = 0.05) -> Message | Iq | Presence | None:
        """
        Return the next stanza sent through the test socket, or None.

        Waits for the send queue, reads the next sent item from
        ``self.xmpp.socket`` (using ``timeout``), and returns None when
        nothing was read. Otherwise the raw data is parsed, its namespaces
        are fixed to ``jabber:component:accept`` and the result is turned
        into a stanza object with ``self.xmpp._build_stanza``.
        """
        self.wait_for_send_queue()
        raw = self.xmpp.socket.next_sent(timeout=timeout)
        if raw is None:
            return None
        xml = self.parse_xml(raw)
        self.fix_namespaces(xml, "jabber:component:accept")
        return self.xmpp._build_stanza(xml, "jabber:component:accept")
class SlidgeTest(SlixTestPlus):
    """
    Base test case that instantiates a slidge gateway for each test.

    ``setUpClass`` pushes the values of ``Config`` onto the imported
    ``config`` object and registers the plugin's subclasses; ``setUp``
    creates a database engine, the gateway instance and a test transport;
    ``tearDown`` drops the tables and clears the registered sessions.
    """

    # Module (or dict of names) that setUpClass searches for the plugin's
    # BaseGateway / BaseSession / Legacy* subclasses. Optional: setUpClass
    # only uses it when the attribute exists.
    plugin: types.ModuleType | dict

    class Config:
        # Every entry of vars(Config) is copied onto ``config`` under its
        # upper-cased name by setUpClass. Documented with comments rather
        # than a docstring so that vars(Config) is left unchanged.
        jid = "aim.shakespeare.lit"
        secret = "test"
        server = "shakespeare.lit"
        port = 5222
        upload_service = "upload.test"
        # Evaluated once, when the class body is executed.
        home_dir = Path(tempfile.mkdtemp())
        user_jid_validator = ".*"
        admins: list[str] = []
        upload_requester = None
        ignore_delay_threshold = 300

    @classmethod
    def setUpClass(cls) -> None:
        """
        Apply ``Config`` to ``config`` and register the plugin's subclasses.

        When the test class defines ``plugin``, the subclasses found in it
        are stored in the ``SubclassableOnce`` registry and the abstract
        method sets of all registered classes are emptied. ``find_subclass``
        is called without ``base_ok`` for BaseGateway and BaseSession, so it
        raises if the plugin does not define them.
        """
        # NOTE: vars() also yields the class's dunder entries (``__module__``
        # etc.), so those are set on ``config`` (upper-cased) as well.
        for k, v in vars(cls.Config).items():
            setattr(config, k.upper(), v)
        if hasattr(cls, "plugin"):
            # Name-mangled access to SubclassableOnce's private registry.
            subclasses = SubclassableOnce._SubclassableOnce__subclasses
            subclasses.update(
                {
                    BaseGateway: find_subclass(cls.plugin, BaseGateway),
                    BaseSession: find_subclass(cls.plugin, BaseSession),
                    LegacyRoster: find_subclass(cls.plugin, LegacyRoster, base_ok=True),
                    LegacyContact: find_subclass(
                        cls.plugin, LegacyContact, base_ok=True
                    ),
                    LegacyMUC: find_subclass(cls.plugin, LegacyMUC, base_ok=True),
                    LegacyBookmarks: find_subclass(
                        cls.plugin, LegacyBookmarks, base_ok=True
                    ),
                }
            )
            # Empty the abstract method sets so the registered classes can be
            # instantiated even if they leave abstract methods unimplemented.
            for subclass in subclasses.values():
                if subclass is not None:
                    subclass.__abstractmethods__ = set()

    def setUp(self) -> None:
        """
        Create the database, the gateway instance and its test transport.

        This method does not call ``super().setUp()``; it performs the
        stream set-up itself and sets ``Error.namespace`` at the end.
        """
        # workaround for duplicate output of sql alchemy's log, cf
        # https://stackoverflow.com/a/76498428/5902284
        from sqlalchemy import log as sqlalchemy_log
        from sqlalchemy.pool import StaticPool

        sqlalchemy_log._add_default_handler = lambda x: None
        # The database URL can be overridden through the environment;
        # the default is an in-memory SQLite database.
        db_url = os.getenv("SLIDGETEST_DB_URL", "sqlite+pysqlite:///:memory:")
        # StaticPool is only requested for URLs starting with "postgresql+";
        # otherwise poolclass is None.
        engine = self.db_engine = create_engine(
            db_url, poolclass=StaticPool if db_url.startswith("postgresql+") else None
        )
        Base.metadata.create_all(engine)
        BaseGateway.store = SlidgeStore(engine)
        BaseGateway._test_mode = True

        # Instantiate the gateway class (the base or its unique subclass).
        self.xmpp = BaseGateway.get_self_or_unique_subclass()()

        self.xmpp.TEST_MODE = True
        avatar_cache.store = self.xmpp.store.avatars
        # Fresh temporary directory for the avatar cache on every test.
        avatar_cache.set_dir(Path(tempfile.mkdtemp()))
        self.xmpp._always_send_everything = True
        # Echo is switched on only after the set-up above; tearDown turns it
        # off again.
        engine.echo = True

        self.xmpp.connection_made(TestTransport(self.xmpp))
        self.xmpp.session_bind_event.set()
        # Remove unique ID prefix to make it easier to test
        self.xmpp._id_prefix = ""
        self.xmpp.default_lang = None
        self.xmpp.peer_default_lang = None

        # Replacement ID generator: sequential integers as strings,
        # starting at "1".
        def new_id():
            self.xmpp._id += 1
            return str(self.xmpp._id)

        self.xmpp._id = 0
        self.xmpp.new_id = new_id

        # Must have the stream header ready for xmpp.process() to work.
        header = self.xmpp.stream_header

        self.xmpp.data_received(header)
        self.wait_for_send_queue()

        # Discard the first two items sent during stream set-up
        # (presumably the stream header and handshake -- TODO confirm).
        self.xmpp.socket.next_sent()
        self.xmpp.socket.next_sent()

        # Some plugins require messages to have ID values. Set
        # this to True in tests related to those plugins.
        self.xmpp.use_message_ids = False
        self.xmpp.use_presence_ids = False
        Error.namespace = "jabber:component:accept"

    def _add_slidge_user(
        self,
        jid: str | JID = "romeo@shakespeare.lit",
        legacy_module_data=None,
        preferences=None,
    ) -> None:
        """
        Store a gateway user, trigger its registration and check the login stanzas.

        Arguments:
            jid                -- JID of the user to create.
            legacy_module_data -- Stored on the user; ``{}`` when falsy.
            preferences        -- Stored on the user; when falsy, avatar and
                                  presence syncing are both disabled.

        After calling the dispatcher's ``_on_user_register``, the stanzas
        sent by the gateway are consumed and asserted in order: a message
        with a body, a "logging in" status, a "syncing contacts" status,
        a "syncing groups" status (only if ``self.xmpp.GROUPS`` is truthy),
        a presence probe, and a final stanza with a non-empty status.
        """
        with self.xmpp.store.session() as orm:
            user = GatewayUser(
                jid=JID(jid),
                legacy_module_data=legacy_module_data or {},
                preferences=preferences
                or {"sync_avatar": False, "sync_presence": False},
            )
            orm.add(user)
            orm.commit()
        # Name-mangled access to the gateway's private dispatcher.
        self.run_coro(
            self.xmpp._BaseGateway__dispatcher._on_user_register(Iq(sfrom=JID(jid)))
        )
        welcome = self.next_sent()
        assert welcome["body"]
        stanza = self.next_sent()
        assert "logging in" in stanza["status"].lower(), stanza
        stanza = self.next_sent()
        assert "syncing contacts" in stanza["status"].lower(), stanza
        if self.xmpp.GROUPS:
            stanza = self.next_sent()
            assert "syncing groups" in stanza["status"].lower(), stanza
        probe = self.next_sent()
        assert probe.get_type() == "probe"
        stanza = self.next_sent()
        assert stanza["status"].lower()

    def user_session(self, jid: str | JID = "romeo@shakespeare.lit") -> BaseSession:
        """Return the session obtained from ``from_jid`` for the given user JID."""
        return BaseSession.get_self_or_unique_subclass().from_jid(JID(jid))

    def get_joined_muc(
        self,
        legacy_id: str | int,
        user_jid: str | JID = "romeo@shakespeare.lit",
        resource: str = "gajim",
    ) -> LegacyMUC:
        """
        Fetch a MUC from the user's bookmarks and add a user resource to it.

        Arguments:
            legacy_id -- Legacy identifier of the MUC to look up.
            user_jid  -- JID of the user whose bookmarks are used.
            resource  -- Resource passed to ``muc.add_user_resource``.
        """
        muc: LegacyMUC = self.run_coro(
            self.user_session(user_jid).bookmarks.by_legacy_id(legacy_id)
        )
        muc.add_user_resource(resource)
        return muc

    def tearDown(self) -> None:
        """Disable engine echo, run the parent tear-down, drop all tables and clear sessions."""
        self.db_engine.echo = False
        super().tearDown()
        Base.metadata.drop_all(self.xmpp.store._engine)
        self.db_engine.dispose()
        _sessions.clear()

    def setup_logged_session(self, n_contacts: int = 0) -> None:
        """
        Register romeo@montague.lit, check the login stanzas and set fixtures.

        On success the following attributes are available: ``self.romeo``
        (session), ``self.juliet`` (contact "juliet"), ``self.room`` (MUC
        "room") and ``self.first_witch`` (participant "firstwitch" of that
        room).

        Arguments:
            n_contacts -- Not used by this method body.
        """
        with self.xmpp.store.session() as orm:
            user = GatewayUser(
                jid=JID("romeo@montague.lit/gajim").bare,
                legacy_module_data={"username": "romeo", "city": ""},
                preferences={"sync_avatar": True, "sync_presence": True},
            )
            orm.add(user)
            orm.commit()

        # Remove every stored contact before logging in.
        with self.xmpp.store.session() as session:
            session.execute(delete(Contact))
            session.commit()

        self.run_coro(
            self.xmpp._BaseGateway__dispatcher._on_user_register(
                Iq(sfrom="romeo@montague.lit/gajim")
            )
        )
        # Expected login sequence: welcome message, "logging in",
        # "syncing contacts", optionally "syncing groups", a probe, and a
        # final status containing "yup".
        welcome = self.next_sent()
        assert welcome["body"], welcome
        stanza = self.next_sent()
        assert "logging in" in stanza["status"].lower(), stanza
        stanza = self.next_sent()
        assert "syncing contacts" in stanza["status"].lower(), stanza
        if BaseGateway.get_self_or_unique_subclass().GROUPS:
            stanza = self.next_sent()
            assert "syncing groups" in stanza["status"].lower(), stanza
        probe = self.next_sent()
        assert probe.get_type() == "probe"
        stanza = self.next_sent()
        assert "yup" in stanza["status"].lower(), stanza
        self.romeo: BaseSession = BaseSession.get_self_or_unique_subclass().from_jid(
            JID("romeo@montague.lit")
        )

        self.juliet: LegacyContact = self.run_coro(
            self.romeo.contacts.by_legacy_id("juliet")
        )
        self.room: LegacyMUC = self.run_coro(self.romeo.bookmarks.by_legacy_id("room"))
        self.first_witch: LegacyParticipant = self.run_coro(
            self.room.get_participant("firstwitch")
        )
        # NOTE(review): presumably ``self.send`` asserts that the gateway
        # sent this avatar-metadata request -- confirm against SlixTest.send.
        self.send(  # language=XML
            """
            <iq type="get"
                to="romeo@montague.lit"
                id="1"
                from="aim.shakespeare.lit">
              <pubsub xmlns="http://jabber.org/protocol/pubsub">
                <items node="urn:xmpp:avatar:metadata" />
              </pubsub>
            </iq>
            """
        )

    @classmethod
    def tearDownClass(cls) -> None:
        """Reset the registered subclasses once the test class is done."""
        reset_subclasses()
def format_stanza(stanza):
    """
    Return a pretty-printed, highlighted rendering of ``stanza.xml``.

    The stanza XML is serialized, re-parsed with minidom and pretty-printed;
    the first line of that output (for a minidom document, the XML
    declaration) is dropped before highlighting.
    """
    pretty = parseString(tostring(stanza.xml)).toprettyxml()
    # str.split always yields at least one item, so this unpacking is safe.
    _first_line, *remaining = pretty.split("\n")
    return highlight("\n".join(remaining))
def find_subclass(o, parent, base_ok: bool = False):
    """
    Return the first strict subclass of ``parent`` found among the values of ``o``.

    Arguments:
        o       -- An object supporting ``vars()`` (such as a module), or
                   otherwise an object with a ``values()`` method (such as
                   a dict). Its values are searched in iteration order.
        parent  -- The class to find a subclass of. ``parent`` itself is
                   never returned as a match.
        base_ok -- If True, return ``parent`` when no subclass is found.

    Raises:
        RuntimeError -- If no subclass is found and ``base_ok`` is False.
    """
    try:
        candidates = vars(o).values()
    except TypeError:
        # vars() rejects objects without a __dict__ (e.g. a plain dict);
        # fall back to the object's own values.
        candidates = o.values()
    for candidate in candidates:
        try:
            if issubclass(candidate, parent) and candidate is not parent:
                return candidate
        except TypeError:
            # issubclass() rejects non-class values; skip them.
            continue
    if base_ok:
        return parent
    raise RuntimeError(f"No subclass of {parent!r} found")
def reset_subclasses() -> None:
    """
    Call ``reset_subclass()`` on every once-subclassable slidge class.

    These classes are meant to be subclassed only once and raise exceptions
    otherwise, so their registered subclass has to be reset between test
    classes.
    """
    once_subclassable = (
        BaseSession,
        BaseGateway,
        LegacyRoster,
        LegacyContact,
        LegacyMUC,
        LegacyBookmarks,
        LegacyParticipant,
    )
    for klass in once_subclassable:
        klass.reset_subclass()
    # NOTE: reset_commands() is not invoked here (the call is disabled).
def reset_commands() -> None:
    """
    Prune ``Command.subclasses`` down to the classes from ``slidge.core``.

    A class is kept when its string form starts with
    ``<class 'slidge.core``; the filtered list replaces the original.
    """

    def _is_core_command(command_cls) -> bool:
        return str(command_cls).startswith("<class 'slidge.core")

    Command.subclasses = list(filter(_is_core_command, Command.subclasses))