Coverage for slidge / core / dispatcher / search.py: 60%

55 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 05:07 +0000

1from typing import TYPE_CHECKING 

2 

3from slixmpp import JID, CoroutineCallback, Iq, StanzaPath 

4from slixmpp.exceptions import XMPPError 

5 

6from .util import DispatcherMixin, exceptions_to_xmpp_errors 

7 

8if TYPE_CHECKING: 

9 from slidge.core.gateway import BaseGateway 

10 

11 

12class SearchMixin(DispatcherMixin): 

13 __slots__: list[str] = [] 

14 

15 def __init__(self, xmpp: "BaseGateway") -> None: 

16 super().__init__(xmpp) 

17 

18 xmpp["xep_0055"].api.register(self.search_get_form, "search_get_form") 

19 xmpp["xep_0055"].api.register(self._search_query, "search_query") 

20 

21 xmpp.plugin["xep_0030"].add_feature("jabber:iq:gateway") 

22 xmpp.register_handler( 

23 CoroutineCallback( 

24 "iq:gateway", 

25 StanzaPath("iq/gateway"), 

26 self._handle_gateway_iq, 

27 ) 

28 ) 

29 

30 async def search_get_form( 

31 self, 

32 _gateway_jid: JID, 

33 _node: str, 

34 ifrom: JID, 

35 iq: Iq, 

36 ) -> Iq: 

37 """ 

38 Prepare the search form using :attr:`.BaseSession.SEARCH_FIELDS` 

39 """ 

40 await self._get_session(iq) 

41 

42 xmpp = self.xmpp 

43 

44 reply = iq.reply() 

45 form = reply["search"]["form"] 

46 form["title"] = xmpp.SEARCH_TITLE 

47 form["instructions"] = xmpp.SEARCH_INSTRUCTIONS 

48 for field in xmpp.SEARCH_FIELDS: 

49 form.append(field.get_xml()) 

50 return reply # type:ignore[no-any-return] 

51 

52 async def _search_query( 

53 self, 

54 _gateway_jid: JID, 

55 _node: str, 

56 ifrom: JID, 

57 iq: Iq, 

58 ) -> Iq: 

59 """ 

60 Handles a search request 

61 """ 

62 session = await self._get_session(iq) 

63 

64 result = await session.on_search(iq["search"]["form"].get_values()) 

65 

66 if not result: 

67 raise XMPPError("item-not-found", text="Nothing was found") 

68 

69 reply = iq.reply() 

70 form = reply["search"]["form"] 

71 for field in result.fields: 

72 form.add_reported(field.var, label=field.label, type=field.type) 

73 for item in result.items: 

74 form.add_item(item) 

75 return reply # type:ignore[no-any-return] 

76 

77 @exceptions_to_xmpp_errors 

78 async def _handle_gateway_iq(self, iq: Iq) -> None: 

79 if iq.get_to() != self.xmpp.boundjid.bare: 

80 raise XMPPError("bad-request", "This can only be used on the component JID") 

81 

82 if len(self.xmpp.SEARCH_FIELDS) > 1: 

83 raise XMPPError( 

84 "feature-not-implemented", "Use jabber search for this gateway" 

85 ) 

86 

87 session = await self._get_session(iq) 

88 field = self.xmpp.SEARCH_FIELDS[0] 

89 

90 reply = iq.reply() 

91 if iq["type"] == "get": 

92 reply["gateway"]["desc"] = self.xmpp.SEARCH_TITLE 

93 reply["gateway"]["prompt"] = field.label 

94 elif iq["type"] == "set": 

95 prompt = iq["gateway"]["prompt"] 

96 result = await session.on_search({field.var: prompt}) 

97 if result is None or not result.items: 

98 raise XMPPError( 

99 "item-not-found", "No contact was found with the info you provided." 

100 ) 

101 if len(result.items) > 1: 

102 raise XMPPError( 

103 "bad-request", "Your search yielded more than one result." 

104 ) 

105 reply["gateway"]["jid"] = result.items[0]["jid"] 

106 

107 reply.send()