Coverage for slidge / util / test.py: 91%
233 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-20 19:56 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-20 19:56 +0000
1# type:ignore
2import os
3import tempfile
4import types
5from pathlib import Path
6from typing import ClassVar
7from xml.dom.minidom import parseString
9try:
10 import xmldiff.main
12 XML_DIFF_PRESENT = True
13except ImportError:
14 xmldiff = None
15 XML_DIFF_PRESENT = False
17from slixmpp import (
18 JID,
19 ElementBase,
20 Iq,
21 MatcherId,
22 MatchXMLMask,
23 MatchXPath,
24 Message,
25 Presence,
26 StanzaPath,
27)
28from slixmpp.stanza.error import Error
29from slixmpp.test import SlixTest, TestTransport
30from slixmpp.xmlstream import highlight, tostring
31from slixmpp.xmlstream.matcher import MatchIDSender
32from sqlalchemy import create_engine, delete
34from slidge import (
35 BaseGateway,
36 BaseSession,
37 LegacyBookmarks,
38 LegacyContact,
39 LegacyMUC,
40 LegacyParticipant,
41 LegacyRoster,
42)
44from ..command import Command
45from ..core import config
46from ..core.session import _sessions
47from ..db import SlidgeStore
48from ..db.avatar import avatar_cache
49from ..db.meta import Base
50from ..db.models import Contact, GatewayUser
51from ..util import SubclassableOnce
54class SlixTestPlus(SlixTest):
55 def setUp(self) -> None:
56 super().setUp()
57 Error.namespace = "jabber:component:accept"
59 def check(
60 self,
61 stanza, # noqa
62 criteria, # noqa
63 method: str = "exact",
64 defaults=None, # noqa
65 use_values: bool = True,
66 ) -> None:
67 """
68 Create and compare several stanza objects to a correct XML string.
70 If use_values is False, tests using stanza.values will not be used.
72 Some stanzas provide default values for some interfaces, but
73 these defaults can be problematic for testing since they can easily
74 be forgotten when supplying the XML string. A list of interfaces that
75 use defaults may be provided and the generated stanzas will use the
76 default values for those interfaces if needed.
78 However, correcting the supplied XML is not possible for interfaces
79 that add or remove XML elements. Only interfaces that map to XML
80 attributes may be set using the defaults parameter. The supplied XML
81 must take into account any extra elements that are included by default.
83 Arguments:
84 stanza -- The stanza object to test.
85 criteria -- An expression the stanza must match against.
86 method -- The type of matching to use; one of:
87 'exact', 'mask', 'id', 'xpath', and 'stanzapath'.
88 Defaults to the value of self.match_method.
89 defaults -- A list of stanza interfaces that have default
90 values. These interfaces will be set to their
91 defaults for the given and generated stanzas to
92 prevent unexpected test failures.
93 use_values -- Indicates if testing using stanza.values should
94 be used. Defaults to True.
95 """
96 if method is None and hasattr(self, "match_method"):
97 method = getattr(self, "match_method")
99 if method != "exact":
100 matchers = {
101 "stanzapath": StanzaPath,
102 "xpath": MatchXPath,
103 "mask": MatchXMLMask,
104 "idsender": MatchIDSender,
105 "id": MatcherId,
106 }
107 Matcher = matchers.get(method)
108 if Matcher is None:
109 raise ValueError("Unknown matching method.")
110 test = Matcher(criteria)
111 self.assertTrue(
112 test.match(stanza),
113 f"Stanza did not match using {method} method:\n"
114 + f"Criteria:\n{criteria!s}\n"
115 + f"Stanza:\n{stanza!s}",
116 )
117 else:
118 stanza_class = stanza.__class__
119 # Hack to preserve namespaces instead of having jabber:client
120 # everywhere.
121 old_ns = stanza_class.namespace
122 stanza_class.namespace = stanza.namespace
123 if not isinstance(criteria, ElementBase):
124 xml = self.parse_xml(criteria)
125 else:
126 xml = criteria.xml
128 # Ensure that top level namespaces are used, even if they
129 # were not provided.
130 self.fix_namespaces(stanza.xml)
131 self.fix_namespaces(xml)
133 stanza2 = stanza_class(xml=xml)
135 if use_values:
136 # Using stanza.values will add XML for any interface that
137 # has a default value. We need to set those defaults on
138 # the existing stanzas and XML so that they will compare
139 # correctly.
140 default_stanza = stanza_class()
141 if defaults is None:
142 known_defaults = {Message: ["type"], Presence: ["priority"]}
143 defaults = known_defaults.get(stanza_class, [])
144 for interface in defaults:
145 stanza[interface] = stanza[interface]
146 stanza2[interface] = stanza2[interface]
147 # Can really only automatically add defaults for top
148 # level attribute values. Anything else must be accounted
149 # for in the provided XML string.
150 if (
151 interface not in xml.attrib
152 and interface in default_stanza.xml.attrib
153 ):
154 value = default_stanza.xml.attrib[interface]
155 xml.attrib[interface] = value
157 values = stanza2.values
158 stanza3 = stanza_class()
159 stanza3.values = values
161 debug = "Three methods for creating stanzas do not match.\n"
162 debug += f"Given XML:\n{highlight(tostring(xml))}\n"
163 debug += f"Given stanza:\n{format_stanza(stanza)}\n"
164 debug += f"Generated stanza:\n{highlight(tostring(stanza2.xml))}\n"
165 debug += (
166 f"Second generated stanza:\n{highlight(tostring(stanza3.xml))}\n"
167 )
168 result = self.compare(xml, stanza.xml, stanza2.xml, stanza3.xml)
169 else:
170 debug = "Two methods for creating stanzas do not match.\n"
171 debug += f"Given XML:\n{highlight(tostring(xml))}\n"
172 debug += f"Given stanza:\n{format_stanza(stanza)}\n"
173 debug += f"Generated stanza:\n{highlight(tostring(stanza2.xml))}\n"
174 result = self.compare(xml, stanza.xml, stanza2.xml)
175 stanza_class.namespace = old_ns
177 if XML_DIFF_PRESENT and not result:
178 debug += str(
179 xmldiff.main.diff_texts(tostring(xml), tostring(stanza.xml))
180 )
181 if use_values:
182 debug += str(
183 xmldiff.main.diff_texts(tostring(xml), tostring(stanza2.xml))
184 )
185 self.assertTrue(result, debug)
187 def next_sent(self, timeout: float = 0.05) -> Message | Iq | Presence | None:
188 self.wait_for_send_queue()
189 sent = self.xmpp.socket.next_sent(timeout=timeout)
190 if sent is None:
191 return None
192 xml = self.parse_xml(sent)
193 self.fix_namespaces(xml, "jabber:component:accept")
194 sent = self.xmpp._build_stanza(xml, "jabber:component:accept")
195 return sent
198class SlidgeTest(SlixTestPlus):
199 plugin: types.ModuleType | dict
201 class Config:
202 jid = "aim.shakespeare.lit"
203 secret = "test"
204 server = "shakespeare.lit"
205 port = 5222
206 upload_service = "upload.test"
207 home_dir = Path(tempfile.mkdtemp())
208 user_jid_validator = ".*"
209 admins: ClassVar[list[str]] = []
210 upload_requester = None
211 ignore_delay_threshold = 300
213 @classmethod
214 def setUpClass(cls) -> None:
215 for k, v in vars(cls.Config).items():
216 setattr(config, k.upper(), v)
217 if hasattr(cls, "plugin"):
218 subclasses = SubclassableOnce._SubclassableOnce__subclasses
219 subclasses.update(
220 {
221 BaseGateway: find_subclass(cls.plugin, BaseGateway),
222 BaseSession: find_subclass(cls.plugin, BaseSession),
223 LegacyRoster: find_subclass(cls.plugin, LegacyRoster, base_ok=True),
224 LegacyContact: find_subclass(
225 cls.plugin, LegacyContact, base_ok=True
226 ),
227 LegacyMUC: find_subclass(cls.plugin, LegacyMUC, base_ok=True),
228 LegacyBookmarks: find_subclass(
229 cls.plugin, LegacyBookmarks, base_ok=True
230 ),
231 }
232 )
233 for subclass in subclasses.values():
234 if subclass is not None:
235 subclass.__abstractmethods__ = set()
237 def setUp(self) -> None:
238 # workaround for duplicate output of sql alchemy's log, cf
239 # https://stackoverflow.com/a/76498428/5902284
240 from sqlalchemy import log as sqlalchemy_log
241 from sqlalchemy.pool import StaticPool
243 sqlalchemy_log._add_default_handler = lambda x: None
244 db_url = os.getenv("SLIDGETEST_DB_URL", "sqlite+pysqlite:///:memory:")
245 engine = self.db_engine = create_engine(
246 db_url, poolclass=StaticPool if db_url.startswith("postgresql+") else None
247 )
248 Base.metadata.create_all(engine)
249 BaseGateway.store = SlidgeStore(engine)
250 BaseGateway._test_mode = True
252 self.xmpp = BaseGateway.get_self_or_unique_subclass()()
254 self.xmpp.TEST_MODE = True
255 avatar_cache.store = self.xmpp.store.avatars
256 avatar_cache.set_dir(Path(tempfile.mkdtemp()))
257 self.xmpp._always_send_everything = True
258 engine.echo = True
260 self.xmpp.connection_made(TestTransport(self.xmpp))
261 self.xmpp.session_bind_event.set()
262 # Remove unique ID prefix to make it easier to test
263 self.xmpp._id_prefix = ""
264 self.xmpp.default_lang = None
265 self.xmpp.peer_default_lang = None
267 def new_id() -> str:
268 self.xmpp._id += 1
269 return str(self.xmpp._id)
271 self.xmpp._id = 0
272 self.xmpp.new_id = new_id
274 # Must have the stream header ready for xmpp.process() to work.
275 header = self.xmpp.stream_header
277 self.xmpp.data_received(header)
278 self.wait_for_send_queue()
280 self.xmpp.socket.next_sent()
281 self.xmpp.socket.next_sent()
283 # Some plugins require messages to have ID values. Set
284 # this to True in tests related to those plugins.
285 self.xmpp.use_message_ids = False
286 self.xmpp.use_presence_ids = False
287 Error.namespace = "jabber:component:accept"
289 def _add_slidge_user(
290 self,
291 jid: str | JID = "romeo@shakespeare.lit",
292 legacy_module_data: dict | None = None,
293 preferences: dict | None = None,
294 ) -> None:
295 with self.xmpp.store.session() as orm:
296 user = GatewayUser(
297 jid=JID(jid),
298 legacy_module_data=legacy_module_data or {},
299 preferences=preferences
300 or {"sync_avatar": False, "sync_presence": False},
301 )
302 orm.add(user)
303 orm.commit()
304 self.run_coro(
305 self.xmpp._BaseGateway__dispatcher._on_user_register(Iq(sfrom=JID(jid)))
306 )
307 welcome = self.next_sent()
308 assert welcome["body"]
309 stanza = self.next_sent()
310 assert "logging in" in stanza["status"].lower(), stanza
311 stanza = self.next_sent()
312 assert "syncing contacts" in stanza["status"].lower(), stanza
313 if self.xmpp.GROUPS:
314 stanza = self.next_sent()
315 assert "syncing groups" in stanza["status"].lower(), stanza
316 probe = self.next_sent()
317 assert probe.get_type() == "probe"
318 stanza = self.next_sent()
319 assert stanza["status"].lower()
321 def user_session(self, jid: str | JID = "romeo@shakespeare.lit") -> BaseSession:
322 return BaseSession.get_self_or_unique_subclass().from_jid(JID(jid))
324 def get_joined_muc(
325 self,
326 legacy_id: str | int,
327 user_jid: str | JID = "romeo@shakespeare.lit",
328 resource: str = "gajim",
329 ) -> LegacyMUC:
330 muc: LegacyMUC = self.run_coro(
331 self.user_session(user_jid).bookmarks.by_legacy_id(legacy_id)
332 )
333 muc.add_user_resource(resource)
334 return muc
336 def tearDown(self) -> None:
337 self.db_engine.echo = False
338 super().tearDown()
339 Base.metadata.drop_all(self.xmpp.store._engine)
340 self.db_engine.dispose()
341 _sessions.clear()
343 def setup_logged_session(self, n_contacts: int = 0) -> None:
344 with self.xmpp.store.session() as orm:
345 user = GatewayUser(
346 jid=JID("romeo@montague.lit/gajim").bare,
347 legacy_module_data={"username": "romeo", "city": ""},
348 preferences={"sync_avatar": True, "sync_presence": True},
349 )
350 orm.add(user)
351 orm.commit()
353 with self.xmpp.store.session() as session:
354 session.execute(delete(Contact))
355 session.commit()
357 self.run_coro(
358 self.xmpp._BaseGateway__dispatcher._on_user_register(
359 Iq(sfrom="romeo@montague.lit/gajim")
360 )
361 )
362 welcome = self.next_sent()
363 assert welcome["body"], welcome
364 stanza = self.next_sent()
365 assert "logging in" in stanza["status"].lower(), stanza
366 stanza = self.next_sent()
367 assert "syncing contacts" in stanza["status"].lower(), stanza
368 if BaseGateway.get_self_or_unique_subclass().GROUPS:
369 stanza = self.next_sent()
370 assert "syncing groups" in stanza["status"].lower(), stanza
371 probe = self.next_sent()
372 assert probe.get_type() == "probe"
373 stanza = self.next_sent()
374 assert "yup" in stanza["status"].lower(), stanza
375 self.romeo: BaseSession = BaseSession.get_self_or_unique_subclass().from_jid(
376 JID("romeo@montague.lit")
377 )
379 self.juliet: LegacyContact = self.run_coro(
380 self.romeo.contacts.by_legacy_id("juliet")
381 )
382 self.room: LegacyMUC = self.run_coro(self.romeo.bookmarks.by_legacy_id("room"))
383 self.first_witch: LegacyParticipant = self.run_coro(
384 self.room.get_participant("firstwitch")
385 )
386 self.send( # language=XML
387 """
388 <iq type="get"
389 to="romeo@montague.lit"
390 id="1"
391 from="aim.shakespeare.lit">
392 <pubsub xmlns="http://jabber.org/protocol/pubsub">
393 <items node="urn:xmpp:avatar:metadata" />
394 </pubsub>
395 </iq>
396 """
397 )
399 @classmethod
400 def tearDownClass(cls) -> None:
401 reset_subclasses()
404def format_stanza(stanza: ElementBase) -> str:
405 return highlight(
406 "\n".join(parseString(tostring(stanza.xml)).toprettyxml().split("\n")[1:])
407 )
410def find_subclass(o, parent, base_ok: bool = False): # noqa
411 try:
412 vals = vars(o).values()
413 except TypeError:
414 vals = o.values()
415 for x in vals:
416 try:
417 if issubclass(x, parent) and x is not parent:
418 return x
419 except TypeError:
420 pass
421 else:
422 if base_ok:
423 return parent
424 else:
425 raise RuntimeError
428def reset_subclasses() -> None:
429 """
430 Reset registered subclasses between test classes.
432 Needed because these classes are meant to only be subclassed once and raise
433 exceptions otherwise.
434 """
435 BaseSession.reset_subclass()
436 BaseGateway.reset_subclass()
437 LegacyRoster.reset_subclass()
438 LegacyContact.reset_subclass()
439 LegacyMUC.reset_subclass()
440 LegacyBookmarks.reset_subclass()
441 LegacyParticipant.reset_subclass()
442 # reset_commands()
445def reset_commands() -> None:
446 Command.subclasses = [
447 c for c in Command.subclasses if str(c).startswith("<class 'slidge.core")
448 ]