Coverage for slidge/util/test.py: 91%
211 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
1# type:ignore
2import tempfile
3import types
4from pathlib import Path
5from typing import Optional, Union
6from xml.dom.minidom import parseString
8import xmldiff.main
9from slixmpp import (
10 JID,
11 ElementBase,
12 Iq,
13 MatcherId,
14 MatchXMLMask,
15 MatchXPath,
16 Message,
17 Presence,
18 StanzaPath,
19)
20from slixmpp.stanza.error import Error
21from slixmpp.test import SlixTest, TestTransport
22from slixmpp.xmlstream import highlight, tostring
23from slixmpp.xmlstream.matcher import MatchIDSender
24from sqlalchemy import create_engine, delete
26from slidge import (
27 BaseGateway,
28 BaseSession,
29 LegacyBookmarks,
30 LegacyContact,
31 LegacyMUC,
32 LegacyParticipant,
33 LegacyRoster,
34)
36from ..command import Command
37from ..core import config
38from ..core.config import _TimedeltaSeconds
39from ..core.pubsub import PepAvatar, PepNick
40from ..db import SlidgeStore
41from ..db.avatar import avatar_cache
42from ..db.meta import Base
43from ..db.models import Contact
class SlixTestPlus(SlixTest):
    """
    ``SlixTest`` subclass with a namespace-preserving ``check()`` and a
    ``next_sent()`` helper that returns parsed stanzas.

    Stanzas are handled in the ``jabber:component:accept`` namespace.
    """

    def setUp(self):
        """Run the parent set-up, then put error stanzas in the component namespace."""
        super().setUp()
        Error.namespace = "jabber:component:accept"

    def check(self, stanza, criteria, method="exact", defaults=None, use_values=True):
        """
        Create and compare several stanza objects to a correct XML string.

        If use_values is False, tests using stanza.values will not be used.

        Some stanzas provide default values for some interfaces, but
        these defaults can be problematic for testing since they can easily
        be forgotten when supplying the XML string. A list of interfaces that
        use defaults may be provided and the generated stanzas will use the
        default values for those interfaces if needed.

        However, correcting the supplied XML is not possible for interfaces
        that add or remove XML elements. Only interfaces that map to XML
        attributes may be set using the defaults parameter. The supplied XML
        must take into account any extra elements that are included by default.

        Arguments:
            stanza       -- The stanza object to test.
            criteria     -- An expression the stanza must match against.
            method       -- The type of matching to use; one of:
                            'exact', 'mask', 'id', 'idsender', 'xpath',
                            and 'stanzapath'. Defaults to 'exact'; when
                            None is passed explicitly, the value of
                            self.match_method is used if that attribute
                            exists.
            defaults     -- A list of stanza interfaces that have default
                            values. These interfaces will be set to their
                            defaults for the given and generated stanzas to
                            prevent unexpected test failures.
            use_values   -- Indicates if testing using stanza.values should
                            be used. Defaults to True.

        Raises:
            ValueError -- if ``method`` is not a known matching method.
        """
        if method is None and hasattr(self, "match_method"):
            method = getattr(self, "match_method")

        if method != "exact":
            matchers = {
                "stanzapath": StanzaPath,
                "xpath": MatchXPath,
                "mask": MatchXMLMask,
                "idsender": MatchIDSender,
                "id": MatcherId,
            }
            Matcher = matchers.get(method, None)
            if Matcher is None:
                raise ValueError("Unknown matching method.")
            test = Matcher(criteria)
            self.assertTrue(
                test.match(stanza),
                "Stanza did not match using %s method:\n" % method
                + "Criteria:\n%s\n" % str(criteria)
                + "Stanza:\n%s" % str(stanza),
            )
            return

        stanza_class = stanza.__class__
        # Hack to preserve namespaces instead of having jabber:client
        # everywhere. The class attribute is patched for the duration of the
        # comparison and restored in the ``finally`` below, so that a failure
        # while parsing or comparing cannot leak the patched namespace.
        old_ns = stanza_class.namespace
        stanza_class.namespace = stanza.namespace
        try:
            if not isinstance(criteria, ElementBase):
                xml = self.parse_xml(criteria)
            else:
                xml = criteria.xml

            # Ensure that top level namespaces are used, even if they
            # were not provided.
            self.fix_namespaces(stanza.xml)
            self.fix_namespaces(xml)

            stanza2 = stanza_class(xml=xml)

            if use_values:
                # Using stanza.values will add XML for any interface that
                # has a default value. We need to set those defaults on
                # the existing stanzas and XML so that they will compare
                # correctly.
                default_stanza = stanza_class()
                if defaults is None:
                    known_defaults = {Message: ["type"], Presence: ["priority"]}
                    defaults = known_defaults.get(stanza_class, [])
                for interface in defaults:
                    # Reading then writing the interface materialises its
                    # default value in the underlying XML.
                    stanza[interface] = stanza[interface]
                    stanza2[interface] = stanza2[interface]
                    # Can really only automatically add defaults for top
                    # level attribute values. Anything else must be accounted
                    # for in the provided XML string.
                    if (
                        interface not in xml.attrib
                        and interface in default_stanza.xml.attrib
                    ):
                        xml.attrib[interface] = default_stanza.xml.attrib[interface]

                values = stanza2.values
                stanza3 = stanza_class()
                stanza3.values = values

                debug = "Three methods for creating stanzas do not match.\n"
                debug += "Given XML:\n%s\n" % highlight(tostring(xml))
                debug += "Given stanza:\n%s\n" % format_stanza(stanza)
                debug += "Generated stanza:\n%s\n" % highlight(tostring(stanza2.xml))
                debug += "Second generated stanza:\n%s\n" % highlight(
                    tostring(stanza3.xml)
                )
                result = self.compare(xml, stanza.xml, stanza2.xml, stanza3.xml)
            else:
                debug = "Two methods for creating stanzas do not match.\n"
                debug += "Given XML:\n%s\n" % highlight(tostring(xml))
                debug += "Given stanza:\n%s\n" % format_stanza(stanza)
                debug += "Generated stanza:\n%s\n" % highlight(tostring(stanza2.xml))
                result = self.compare(xml, stanza.xml, stanza2.xml)
        finally:
            stanza_class.namespace = old_ns

        if not result:
            # Append textual diffs to make the mismatch easier to locate.
            debug += str(xmldiff.main.diff_texts(tostring(xml), tostring(stanza.xml)))
            if use_values:
                debug += str(
                    xmldiff.main.diff_texts(tostring(xml), tostring(stanza2.xml))
                )
        self.assertTrue(result, debug)

    def next_sent(self, timeout=0.05) -> Optional[Union[Message, Iq, Presence]]:
        """
        Return the next stanza sent by ``self.xmpp``, or None if there is none.

        Arguments:
            timeout -- Passed through to the test socket's ``next_sent()``.

        The raw data is parsed and rebuilt as a stanza object in the
        ``jabber:component:accept`` namespace.
        """
        self.wait_for_send_queue()
        sent = self.xmpp.socket.next_sent(timeout=timeout)
        if sent is None:
            return None
        xml = self.parse_xml(sent)
        self.fix_namespaces(xml, "jabber:component:accept")
        sent = self.xmpp._build_stanza(xml, "jabber:component:accept")
        return sent
