Coverage for slidge/core/dispatcher/search.py: 58%
59 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
1from typing import TYPE_CHECKING
3from slixmpp import JID, CoroutineCallback, Iq, StanzaPath
4from slixmpp.exceptions import XMPPError
6from ...db.models import GatewayUser
7from .util import DispatcherMixin, exceptions_to_xmpp_errors
9if TYPE_CHECKING:
10 from slidge.core.gateway import BaseGateway
class SearchMixin(DispatcherMixin):
    """
    Dispatcher mixin that answers contact-search requests.

    It registers two callbacks on the ``xep_0055`` plugin API (search form
    retrieval and search query execution) and a stanza handler for
    ``jabber:iq:gateway`` IQs addressed to the component.
    """

    __slots__: list[str] = []

    def __init__(self, xmpp: "BaseGateway") -> None:
        """
        Hook the search callbacks and the gateway IQ handler into ``xmpp``.

        :param xmpp: The gateway component the handlers are registered on.
        """
        super().__init__(xmpp)

        xmpp["xep_0055"].api.register(self.search_get_form, "search_get_form")
        xmpp["xep_0055"].api.register(self._search_query, "search_query")

        # Advertise the gateway namespace, then route matching IQs to us.
        xmpp.plugin["xep_0030"].add_feature("jabber:iq:gateway")
        xmpp.register_handler(
            CoroutineCallback(
                "iq:gateway",
                StanzaPath("iq/gateway"),
                self._handle_gateway_iq,
            )
        )

    async def search_get_form(self, _gateway_jid, _node, ifrom: JID, iq: Iq) -> Iq:
        """
        Prepare the search form from the gateway's ``SEARCH_TITLE``,
        ``SEARCH_INSTRUCTIONS`` and ``SEARCH_FIELDS`` attributes.

        :param _gateway_jid: Unused.
        :param _node: Unused.
        :param ifrom: JID of the entity requesting the form.
        :param iq: The incoming request; the form is attached to its reply.
        :return: The reply stanza carrying the search form.
        :raises XMPPError: If the requester is not a registered user.
        """
        with self.xmpp.store.session() as orm:
            # Look up the requester specifically. An unfiltered
            # ``one_or_none()`` raises as soon as two users are registered,
            # and lets *any* JID through when exactly one user exists.
            # NOTE(review): assumes ``GatewayUser`` exposes a ``jid`` column
            # holding the user's bare JID -- confirm against ``db.models``.
            user = (
                orm.query(GatewayUser)
                .filter_by(jid=JID(ifrom.bare))
                .one_or_none()
            )
        if user is None:
            raise XMPPError(text="Search is only allowed for registered users")

        xmpp = self.xmpp

        reply = iq.reply()
        form = reply["search"]["form"]
        form["title"] = xmpp.SEARCH_TITLE
        form["instructions"] = xmpp.SEARCH_INSTRUCTIONS
        for field in xmpp.SEARCH_FIELDS:
            form.append(field.get_xml())
        return reply

    async def _search_query(self, _gateway_jid, _node, ifrom: JID, iq: Iq) -> Iq:
        """
        Handle a search request by delegating to the session's ``on_search``.

        :param _gateway_jid: Unused.
        :param _node: Unused.
        :param ifrom: JID of the requester (unused; the session is resolved
            from ``iq``).
        :param iq: The incoming request carrying the submitted form.
        :return: The reply stanza with the reported fields and result items.
        :raises XMPPError: ``item-not-found`` if the search returned a falsy
            result.
        """
        session = await self._get_session(iq)

        result = await session.on_search(iq["search"]["form"].get_values())

        if not result:
            raise XMPPError("item-not-found", text="Nothing was found")

        reply = iq.reply()
        form = reply["search"]["form"]
        # Declare the result columns first, then add one row per found item.
        for field in result.fields:
            form.add_reported(field.var, label=field.label, type=field.type)
        for item in result.items:
            form.add_item(item)
        return reply

    @exceptions_to_xmpp_errors
    async def _handle_gateway_iq(self, iq: Iq) -> None:
        """
        Answer a ``jabber:iq:gateway`` IQ using the gateway's only search field.

        A ``get`` is answered with a description and a prompt; a ``set`` runs
        a search with the submitted prompt and answers with the JID of the
        single matching item.

        :param iq: The incoming gateway IQ.
        :raises XMPPError: ``bad-request`` if the IQ is not addressed to the
            component JID or the search yields several results;
            ``feature-not-implemented`` if the gateway does not define
            exactly one search field; ``item-not-found`` if nothing matches.
        """
        if iq.get_to() != self.xmpp.boundjid.bare:
            raise XMPPError("bad-request", "This can only be used on the component JID")

        search_fields = self.xmpp.SEARCH_FIELDS
        # This flow carries a single prompt, so it only works with exactly
        # one search field. Rejecting the empty case here as well avoids an
        # IndexError on the ``[0]`` lookup below.
        if len(search_fields) != 1:
            raise XMPPError(
                "feature-not-implemented", "Use jabber search for this gateway"
            )

        session = await self._get_session(iq)
        field = search_fields[0]

        reply = iq.reply()
        if iq["type"] == "get":
            reply["gateway"]["desc"] = self.xmpp.SEARCH_TITLE
            reply["gateway"]["prompt"] = field.label
        elif iq["type"] == "set":
            prompt = iq["gateway"]["prompt"]
            result = await session.on_search({field.var: prompt})
            if result is None or not result.items:
                raise XMPPError(
                    "item-not-found", "No contact was found with the info you provided."
                )
            if len(result.items) > 1:
                raise XMPPError(
                    "bad-request", "Your search yielded more than one result."
                )
            reply["gateway"]["jid"] = result.items[0]["jid"]

        # Registered as a stanza handler (not an API callback), so the reply
        # is sent explicitly instead of being returned.
        reply.send()