Coverage for slidge/slixfix/xep_0077/register.py: 44%
156 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
1# Slixmpp: The Slick XMPP Library
2# Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout
3# This file is part of Slixmpp.
4# See the file LICENSE for copying permission.
5import logging
6import ssl
8from slixmpp.exceptions import XMPPError
9from slixmpp.plugins import BasePlugin
10from slixmpp.stanza import Iq, StreamFeatures
11from slixmpp.xmlstream import JID, StanzaBase, register_stanza_plugin
12from slixmpp.xmlstream.handler import CoroutineCallback
13from slixmpp.xmlstream.matcher import StanzaPath
15from . import stanza
16from .stanza import Register, RegisterFeature
18log = logging.getLogger(__name__)
21# noinspection PyPep8Naming
22class XEP_0077(BasePlugin):
23 """
24 XEP-0077: In-Band Registration
26 Events:
28 ::
30 user_register -- After successful validation and add to the user store
31 in api["user_validate"]
32 user_unregister -- After successful user removal in api["user_remove"]
33 user_modify -- After successful user modify in api["user_modify"]
35 Config:
37 ::
39 form_fields and form_instructions are only used if api["make_registration_form"] is
40 not overridden; in this case form_fields MUST be None
42 API:
44 ::
46 user_get(jid, node, ifrom, iq)
47 Returns a dict-like object containing `form_fields` for this user or None
48 user_remove(jid, node, ifrom, iq)
49 Removes a user or raise KeyError in case the user is not found in the user store
50 make_registration_form(self, jid, node, ifrom, iq)
51 Returns an iq reply to a registration form request, pre-filled and with
52 <registered/> in case the requesting entity is already registered to us
53 user_validate((self, jid, node, ifrom, registration)
54 Add the user to the user store or raise ValueError(msg) if any problem is encountered
55 msg is sent back to the XMPP client as an error message.
56 user_modify(jid, node, ifrom, iq)
57 Modify the user in the user store or raise ValueError(msg) (similarly to user_validate)
58 """
60 name = "xep_0077"
61 description = "XEP-0077: In-Band Registration (slidge)"
62 dependencies = {"xep_0004", "xep_0066"}
63 stanza = stanza
64 default_config = {
65 "create_account": True,
66 "force_registration": False,
67 "order": 50,
68 "form_fields": {"username", "password"},
69 "form_instructions": "Enter your credentials",
70 "enable_subscription": True,
71 }
72 _user_store: dict[str, dict[str, str]]
74 def plugin_init(self):
75 register_stanza_plugin(StreamFeatures, RegisterFeature)
76 register_stanza_plugin(Iq, Register)
78 if self.xmpp.is_component:
79 self.xmpp["xep_0030"].add_feature("jabber:iq:register")
80 self.xmpp.register_handler(
81 CoroutineCallback(
82 "registration",
83 StanzaPath(f"/iq@to={self.xmpp.boundjid.bare}/register"),
84 self._handle_registration,
85 )
86 )
87 self._user_store = {}
88 self.api.register(self._user_get, "user_get")
89 self.api.register(self._user_remove, "user_remove")
90 self.api.register(self._user_modify, "user_modify")
91 self.api.register(self._make_registration_form, "make_registration_form")
92 self.api.register(self._user_validate, "user_validate")
93 else:
94 self.xmpp.register_feature(
95 "register",
96 self._handle_register_feature,
97 restart=False,
98 order=self.order,
99 )
101 register_stanza_plugin(Register, self.xmpp["xep_0004"].stanza.Form)
102 register_stanza_plugin(Register, self.xmpp["xep_0066"].stanza.OOB)
104 self.xmpp.add_event_handler("connected", self._force_registration)
106 def plugin_end(self):
107 if not self.xmpp.is_component:
108 self.xmpp.unregister_feature("register", self.order)
110 def _user_get(self, _jid, _node, _ifrom, iq):
111 return self._user_store.get(iq["from"].bare)
113 def _user_remove(self, _jid, _node, _ifrom, iq):
114 return self._user_store.pop(iq["from"].bare)
116 async def _make_registration_form(self, _jid, _node, _ifrom, iq: Iq):
117 reg = iq["register"]
118 user = await self.api["user_get"](None, None, iq["from"], iq)
120 if user is None:
121 user = {}
122 else:
123 reg["registered"] = True
125 reg["instructions"] = self.form_instructions
127 for field in self.form_fields:
128 data = user.get(field, "")
129 if data:
130 reg[field] = data
131 else:
132 # Add a blank field
133 reg.add_field(field)
135 reply = iq.reply()
136 reply.set_payload(reg.xml)
137 return reply
139 def _user_validate(self, _jid, _node, ifrom, registration):
140 self._user_store[ifrom.bare] = {
141 key: registration[key] for key in self.form_fields
142 }
144 def _user_modify(self, _jid, _node, ifrom, registration):
145 self._user_store[ifrom.bare] = {
146 key: registration[key] for key in self.form_fields
147 }
149 async def _handle_registration(self, iq: StanzaBase):
150 if iq["type"] == "get":
151 if not self.enable_subscription:
152 raise XMPPError(
153 "bad-request",
154 text="You must use adhoc commands to register to this gateway.",
155 )
156 await self._send_form(iq)
157 elif iq["type"] == "set":
158 form_dict = iq["register"]["form"].get_values() or iq["register"]
160 if form_dict.get("remove"):
161 try:
162 await self.api["user_remove"](None, None, iq["from"], iq)
163 except KeyError:
164 _send_error(
165 iq,
166 "404",
167 "cancel",
168 "item-not-found",
169 "User not found",
170 )
171 else:
172 reply = iq.reply()
173 reply.send()
174 self.xmpp.event("user_unregister", iq)
175 return
177 if not self.enable_subscription:
178 raise XMPPError(
179 "bad-request",
180 text="You must use adhoc commands to register to this gateway.",
181 )
183 if self.form_fields is not None:
184 for field in self.form_fields:
185 if not iq["register"][field]:
186 # Incomplete Registration
187 _send_error(
188 iq,
189 "406",
190 "modify",
191 "not-acceptable",
192 "Please fill in all fields.",
193 )
194 return
196 user = await self.api["user_get"](None, None, iq["from"], iq)
198 try:
199 if user is None:
200 await self.api["user_validate"](None, None, iq["from"], form_dict)
201 else:
202 await self.api["user_modify"](None, None, iq["from"], form_dict)
203 except ValueError as e:
204 _send_error(iq, "406", "modify", "not-acceptable", "\n".join(e.args))
205 return
207 reply = iq.reply()
208 reply.send()
210 if user is None:
211 self.xmpp.event("user_register", iq)
212 else:
213 self.xmpp.event("user_modify", iq)
215 async def _send_form(self, iq):
216 reply = await self.api["make_registration_form"](None, None, iq["from"], iq)
217 reply.send()
219 def _force_registration(self, _event):
220 if self.force_registration:
221 self.xmpp.add_filter("in", self._force_stream_feature)
223 def _force_stream_feature(self, stanza_):
224 if isinstance(stanza_, StreamFeatures):
225 if not self.xmpp.disable_starttls:
226 if "starttls" not in self.xmpp.features:
227 return stanza_
228 elif not isinstance(self.xmpp.socket, ssl.SSLSocket):
229 return stanza_
230 if "mechanisms" not in self.xmpp.features:
231 log.debug("Forced adding in-band registration stream feature")
232 stanza_.enable("register")
233 self.xmpp.del_filter("in", self._force_stream_feature)
234 return stanza_
236 async def _handle_register_feature(self, _features):
237 if "mechanisms" in self.xmpp.features:
238 # We have already logged in with an account
239 return False
241 if self.create_account and self.xmpp.event_handled("register"):
242 form = await self.get_registration()
243 await self.xmpp.event_async("register", form)
244 return True
245 return False
247 def get_registration(self, jid=None, ifrom=None, timeout=None, callback=None):
248 iq = self.xmpp.Iq()
249 iq["type"] = "get"
250 iq["to"] = jid
251 iq["from"] = ifrom
252 iq.enable("register")
253 return iq.send(timeout=timeout, callback=callback)
255 def cancel_registration(self, jid=None, ifrom=None, timeout=None, callback=None):
256 iq = self.xmpp.Iq()
257 iq["type"] = "set"
258 iq["to"] = jid
259 iq["from"] = ifrom
260 iq["register"]["remove"] = True
261 return iq.send(timeout=timeout, callback=callback)
263 def change_password(
264 self, password, jid=None, ifrom=None, timeout=None, callback=None
265 ):
266 iq = self.xmpp.Iq()
267 iq["type"] = "set"
268 iq["to"] = jid
269 iq["from"] = ifrom
270 if self.xmpp.is_component:
271 ifrom = JID(ifrom)
272 iq["register"]["username"] = ifrom.user
273 else:
274 iq["register"]["username"] = self.xmpp.boundjid.user
275 iq["register"]["password"] = password
276 return iq.send(timeout=timeout, callback=callback)
279def _send_error(iq, code, error_type, name, text=""):
280 # It would be nice to raise XMPPError but the iq payload
281 # should include the register info
282 reply = iq.reply()
283 reply.set_payload(iq["register"].xml)
284 reply.error()
285 reply["error"]["code"] = code
286 reply["error"]["type"] = error_type
287 reply["error"]["condition"] = name
288 reply["error"]["text"] = text
289 reply.send()