Coverage for slidge/util/test.py: 89%
207 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
1# type:ignore
2import tempfile
3import types
4from pathlib import Path
5from typing import Optional, Union
6from xml.dom.minidom import parseString
8try:
9 import xmldiff.main
11 XML_DIFF_PRESENT = True
12except ImportError:
13 xmldiff = None
14 XML_DIFF_PRESENT = False
16from slixmpp import (
17 JID,
18 ElementBase,
19 Iq,
20 MatcherId,
21 MatchXMLMask,
22 MatchXPath,
23 Message,
24 Presence,
25 StanzaPath,
26)
27from slixmpp.stanza.error import Error
28from slixmpp.test import SlixTest, TestTransport
29from slixmpp.xmlstream import highlight, tostring
30from slixmpp.xmlstream.matcher import MatchIDSender
31from sqlalchemy import create_engine, delete
33from slidge import (
34 BaseGateway,
35 BaseSession,
36 LegacyBookmarks,
37 LegacyContact,
38 LegacyMUC,
39 LegacyParticipant,
40 LegacyRoster,
41)
43from ..command import Command
44from ..core import config
45from ..core.config import _TimedeltaSeconds
46from ..db import SlidgeStore
47from ..db.avatar import avatar_cache
48from ..db.meta import Base
49from ..db.models import Contact, GatewayUser
52class SlixTestPlus(SlixTest):
53 def setUp(self) -> None:
54 super().setUp()
55 Error.namespace = "jabber:component:accept"
57 def check(
58 self,
59 stanza,
60 criteria,
61 method: str = "exact",
62 defaults=None,
63 use_values: bool = True,
64 ):
65 """
66 Create and compare several stanza objects to a correct XML string.
68 If use_values is False, tests using stanza.values will not be used.
70 Some stanzas provide default values for some interfaces, but
71 these defaults can be problematic for testing since they can easily
72 be forgotten when supplying the XML string. A list of interfaces that
73 use defaults may be provided and the generated stanzas will use the
74 default values for those interfaces if needed.
76 However, correcting the supplied XML is not possible for interfaces
77 that add or remove XML elements. Only interfaces that map to XML
78 attributes may be set using the defaults parameter. The supplied XML
79 must take into account any extra elements that are included by default.
81 Arguments:
82 stanza -- The stanza object to test.
83 criteria -- An expression the stanza must match against.
84 method -- The type of matching to use; one of:
85 'exact', 'mask', 'id', 'xpath', and 'stanzapath'.
86 Defaults to the value of self.match_method.
87 defaults -- A list of stanza interfaces that have default
88 values. These interfaces will be set to their
89 defaults for the given and generated stanzas to
90 prevent unexpected test failures.
91 use_values -- Indicates if testing using stanza.values should
92 be used. Defaults to True.
93 """
94 if method is None and hasattr(self, "match_method"):
95 method = getattr(self, "match_method")
97 if method != "exact":
98 matchers = {
99 "stanzapath": StanzaPath,
100 "xpath": MatchXPath,
101 "mask": MatchXMLMask,
102 "idsender": MatchIDSender,
103 "id": MatcherId,
104 }
105 Matcher = matchers.get(method, None)
106 if Matcher is None:
107 raise ValueError("Unknown matching method.")
108 test = Matcher(criteria)
109 self.assertTrue(
110 test.match(stanza),
111 "Stanza did not match using %s method:\n" % method
112 + "Criteria:\n%s\n" % str(criteria)
113 + "Stanza:\n%s" % str(stanza),
114 )
115 else:
116 stanza_class = stanza.__class__
117 # Hack to preserve namespaces instead of having jabber:client
118 # everywhere.
119 old_ns = stanza_class.namespace
120 stanza_class.namespace = stanza.namespace
121 if not isinstance(criteria, ElementBase):
122 xml = self.parse_xml(criteria)
123 else:
124 xml = criteria.xml
126 # Ensure that top level namespaces are used, even if they
127 # were not provided.
128 self.fix_namespaces(stanza.xml)
129 self.fix_namespaces(xml)
131 stanza2 = stanza_class(xml=xml)
133 if use_values:
134 # Using stanza.values will add XML for any interface that
135 # has a default value. We need to set those defaults on
136 # the existing stanzas and XML so that they will compare
137 # correctly.
138 default_stanza = stanza_class()
139 if defaults is None:
140 known_defaults = {Message: ["type"], Presence: ["priority"]}
141 defaults = known_defaults.get(stanza_class, [])
142 for interface in defaults:
143 stanza[interface] = stanza[interface]
144 stanza2[interface] = stanza2[interface]
145 # Can really only automatically add defaults for top
146 # level attribute values. Anything else must be accounted
147 # for in the provided XML string.
148 if interface not in xml.attrib:
149 if interface in default_stanza.xml.attrib:
150 value = default_stanza.xml.attrib[interface]
151 xml.attrib[interface] = value
153 values = stanza2.values
154 stanza3 = stanza_class()
155 stanza3.values = values
157 debug = "Three methods for creating stanzas do not match.\n"
158 debug += "Given XML:\n%s\n" % highlight(tostring(xml))
159 debug += "Given stanza:\n%s\n" % format_stanza(stanza)
160 debug += "Generated stanza:\n%s\n" % highlight(tostring(stanza2.xml))
161 debug += "Second generated stanza:\n%s\n" % highlight(
162 tostring(stanza3.xml)
163 )
164 result = self.compare(xml, stanza.xml, stanza2.xml, stanza3.xml)
165 else:
166 debug = "Two methods for creating stanzas do not match.\n"
167 debug += "Given XML:\n%s\n" % highlight(tostring(xml))
168 debug += "Given stanza:\n%s\n" % format_stanza(stanza)
169 debug += "Generated stanza:\n%s\n" % highlight(tostring(stanza2.xml))
170 result = self.compare(xml, stanza.xml, stanza2.xml)
171 stanza_class.namespace = old_ns
173 if XML_DIFF_PRESENT and not result:
174 debug += str(
175 xmldiff.main.diff_texts(tostring(xml), tostring(stanza.xml))
176 )
177 if use_values:
178 debug += str(
179 xmldiff.main.diff_texts(tostring(xml), tostring(stanza2.xml))
180 )
181 self.assertTrue(result, debug)
183 def next_sent(
184 self, timeout: float = 0.05
185 ) -> Optional[Union[Message, Iq, Presence]]:
186 self.wait_for_send_queue()
187 sent = self.xmpp.socket.next_sent(timeout=timeout)
188 if sent is None:
189 return None
190 xml = self.parse_xml(sent)
191 self.fix_namespaces(xml, "jabber:component:accept")
192 sent = self.xmpp._build_stanza(xml, "jabber:component:accept")
193 return sent
196class SlidgeTest(SlixTestPlus):
197 plugin: Union[types.ModuleType, dict]
199 class Config:
200 jid = "aim.shakespeare.lit"
201 secret = "test"
202 server = "shakespeare.lit"
203 port = 5222
204 upload_service = "upload.test"
205 home_dir = Path(tempfile.mkdtemp())
206 user_jid_validator = ".*"
207 admins: list[str] = []
208 upload_requester = None
209 ignore_delay_threshold = _TimedeltaSeconds("300")
211 @classmethod
212 def setUpClass(cls) -> None:
213 for k, v in vars(cls.Config).items():
214 setattr(config, k.upper(), v)
216 def setUp(self):
217 if hasattr(self, "plugin"):
218 BaseGateway._subclass = find_subclass(self.plugin, BaseGateway)
219 BaseSession._subclass = find_subclass(self.plugin, BaseSession)
220 LegacyRoster._subclass = find_subclass(
221 self.plugin, LegacyRoster, base_ok=True
222 )
223 LegacyContact._subclass = find_subclass(
224 self.plugin, LegacyContact, base_ok=True
225 )
226 LegacyMUC._subclass = find_subclass(self.plugin, LegacyMUC, base_ok=True)
227 LegacyBookmarks._subclass = find_subclass(
228 self.plugin, LegacyBookmarks, base_ok=True
229 )
231 # workaround for duplicate output of sql alchemy's log, cf
232 # https://stackoverflow.com/a/76498428/5902284
233 from sqlalchemy import log as sqlalchemy_log
235 sqlalchemy_log._add_default_handler = lambda x: None
237 engine = self.db_engine = create_engine("sqlite+pysqlite:///:memory:")
238 Base.metadata.create_all(engine)
239 BaseGateway.store = SlidgeStore(engine)
240 BaseGateway._test_mode = True
241 try:
242 self.xmpp = BaseGateway.get_self_or_unique_subclass()()
243 except Exception:
244 raise
245 self.xmpp.TEST_MODE = True
246 avatar_cache.store = self.xmpp.store.avatars
247 avatar_cache.set_dir(Path(tempfile.mkdtemp()))
248 self.xmpp._always_send_everything = True
249 engine.echo = True
251 self.xmpp.connection_made(TestTransport(self.xmpp))
252 self.xmpp.session_bind_event.set()
253 # Remove unique ID prefix to make it easier to test
254 self.xmpp._id_prefix = ""
255 self.xmpp.default_lang = None
256 self.xmpp.peer_default_lang = None
258 def new_id():
259 self.xmpp._id += 1
260 return str(self.xmpp._id)
262 self.xmpp._id = 0
263 self.xmpp.new_id = new_id
265 # Must have the stream header ready for xmpp.process() to work.
266 header = self.xmpp.stream_header
268 self.xmpp.data_received(header)
269 self.wait_for_send_queue()
271 self.xmpp.socket.next_sent()
272 self.xmpp.socket.next_sent()
274 # Some plugins require messages to have ID values. Set
275 # this to True in tests related to those plugins.
276 self.xmpp.use_message_ids = False
277 self.xmpp.use_presence_ids = False
278 Error.namespace = "jabber:component:accept"
280 def tearDown(self) -> None:
281 self.db_engine.echo = False
282 super().tearDown()
283 Base.metadata.drop_all(self.xmpp.store._engine)
285 def setup_logged_session(self, n_contacts: int = 0) -> None:
286 with self.xmpp.store.session() as orm:
287 user = GatewayUser(
288 jid=JID("romeo@montague.lit/gajim").bare,
289 legacy_module_data={"username": "romeo", "city": ""},
290 preferences={"sync_avatar": True, "sync_presence": True},
291 )
292 orm.add(user)
293 orm.commit()
295 with self.xmpp.store.session() as session:
296 session.execute(delete(Contact))
297 session.commit()
299 self.run_coro(
300 self.xmpp._BaseGateway__dispatcher._on_user_register(
301 Iq(sfrom="romeo@montague.lit/gajim")
302 )
303 )
304 welcome = self.next_sent()
305 assert welcome["body"], welcome
306 stanza = self.next_sent()
307 assert "logging in" in stanza["status"].lower(), stanza
308 stanza = self.next_sent()
309 assert "syncing contacts" in stanza["status"].lower(), stanza
310 if BaseGateway.get_self_or_unique_subclass().GROUPS:
311 stanza = self.next_sent()
312 assert "syncing groups" in stanza["status"].lower(), stanza
313 probe = self.next_sent()
314 assert probe.get_type() == "probe"
315 stanza = self.next_sent()
316 assert "yup" in stanza["status"].lower(), stanza
317 self.romeo: BaseSession = BaseSession.get_self_or_unique_subclass().from_jid(
318 JID("romeo@montague.lit")
319 )
321 self.juliet: LegacyContact = self.run_coro(
322 self.romeo.contacts.by_legacy_id("juliet")
323 )
324 self.room: LegacyMUC = self.run_coro(self.romeo.bookmarks.by_legacy_id("room"))
325 self.first_witch: LegacyParticipant = self.run_coro(
326 self.room.get_participant("firstwitch")
327 )
328 self.send( # language=XML
329 """
330 <iq type="get"
331 to="romeo@montague.lit"
332 id="1"
333 from="aim.shakespeare.lit">
334 <pubsub xmlns="http://jabber.org/protocol/pubsub">
335 <items node="urn:xmpp:avatar:metadata" />
336 </pubsub>
337 </iq>
338 """
339 )
341 @classmethod
342 def tearDownClass(cls) -> None:
343 reset_subclasses()
346def format_stanza(stanza):
347 return highlight(
348 "\n".join(parseString(tostring(stanza.xml)).toprettyxml().split("\n")[1:])
349 )
352def find_subclass(o, parent, base_ok: bool = False):
353 try:
354 vals = vars(o).values()
355 except TypeError:
356 vals = o.values()
357 for x in vals:
358 try:
359 if issubclass(x, parent) and x is not parent:
360 return x
361 except TypeError:
362 pass
363 else:
364 if base_ok:
365 return parent
366 else:
367 raise RuntimeError
370def reset_subclasses() -> None:
371 """
372 Reset registered subclasses between test classes.
374 Needed because these classes are meant to only be subclassed once and raise
375 exceptions otherwise.
376 """
377 BaseSession.reset_subclass()
378 BaseGateway.reset_subclass()
379 LegacyRoster.reset_subclass()
380 LegacyContact.reset_subclass()
381 LegacyMUC.reset_subclass()
382 LegacyBookmarks.reset_subclass()
383 LegacyParticipant.reset_subclass()
384 # reset_commands()
387def reset_commands() -> None:
388 Command.subclasses = [
389 c for c in Command.subclasses if str(c).startswith("<class 'slidge.core")
390 ]