Coverage for slidge/core/dispatcher/search.py: 58%

59 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-04 08:17 +0000

1from typing import TYPE_CHECKING 

2 

3from slixmpp import JID, CoroutineCallback, Iq, StanzaPath 

4from slixmpp.exceptions import XMPPError 

5 

6from ...db.models import GatewayUser 

7from .util import DispatcherMixin, exceptions_to_xmpp_errors 

8 

9if TYPE_CHECKING: 

10 from slidge.core.gateway import BaseGateway 

11 

12 

class SearchMixin(DispatcherMixin):
    """
    Dispatcher mixin that answers contact-search requests.

    It registers two callbacks on the ``xep_0055`` plugin API
    (``search_get_form`` and ``search_query``) and a stanza handler for
    ``iq/gateway`` stanzas, advertising the ``jabber:iq:gateway`` feature.
    """

    __slots__: list[str] = []

    def __init__(self, xmpp: "BaseGateway") -> None:
        """
        Hook the search callbacks and the gateway IQ handler into *xmpp*.

        :param xmpp: The gateway component the handlers are registered on.
        """
        super().__init__(xmpp)

        xmpp["xep_0055"].api.register(self.search_get_form, "search_get_form")
        xmpp["xep_0055"].api.register(self._search_query, "search_query")

        xmpp.plugin["xep_0030"].add_feature("jabber:iq:gateway")
        xmpp.register_handler(
            CoroutineCallback(
                "iq:gateway",
                StanzaPath("iq/gateway"),
                self._handle_gateway_iq,
            )
        )

    async def search_get_form(self, _gateway_jid, _node, ifrom: JID, iq: Iq):
        """
        Prepare the search form from the gateway's ``SEARCH_TITLE``,
        ``SEARCH_INSTRUCTIONS`` and ``SEARCH_FIELDS`` attributes.

        :param _gateway_jid: Unused.
        :param _node: Unused.
        :param ifrom: JID of the entity requesting the form.
        :param iq: The incoming request; the returned stanza is its reply.
        :return: The reply stanza, with the search form filled in.
        :raises XMPPError: If the requester is not a registered user.
        """
        with self.xmpp.store.session() as orm:
            # Look up the *requesting* user only. An unfiltered
            # ``one_or_none()`` raises as soon as two users are registered,
            # and lets anybody through when exactly one is.
            # NOTE(review): assumes ``GatewayUser`` exposes a ``jid`` column
            # holding the user's bare JID -- confirm against the model.
            user = (
                orm.query(GatewayUser)
                .filter_by(jid=JID(ifrom.bare))
                .one_or_none()
            )
        if user is None:
            raise XMPPError(text="Search is only allowed for registered users")

        xmpp = self.xmpp

        reply = iq.reply()
        form = reply["search"]["form"]
        form["title"] = xmpp.SEARCH_TITLE
        form["instructions"] = xmpp.SEARCH_INSTRUCTIONS
        for field in xmpp.SEARCH_FIELDS:
            form.append(field.get_xml())
        return reply

    async def _search_query(self, _gateway_jid, _node, ifrom: JID, iq: Iq):
        """
        Handle a search request by delegating to the session's ``on_search``.

        :param _gateway_jid: Unused.
        :param _node: Unused.
        :param ifrom: JID of the requester (unused; the session is resolved
            from *iq*).
        :param iq: The incoming request carrying the submitted form values.
        :return: The reply stanza, with reported fields and result items.
        :raises XMPPError: ``item-not-found`` if the search returned nothing.
        """
        session = await self._get_session(iq)

        result = await session.on_search(iq["search"]["form"].get_values())

        if not result:
            raise XMPPError("item-not-found", text="Nothing was found")

        reply = iq.reply()
        form = reply["search"]["form"]
        for field in result.fields:
            form.add_reported(field.var, label=field.label, type=field.type)
        for item in result.items:
            form.add_item(item)
        return reply

    @exceptions_to_xmpp_errors
    async def _handle_gateway_iq(self, iq: Iq):
        """
        Answer an ``iq/gateway`` stanza addressed to the component JID.

        A ``get`` returns the search title as description and the label of
        the single search field as prompt. A ``set`` runs a search with the
        submitted prompt and returns the JID of the unique match.

        :param iq: The incoming gateway IQ; the reply is sent, not returned.
        :raises XMPPError: ``bad-request`` if not addressed to the component
            JID or if several contacts match; ``feature-not-implemented`` if
            the gateway does not define exactly one search field;
            ``item-not-found`` if no contact matches.
        """
        if iq.get_to() != self.xmpp.boundjid.bare:
            raise XMPPError("bad-request", "This can only be used on the component JID")

        # This protocol carries a single prompt, so exactly one search field
        # is required; checking ``!= 1`` also protects the ``[0]`` lookup
        # below from an IndexError when no field is defined.
        if len(self.xmpp.SEARCH_FIELDS) != 1:
            raise XMPPError(
                "feature-not-implemented", "Use jabber search for this gateway"
            )

        session = await self._get_session(iq)
        field = self.xmpp.SEARCH_FIELDS[0]

        reply = iq.reply()
        if iq["type"] == "get":
            reply["gateway"]["desc"] = self.xmpp.SEARCH_TITLE
            reply["gateway"]["prompt"] = field.label
        elif iq["type"] == "set":
            prompt = iq["gateway"]["prompt"]
            result = await session.on_search({field.var: prompt})
            if result is None or not result.items:
                raise XMPPError(
                    "item-not-found", "No contact was found with the info you provided."
                )
            if len(result.items) > 1:
                raise XMPPError(
                    "bad-request", "Your search yielded more than one result."
                )
            reply["gateway"]["jid"] = result.items[0]["jid"]

        reply.send()