Coverage for slidge / core / dispatcher / search.py: 60%
55 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
1from typing import TYPE_CHECKING
3from slixmpp import JID, CoroutineCallback, Iq, StanzaPath
4from slixmpp.exceptions import XMPPError
6from .util import DispatcherMixin, exceptions_to_xmpp_errors
8if TYPE_CHECKING:
9 from slidge.core.gateway import BaseGateway
class SearchMixin(DispatcherMixin):
    """
    Dispatcher mixin that exposes contact search to XMPP clients.

    Two entry points are wired up in :meth:`__init__`:

    - callbacks registered on the ``xep_0055`` plugin API under the names
      ``search_get_form`` and ``search_query``;
    - a handler for ``iq/gateway`` stanzas, advertised through the
      ``jabber:iq:gateway`` disco feature.
    """

    __slots__: list[str] = []

    def __init__(self, xmpp: "BaseGateway") -> None:
        super().__init__(xmpp)

        # Form-based search: hand our coroutines to the xep_0055 plugin API.
        xmpp["xep_0055"].api.register(self.search_get_form, "search_get_form")
        xmpp["xep_0055"].api.register(self._search_query, "search_query")

        # Single-prompt search through the jabber:iq:gateway namespace.
        xmpp.plugin["xep_0030"].add_feature("jabber:iq:gateway")
        xmpp.register_handler(
            CoroutineCallback(
                "iq:gateway",
                StanzaPath("iq/gateway"),
                self._handle_gateway_iq,
            )
        )

    async def search_get_form(
        self,
        _gateway_jid: JID,
        _node: str,
        ifrom: JID,
        iq: Iq,
    ) -> Iq:
        """
        Prepare the search form from the gateway's ``SEARCH_TITLE``,
        ``SEARCH_INSTRUCTIONS`` and ``SEARCH_FIELDS`` attributes.

        :param _gateway_jid: Unused, part of the plugin API callback signature.
        :param _node: Unused, part of the plugin API callback signature.
        :param ifrom: Unused, part of the plugin API callback signature.
        :param iq: The incoming search form request.
        :return: The reply to ``iq`` carrying the populated search form.
        """
        # The session itself is not needed here; the call is kept for
        # whatever checks _get_session performs on the requester
        # (presumably rejecting entities without a session -- TODO confirm).
        await self._get_session(iq)

        xmpp = self.xmpp

        reply = iq.reply()
        form = reply["search"]["form"]
        form["title"] = xmpp.SEARCH_TITLE
        form["instructions"] = xmpp.SEARCH_INSTRUCTIONS
        for field in xmpp.SEARCH_FIELDS:
            form.append(field.get_xml())
        return reply  # type:ignore[no-any-return]

    async def _search_query(
        self,
        _gateway_jid: JID,
        _node: str,
        ifrom: JID,
        iq: Iq,
    ) -> Iq:
        """
        Handle a search request by delegating to the session's ``on_search``.

        :param _gateway_jid: Unused, part of the plugin API callback signature.
        :param _node: Unused, part of the plugin API callback signature.
        :param ifrom: Unused, part of the plugin API callback signature.
        :param iq: The incoming search query; its form values are passed to
            ``on_search``.
        :return: The reply to ``iq`` with reported fields and result items.
        :raises XMPPError: ``item-not-found`` if ``on_search`` returns a falsy
            result.
        """
        session = await self._get_session(iq)

        result = await session.on_search(iq["search"]["form"].get_values())

        if not result:
            raise XMPPError("item-not-found", text="Nothing was found")

        reply = iq.reply()
        form = reply["search"]["form"]
        for field in result.fields:
            form.add_reported(field.var, label=field.label, type=field.type)
        for item in result.items:
            form.add_item(item)
        return reply  # type:ignore[no-any-return]

    @exceptions_to_xmpp_errors
    async def _handle_gateway_iq(self, iq: Iq) -> None:
        """
        Handle an ``iq/gateway`` stanza addressed to the component.

        A ``get`` is answered with the gateway's ``SEARCH_TITLE`` as
        description and the label of its single search field as prompt.
        A ``set`` runs ``on_search`` with the received prompt and answers with
        the ``jid`` of the unique match. For any other IQ type the bare reply
        is sent as is.

        :param iq: The incoming stanza.
        :raises XMPPError: ``bad-request`` if the stanza is not addressed to
            the component's bare JID or if the search yields several results;
            ``feature-not-implemented`` if the gateway does not define exactly
            one search field; ``item-not-found`` if the search yields nothing.
        """
        if iq.get_to() != self.xmpp.boundjid.bare:
            raise XMPPError("bad-request", "This can only be used on the component JID")

        search_fields = self.xmpp.SEARCH_FIELDS
        if not search_fields:
            # Without this guard, indexing the first field below would fail
            # with an IndexError instead of a meaningful stanza error.
            raise XMPPError(
                "feature-not-implemented", "Search is not available on this gateway"
            )
        if len(search_fields) > 1:
            # This protocol only carries a single prompt.
            raise XMPPError(
                "feature-not-implemented", "Use jabber search for this gateway"
            )

        session = await self._get_session(iq)
        field = search_fields[0]

        reply = iq.reply()
        if iq["type"] == "get":
            reply["gateway"]["desc"] = self.xmpp.SEARCH_TITLE
            reply["gateway"]["prompt"] = field.label
        elif iq["type"] == "set":
            prompt = iq["gateway"]["prompt"]
            result = await session.on_search({field.var: prompt})
            if result is None or not result.items:
                raise XMPPError(
                    "item-not-found", "No contact was found with the info you provided."
                )
            if len(result.items) > 1:
                raise XMPPError(
                    "bad-request", "Your search yielded more than one result."
                )
            reply["gateway"]["jid"] = result.items[0]["jid"]

        reply.send()