Coverage for slidge/util/test.py: 89%
208 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
1# type:ignore
2import tempfile
3import types
4from pathlib import Path
5from typing import Optional, Union
6from xml.dom.minidom import parseString
8try:
9 import xmldiff.main
11 XML_DIFF_PRESENT = True
12except ImportError:
13 xmldiff = None
14 XML_DIFF_PRESENT = False
16from slixmpp import (
17 JID,
18 ElementBase,
19 Iq,
20 MatcherId,
21 MatchXMLMask,
22 MatchXPath,
23 Message,
24 Presence,
25 StanzaPath,
26)
27from slixmpp.stanza.error import Error
28from slixmpp.test import SlixTest, TestTransport
29from slixmpp.xmlstream import highlight, tostring
30from slixmpp.xmlstream.matcher import MatchIDSender
31from sqlalchemy import create_engine, delete
33from slidge import (
34 BaseGateway,
35 BaseSession,
36 LegacyBookmarks,
37 LegacyContact,
38 LegacyMUC,
39 LegacyParticipant,
40 LegacyRoster,
41)
43from ..command import Command
44from ..core import config
45from ..core.session import _sessions
46from ..db import SlidgeStore
47from ..db.avatar import avatar_cache
48from ..db.meta import Base
49from ..db.models import Contact, GatewayUser
52class SlixTestPlus(SlixTest):
53 def setUp(self) -> None:
54 super().setUp()
55 Error.namespace = "jabber:component:accept"
57 def check(
58 self,
59 stanza,
60 criteria,
61 method: str = "exact",
62 defaults=None,
63 use_values: bool = True,
64 ):
65 """
66 Create and compare several stanza objects to a correct XML string.
68 If use_values is False, tests using stanza.values will not be used.
70 Some stanzas provide default values for some interfaces, but
71 these defaults can be problematic for testing since they can easily
72 be forgotten when supplying the XML string. A list of interfaces that
73 use defaults may be provided and the generated stanzas will use the
74 default values for those interfaces if needed.
76 However, correcting the supplied XML is not possible for interfaces
77 that add or remove XML elements. Only interfaces that map to XML
78 attributes may be set using the defaults parameter. The supplied XML
79 must take into account any extra elements that are included by default.
81 Arguments:
82 stanza -- The stanza object to test.
83 criteria -- An expression the stanza must match against.
84 method -- The type of matching to use; one of:
85 'exact', 'mask', 'id', 'xpath', and 'stanzapath'.
86 Defaults to the value of self.match_method.
87 defaults -- A list of stanza interfaces that have default
88 values. These interfaces will be set to their
89 defaults for the given and generated stanzas to
90 prevent unexpected test failures.
91 use_values -- Indicates if testing using stanza.values should
92 be used. Defaults to True.
93 """
94 if method is None and hasattr(self, "match_method"):
95 method = getattr(self, "match_method")
97 if method != "exact":
98 matchers = {
99 "stanzapath": StanzaPath,
100 "xpath": MatchXPath,
101 "mask": MatchXMLMask,
102 "idsender": MatchIDSender,
103 "id": MatcherId,
104 }
105 Matcher = matchers.get(method, None)
106 if Matcher is None:
107 raise ValueError("Unknown matching method.")
108 test = Matcher(criteria)
109 self.assertTrue(
110 test.match(stanza),
111 "Stanza did not match using %s method:\n" % method
112 + "Criteria:\n%s\n" % str(criteria)
113 + "Stanza:\n%s" % str(stanza),
114 )
115 else:
116 stanza_class = stanza.__class__
117 # Hack to preserve namespaces instead of having jabber:client
118 # everywhere.
119 old_ns = stanza_class.namespace
120 stanza_class.namespace = stanza.namespace
121 if not isinstance(criteria, ElementBase):
122 xml = self.parse_xml(criteria)
123 else:
124 xml = criteria.xml
126 # Ensure that top level namespaces are used, even if they
127 # were not provided.
128 self.fix_namespaces(stanza.xml)
129 self.fix_namespaces(xml)
131 stanza2 = stanza_class(xml=xml)
133 if use_values:
134 # Using stanza.values will add XML for any interface that
135 # has a default value. We need to set those defaults on
136 # the existing stanzas and XML so that they will compare
137 # correctly.
138 default_stanza = stanza_class()
139 if defaults is None:
140 known_defaults = {Message: ["type"], Presence: ["priority"]}
141 defaults = known_defaults.get(stanza_class, [])
142 for interface in defaults:
143 stanza[interface] = stanza[interface]
144 stanza2[interface] = stanza2[interface]
145 # Can really only automatically add defaults for top
146 # level attribute values. Anything else must be accounted
147 # for in the provided XML string.
148 if interface not in xml.attrib:
149 if interface in default_stanza.xml.attrib:
150 value = default_stanza.xml.attrib[interface]
151 xml.attrib[interface] = value
153 values = stanza2.values
154 stanza3 = stanza_class()
155 stanza3.values = values
157 debug = "Three methods for creating stanzas do not match.\n"
158 debug += "Given XML:\n%s\n" % highlight(tostring(xml))
159 debug += "Given stanza:\n%s\n" % format_stanza(stanza)
160 debug += "Generated stanza:\n%s\n" % highlight(tostring(stanza2.xml))
161 debug += "Second generated stanza:\n%s\n" % highlight(
162 tostring(stanza3.xml)
163 )
164 result = self.compare(xml, stanza.xml, stanza2.xml, stanza3.xml)
165 else:
166 debug = "Two methods for creating stanzas do not match.\n"
167 debug += "Given XML:\n%s\n" % highlight(tostring(xml))
168 debug += "Given stanza:\n%s\n" % format_stanza(stanza)
169 debug += "Generated stanza:\n%s\n" % highlight(tostring(stanza2.xml))
170 result = self.compare(xml, stanza.xml, stanza2.xml)
171 stanza_class.namespace = old_ns
173 if XML_DIFF_PRESENT and not result:
174 debug += str(
175 xmldiff.main.diff_texts(tostring(xml), tostring(stanza.xml))
176 )
177 if use_values:
178 debug += str(
179 xmldiff.main.diff_texts(tostring(xml), tostring(stanza2.xml))
180 )
181 self.assertTrue(result, debug)
183 def next_sent(
184 self, timeout: float = 0.05
185 ) -> Optional[Union[Message, Iq, Presence]]:
186 self.wait_for_send_queue()
187 sent = self.xmpp.socket.next_sent(timeout=timeout)
188 if sent is None:
189 return None
190 xml = self.parse_xml(sent)
191 self.fix_namespaces(xml, "jabber:component:accept")
192 sent = self.xmpp._build_stanza(xml, "jabber:component:accept")
193 return sent
196class SlidgeTest(SlixTestPlus):
197 plugin: Union[types.ModuleType, dict]
199 class Config:
200 jid = "aim.shakespeare.lit"
201 secret = "test"
202 server = "shakespeare.lit"
203 port = 5222
204 upload_service = "upload.test"
205 home_dir = Path(tempfile.mkdtemp())
206 user_jid_validator = ".*"
207 admins: list[str] = []
208 upload_requester = None
209 ignore_delay_threshold = 300
211 @classmethod
212 def setUpClass(cls) -> None:
213 for k, v in vars(cls.Config).items():
214 setattr(config, k.upper(), v)
216 def setUp(self):
217 if hasattr(self, "plugin"):
218 BaseGateway._subclass = find_subclass(self.plugin, BaseGateway)
219 BaseSession._subclass = find_subclass(self.plugin, BaseSession)
220 LegacyRoster._subclass = find_subclass(
221 self.plugin, LegacyRoster, base_ok=True
222 )
223 LegacyContact._subclass = find_subclass(
224 self.plugin, LegacyContact, base_ok=True
225 )
226 LegacyMUC._subclass = find_subclass(self.plugin, LegacyMUC, base_ok=True)
227 LegacyBookmarks._subclass = find_subclass(
228 self.plugin, LegacyBookmarks, base_ok=True
229 )
231 # workaround for duplicate output of sql alchemy's log, cf
232 # https://stackoverflow.com/a/76498428/5902284
233 from sqlalchemy import log as sqlalchemy_log
235 sqlalchemy_log._add_default_handler = lambda x: None
237 engine = self.db_engine = create_engine("sqlite+pysqlite:///:memory:")
238 Base.metadata.create_all(engine)
239 BaseGateway.store = SlidgeStore(engine)
240 BaseGateway._test_mode = True
241 try:
242 self.xmpp = BaseGateway.get_self_or_unique_subclass()()
243 except Exception:
244 raise
245 self.xmpp.TEST_MODE = True
246 avatar_cache.store = self.xmpp.store.avatars
247 avatar_cache.set_dir(Path(tempfile.mkdtemp()))
248 self.xmpp._always_send_everything = True
249 engine.echo = True
251 self.xmpp.connection_made(TestTransport(self.xmpp))
252 self.xmpp.session_bind_event.set()
253 # Remove unique ID prefix to make it easier to test
254 self.xmpp._id_prefix = ""
255 self.xmpp.default_lang = None
256 self.xmpp.peer_default_lang = None
258 def new_id():
259 self.xmpp._id += 1
260 return str(self.xmpp._id)
262 self.xmpp._id = 0
263 self.xmpp.new_id = new_id
265 # Must have the stream header ready for xmpp.process() to work.
266 header = self.xmpp.stream_header
268 self.xmpp.data_received(header)
269 self.wait_for_send_queue()
271 self.xmpp.socket.next_sent()
272 self.xmpp.socket.next_sent()
274 # Some plugins require messages to have ID values. Set
275 # this to True in tests related to those plugins.
276 self.xmpp.use_message_ids = False
277 self.xmpp.use_presence_ids = False
278 Error.namespace = "jabber:component:accept"
280 def tearDown(self) -> None:
281 self.db_engine.echo = False
282 super().tearDown()
283 Base.metadata.drop_all(self.xmpp.store._engine)
284 _sessions.clear()
286 def setup_logged_session(self, n_contacts: int = 0) -> None:
287 with self.xmpp.store.session() as orm:
288 user = GatewayUser(
289 jid=JID("romeo@montague.lit/gajim").bare,
290 legacy_module_data={"username": "romeo", "city": ""},
291 preferences={"sync_avatar": True, "sync_presence": True},
292 )
293 orm.add(user)
294 orm.commit()
296 with self.xmpp.store.session() as session:
297 session.execute(delete(Contact))
298 session.commit()
300 self.run_coro(
301 self.xmpp._BaseGateway__dispatcher._on_user_register(
302 Iq(sfrom="romeo@montague.lit/gajim")
303 )
304 )
305 welcome = self.next_sent()
306 assert welcome["body"], welcome
307 stanza = self.next_sent()
308 assert "logging in" in stanza["status"].lower(), stanza
309 stanza = self.next_sent()
310 assert "syncing contacts" in stanza["status"].lower(), stanza
311 if BaseGateway.get_self_or_unique_subclass().GROUPS:
312 stanza = self.next_sent()
313 assert "syncing groups" in stanza["status"].lower(), stanza
314 probe = self.next_sent()
315 assert probe.get_type() == "probe"
316 stanza = self.next_sent()
317 assert "yup" in stanza["status"].lower(), stanza
318 self.romeo: BaseSession = BaseSession.get_self_or_unique_subclass().from_jid(
319 JID("romeo@montague.lit")
320 )
322 self.juliet: LegacyContact = self.run_coro(
323 self.romeo.contacts.by_legacy_id("juliet")
324 )
325 self.room: LegacyMUC = self.run_coro(self.romeo.bookmarks.by_legacy_id("room"))
326 self.first_witch: LegacyParticipant = self.run_coro(
327 self.room.get_participant("firstwitch")
328 )
329 self.send( # language=XML
330 """
331 <iq type="get"
332 to="romeo@montague.lit"
333 id="1"
334 from="aim.shakespeare.lit">
335 <pubsub xmlns="http://jabber.org/protocol/pubsub">
336 <items node="urn:xmpp:avatar:metadata" />
337 </pubsub>
338 </iq>
339 """
340 )
342 @classmethod
343 def tearDownClass(cls) -> None:
344 reset_subclasses()
347def format_stanza(stanza):
348 return highlight(
349 "\n".join(parseString(tostring(stanza.xml)).toprettyxml().split("\n")[1:])
350 )
353def find_subclass(o, parent, base_ok: bool = False):
354 try:
355 vals = vars(o).values()
356 except TypeError:
357 vals = o.values()
358 for x in vals:
359 try:
360 if issubclass(x, parent) and x is not parent:
361 return x
362 except TypeError:
363 pass
364 else:
365 if base_ok:
366 return parent
367 else:
368 raise RuntimeError
371def reset_subclasses() -> None:
372 """
373 Reset registered subclasses between test classes.
375 Needed because these classes are meant to only be subclassed once and raise
376 exceptions otherwise.
377 """
378 BaseSession.reset_subclass()
379 BaseGateway.reset_subclass()
380 LegacyRoster.reset_subclass()
381 LegacyContact.reset_subclass()
382 LegacyMUC.reset_subclass()
383 LegacyBookmarks.reset_subclass()
384 LegacyParticipant.reset_subclass()
385 # reset_commands()
388def reset_commands() -> None:
389 Command.subclasses = [
390 c for c in Command.subclasses if str(c).startswith("<class 'slidge.core")
391 ]