class SlidgeTest(SlixTestPlus):
    """
    Base test case that starts a gateway in test mode against an in-memory
    SQLite database.

    Subclasses may set ``plugin`` (a module or a dict of names to objects);
    when it is set, the gateway/session/roster/contact/MUC/bookmarks classes
    to use are looked up in it with ``find_subclass``.
    """

    plugin: Union[types.ModuleType, dict]

    class Config:
        # Each attribute is copied, upper-cased, onto the ``config`` module by
        # ``setUpClass``.
        jid = "aim.shakespeare.lit"
        secret = "test"
        server = "shakespeare.lit"
        port = 5222
        upload_service = "upload.test"
        home_dir = Path(tempfile.mkdtemp())
        user_jid_validator = ".*"
        admins: list[str] = []
        no_roster_push = False
        upload_requester = None
        ignore_delay_threshold = _TimedeltaSeconds("300")
        last_seen_fallback = True

    @classmethod
    def setUpClass(cls):
        """Copy the ``Config`` attributes onto the ``config`` module, upper-cased."""
        for k, v in vars(cls.Config).items():
            # Skip class machinery (__module__, __dict__, __annotations__, ...)
            # so that only real settings end up on the config module.
            if k.startswith("__"):
                continue
            setattr(config, k.upper(), v)

    def setUp(self):
        """
        Build a fresh gateway instance wired to a test transport.

        Registers the plugin's subclasses (if ``plugin`` is set), creates the
        database schema on a new in-memory engine, instantiates the gateway,
        makes stanza IDs predictable and feeds the stream header.
        """
        if hasattr(self, "plugin"):
            BaseGateway._subclass = find_subclass(self.plugin, BaseGateway)
            BaseSession._subclass = find_subclass(self.plugin, BaseSession)
            LegacyRoster._subclass = find_subclass(
                self.plugin, LegacyRoster, base_ok=True
            )
            LegacyContact._subclass = find_subclass(
                self.plugin, LegacyContact, base_ok=True
            )
            LegacyMUC._subclass = find_subclass(self.plugin, LegacyMUC, base_ok=True)
            LegacyBookmarks._subclass = find_subclass(
                self.plugin, LegacyBookmarks, base_ok=True
            )

        # workaround for duplicate output of sql alchemy's log, cf
        # https://stackoverflow.com/a/76498428/5902284
        from sqlalchemy import log as sqlalchemy_log

        sqlalchemy_log._add_default_handler = lambda x: None

        engine = self.db_engine = create_engine("sqlite+pysqlite:///:memory:")
        Base.metadata.create_all(engine)
        BaseGateway.store = SlidgeStore(engine)
        BaseGateway._test_mode = True
        self.xmpp = BaseGateway.get_self_or_unique_subclass()()
        self.xmpp.TEST_MODE = True
        PepNick.contact_store = self.xmpp.store.contacts
        PepAvatar.store = self.xmpp.store
        avatar_cache.store = self.xmpp.store.avatars
        avatar_cache.set_dir(Path(tempfile.mkdtemp()))
        self.xmpp._always_send_everything = True
        engine.echo = True

        self.xmpp.connection_made(TestTransport(self.xmpp))
        self.xmpp.session_bind_event.set()
        # Remove unique ID prefix to make it easier to test
        self.xmpp._id_prefix = ""
        self.xmpp.default_lang = None
        self.xmpp.peer_default_lang = None

        def new_id():
            # Sequential IDs ("1", "2", ...) so tests can hard-code them.
            self.xmpp._id += 1
            return str(self.xmpp._id)

        self.xmpp._id = 0
        self.xmpp.new_id = new_id

        # Must have the stream header ready for xmpp.process() to work.
        header = self.xmpp.stream_header

        self.xmpp.data_received(header)
        self.wait_for_send_queue()

        # Discard the first two items sent during stream set-up
        # (presumably the stream header exchange -- TODO confirm).
        self.xmpp.socket.next_sent()
        self.xmpp.socket.next_sent()

        # Some plugins require messages to have ID values. Set
        # this to True in tests related to those plugins.
        self.xmpp.use_message_ids = False
        self.xmpp.use_presence_ids = False
        Error.namespace = "jabber:component:accept"

    def tearDown(self):
        """Silence engine echo, commit/clear the shared DB session and drop all tables."""
        self.db_engine.echo = False
        super().tearDown()
        import slidge.db.store

        if slidge.db.store._session is not None:
            slidge.db.store._session.commit()
            slidge.db.store._session = None
        Base.metadata.drop_all(self.xmpp.store._engine)

    def setup_logged_session(self, n_contacts=0):
        """
        Register the user ``romeo@montague.lit`` and walk through the login
        sequence, asserting on each stanza the gateway sends.

        Arguments:
            n_contacts -- Number of presence probes expected after the
                          contact (and optional group) sync statuses.

        On return, ``self.romeo``, ``self.juliet``, ``self.room`` and
        ``self.first_witch`` are set.
        """
        user = self.xmpp.store.users.new(
            JID("romeo@montague.lit/gajim"), {"username": "romeo", "city": ""}
        )
        user.preferences = {"sync_avatar": True, "sync_presence": True}
        self.xmpp.store.users.update(user)

        # Start from an empty contact table.
        with self.xmpp.store.session() as session:
            session.execute(delete(Contact))
            session.commit()

        self.run_coro(
            self.xmpp._BaseGateway__dispatcher._on_user_register(
                Iq(sfrom="romeo@montague.lit/gajim")
            )
        )
        welcome = self.next_sent()
        assert welcome["body"], welcome
        stanza = self.next_sent()
        assert "logging in" in stanza["status"].lower(), stanza
        stanza = self.next_sent()
        assert "syncing contacts" in stanza["status"].lower(), stanza
        if BaseGateway.get_self_or_unique_subclass().GROUPS:
            stanza = self.next_sent()
            assert "syncing groups" in stanza["status"].lower(), stanza
        for _ in range(n_contacts):
            probe = self.next_sent()
            assert probe.get_type() == "probe"
        stanza = self.next_sent()
        assert "yup" in stanza["status"].lower(), stanza
        self.romeo: BaseSession = BaseSession.get_self_or_unique_subclass().from_jid(
            JID("romeo@montague.lit")
        )

        self.juliet: LegacyContact = self.run_coro(
            self.romeo.contacts.by_legacy_id("juliet")
        )
        self.room: LegacyMUC = self.run_coro(self.romeo.bookmarks.by_legacy_id("room"))
        self.first_witch: LegacyParticipant = self.run_coro(
            self.room.get_participant("firstwitch")
        )
        self.send(  # language=XML
            """
            <iq type="get"
                to="romeo@montague.lit"
                id="1"
                from="aim.shakespeare.lit">
              <pubsub xmlns="http://jabber.org/protocol/pubsub">
                <items node="urn:xmpp:avatar:metadata" />
              </pubsub>
            </iq>
            """
        )

    @classmethod
    def tearDownClass(cls):
        """Clear the registered subclasses so the next test class can register its own."""
        reset_subclasses()
