Coverage for slidge/core/dispatcher/search.py: 57%

56 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-07 05:11 +0000

1from typing import TYPE_CHECKING 

2 

3from slixmpp import JID, CoroutineCallback, Iq, StanzaPath 

4from slixmpp.exceptions import XMPPError 

5 

6from .util import DispatcherMixin, exceptions_to_xmpp_errors 

7 

8if TYPE_CHECKING: 

9 from slidge.core.gateway import BaseGateway 

10 

11 

12class SearchMixin(DispatcherMixin): 

13 def __init__(self, xmpp: "BaseGateway"): 

14 super().__init__(xmpp) 

15 

16 xmpp["xep_0055"].api.register(self.search_get_form, "search_get_form") 

17 xmpp["xep_0055"].api.register(self._search_query, "search_query") 

18 

19 xmpp.plugin["xep_0030"].add_feature("jabber:iq:gateway") 

20 xmpp.register_handler( 

21 CoroutineCallback( 

22 "iq:gateway", 

23 StanzaPath("iq/gateway"), 

24 self._handle_gateway_iq, # type: ignore 

25 ) 

26 ) 

27 

28 async def search_get_form(self, _gateway_jid, _node, ifrom: JID, iq: Iq): 

29 """ 

30 Prepare the search form using :attr:`.BaseSession.SEARCH_FIELDS` 

31 """ 

32 user = self.xmpp.store.users.get(ifrom) 

33 if user is None: 

34 raise XMPPError(text="Search is only allowed for registered users") 

35 

36 xmpp = self.xmpp 

37 

38 reply = iq.reply() 

39 form = reply["search"]["form"] 

40 form["title"] = xmpp.SEARCH_TITLE 

41 form["instructions"] = xmpp.SEARCH_INSTRUCTIONS 

42 for field in xmpp.SEARCH_FIELDS: 

43 form.append(field.get_xml()) 

44 return reply 

45 

46 async def _search_query(self, _gateway_jid, _node, ifrom: JID, iq: Iq): 

47 """ 

48 Handles a search request 

49 """ 

50 session = await self._get_session(iq) 

51 

52 result = await session.on_search(iq["search"]["form"].get_values()) 

53 

54 if not result: 

55 raise XMPPError("item-not-found", text="Nothing was found") 

56 

57 reply = iq.reply() 

58 form = reply["search"]["form"] 

59 for field in result.fields: 

60 form.add_reported(field.var, label=field.label, type=field.type) 

61 for item in result.items: 

62 form.add_item(item) 

63 return reply 

64 

65 @exceptions_to_xmpp_errors 

66 async def _handle_gateway_iq(self, iq: Iq): 

67 if iq.get_to() != self.xmpp.boundjid.bare: 

68 raise XMPPError("bad-request", "This can only be used on the component JID") 

69 

70 if len(self.xmpp.SEARCH_FIELDS) > 1: 

71 raise XMPPError( 

72 "feature-not-implemented", "Use jabber search for this gateway" 

73 ) 

74 

75 session = await self._get_session(iq) 

76 field = self.xmpp.SEARCH_FIELDS[0] 

77 

78 reply = iq.reply() 

79 if iq["type"] == "get": 

80 reply["gateway"]["desc"] = self.xmpp.SEARCH_TITLE 

81 reply["gateway"]["prompt"] = field.label 

82 elif iq["type"] == "set": 

83 prompt = iq["gateway"]["prompt"] 

84 result = await session.on_search({field.var: prompt}) 

85 if result is None or not result.items: 

86 raise XMPPError( 

87 "item-not-found", "No contact was found with the info you provided." 

88 ) 

89 if len(result.items) > 1: 

90 raise XMPPError( 

91 "bad-request", "Your search yielded more than one result." 

92 ) 

93 reply["gateway"]["jid"] = result.items[0]["jid"] 

94 

95 reply.send()