Coverage for slidge/core/dispatcher/search.py: 57%
56 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
1from typing import TYPE_CHECKING
3from slixmpp import JID, CoroutineCallback, Iq, StanzaPath
4from slixmpp.exceptions import XMPPError
6from .util import DispatcherMixin, exceptions_to_xmpp_errors
8if TYPE_CHECKING:
9 from slidge.core.gateway import BaseGateway
class SearchMixin(DispatcherMixin):
    """
    Dispatcher mixin handling contact search requests.

    It plugs two callbacks into the ``xep_0055`` plugin API (form retrieval
    and query handling) and registers a handler for ``iq/gateway`` stanzas,
    advertising the ``jabber:iq:gateway`` feature through ``xep_0030``.
    """

    def __init__(self, xmpp: "BaseGateway"):
        """
        Register the search callbacks and the gateway IQ handler on ``xmpp``.

        :param xmpp: The gateway component the handlers are attached to.
        """
        super().__init__(xmpp)

        xmpp["xep_0055"].api.register(self.search_get_form, "search_get_form")
        xmpp["xep_0055"].api.register(self._search_query, "search_query")

        xmpp.plugin["xep_0030"].add_feature("jabber:iq:gateway")
        xmpp.register_handler(
            CoroutineCallback(
                "iq:gateway",
                StanzaPath("iq/gateway"),
                self._handle_gateway_iq,  # type: ignore
            )
        )

    async def search_get_form(self, _gateway_jid, _node, ifrom: JID, iq: Iq):
        """
        Prepare the search form from the gateway's ``SEARCH_TITLE``,
        ``SEARCH_INSTRUCTIONS`` and ``SEARCH_FIELDS`` attributes.

        :param _gateway_jid: Unused.
        :param _node: Unused.
        :param ifrom: JID of the requester, looked up in the user store.
        :param iq: The incoming request; the form is attached to its reply.
        :return: The reply stanza carrying the search form.
        :raises XMPPError: If ``ifrom`` does not match a user in the store.
        """
        user = self.xmpp.store.users.get(ifrom)
        if user is None:
            raise XMPPError(text="Search is only allowed for registered users")

        xmpp = self.xmpp

        reply = iq.reply()
        form = reply["search"]["form"]
        form["title"] = xmpp.SEARCH_TITLE
        form["instructions"] = xmpp.SEARCH_INSTRUCTIONS
        for field in xmpp.SEARCH_FIELDS:
            form.append(field.get_xml())
        return reply

    async def _search_query(self, _gateway_jid, _node, ifrom: JID, iq: Iq):
        """
        Handle a search request by delegating to the session's ``on_search``.

        :param _gateway_jid: Unused.
        :param _node: Unused.
        :param ifrom: Unused here; the session is resolved from ``iq``.
        :param iq: The incoming request carrying the filled-in search form.
        :return: The reply stanza with reported fields and result items.
        :raises XMPPError: ``item-not-found`` if the search result is falsy.
        """
        session = await self._get_session(iq)

        result = await session.on_search(iq["search"]["form"].get_values())

        if not result:
            raise XMPPError("item-not-found", text="Nothing was found")

        reply = iq.reply()
        form = reply["search"]["form"]
        # The reported fields describe the columns, the items are the rows.
        for field in result.fields:
            form.add_reported(field.var, label=field.label, type=field.type)
        for item in result.items:
            form.add_item(item)
        return reply

    @exceptions_to_xmpp_errors
    async def _handle_gateway_iq(self, iq: Iq):
        """
        Handle an ``iq/gateway`` stanza and send the reply.

        A ``get`` is answered with the search title and the label of the
        single search field; a ``set`` runs a search using the received
        prompt as the value of that field and replies with the JID of the
        only match.

        :param iq: The incoming gateway IQ.
        :raises XMPPError: ``bad-request`` if the IQ is not addressed to the
            component JID or if the search yields several results;
            ``feature-not-implemented`` if the gateway does not define
            exactly one search field; ``item-not-found`` if nothing matches.
        """
        if iq.get_to() != self.xmpp.boundjid.bare:
            raise XMPPError("bad-request", "This can only be used on the component JID")

        search_fields = self.xmpp.SEARCH_FIELDS
        # This flow can only prompt for a single value. Without any search
        # field there is nothing to prompt for, so reject explicitly instead
        # of failing with an IndexError on the lookup below.
        if not search_fields:
            raise XMPPError(
                "feature-not-implemented", "This gateway does not define any search field"
            )
        if len(search_fields) > 1:
            raise XMPPError(
                "feature-not-implemented", "Use jabber search for this gateway"
            )

        session = await self._get_session(iq)
        field = search_fields[0]

        reply = iq.reply()
        if iq["type"] == "get":
            reply["gateway"]["desc"] = self.xmpp.SEARCH_TITLE
            reply["gateway"]["prompt"] = field.label
        elif iq["type"] == "set":
            prompt = iq["gateway"]["prompt"]
            result = await session.on_search({field.var: prompt})
            if result is None or not result.items:
                raise XMPPError(
                    "item-not-found", "No contact was found with the info you provided."
                )
            # The reply carries a single JID, so ambiguous results are refused.
            if len(result.items) > 1:
                raise XMPPError(
                    "bad-request", "Your search yielded more than one result."
                )
            reply["gateway"]["jid"] = result.items[0]["jid"]

        reply.send()