def format_stanza(stanza):
    """
    Return the stanza's XML pretty-printed and passed through ``highlight``.

    The first line of minidom's pretty-printed output is dropped before
    highlighting.
    """
    pretty = parseString(tostring(stanza.xml)).toprettyxml()
    _first_line, *remaining = pretty.split("\n")
    return highlight("\n".join(remaining))
def find_subclass(o, parent, base_ok=False):
    """
    Return the first strict subclass of ``parent`` found among ``o``'s values.

    Arguments:
        o       -- Object to search: anything ``vars()`` accepts (e.g. a
                   module), or otherwise a mapping exposing ``.values()``.
        parent  -- The class whose subclass is looked for.
        base_ok -- If True, return ``parent`` itself when no subclass is
                   found instead of raising.

    Raises:
        RuntimeError -- if no subclass is found and ``base_ok`` is False.
    """
    try:
        candidates = vars(o).values()
    except TypeError:
        # vars() rejects objects without a __dict__ (e.g. a plain dict).
        candidates = o.values()

    for candidate in candidates:
        try:
            if issubclass(candidate, parent) and candidate is not parent:
                return candidate
        except TypeError:
            # issubclass() raises for values that are not classes; skip them.
            continue

    if base_ok:
        return parent
    raise RuntimeError(f"No subclass of {parent!r} found in {o!r}")
def reset_subclasses():
    """
    Clear the registered subclass of every pluggable base class.

    Called between test classes: these base classes are meant to be
    subclassed only once and raise exceptions otherwise.
    """
    pluggable_bases = (
        BaseSession,
        BaseGateway,
        LegacyRoster,
        LegacyContact,
        LegacyMUC,
        LegacyBookmarks,
        LegacyParticipant,
    )
    for base in pluggable_bases:
        base.reset_subclass()
    # NOTE: reset_commands() is not invoked from here (the call was left
    # commented out in the original).
def reset_commands():
    """
    Keep only the registered commands whose ``str()`` starts with
    ``<class 'slidge.core``; every other entry is removed from
    ``Command.subclasses``.
    """
    core_prefix = "<class 'slidge.core"
    Command.subclasses = list(
        filter(lambda klass: str(klass).startswith(core_prefix), Command.subclasses)
    )