Coverage for slidge/slixfix/xep_0077/register.py: 44%

156 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-07 05:11 +0000

1# Slixmpp: The Slick XMPP Library 

2# Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout 

3# This file is part of Slixmpp. 

4# See the file LICENSE for copying permission. 

5import logging 

6import ssl 

7 

8from slixmpp.exceptions import XMPPError 

9from slixmpp.plugins import BasePlugin 

10from slixmpp.stanza import Iq, StreamFeatures 

11from slixmpp.xmlstream import JID, StanzaBase, register_stanza_plugin 

12from slixmpp.xmlstream.handler import CoroutineCallback 

13from slixmpp.xmlstream.matcher import StanzaPath 

14 

15from . import stanza 

16from .stanza import Register, RegisterFeature 

17 

18log = logging.getLogger(__name__) 

19 

20 

21# noinspection PyPep8Naming 

22class XEP_0077(BasePlugin): 

23 """ 

24 XEP-0077: In-Band Registration 

25 

26 Events: 

27 

28 :: 

29 

30 user_register -- After successful validation and add to the user store 

31 in api["user_validate"] 

32 user_unregister -- After successful user removal in api["user_remove"] 

33 user_modify -- After successful user modify in api["user_modify"] 

34 

35 Config: 

36 

37 :: 

38 

39 form_fields and form_instructions are only used if api["make_registration_form"] is 

40 not overridden; in this case form_fields MUST be None 

41 

42 API: 

43 

44 :: 

45 

46 user_get(jid, node, ifrom, iq) 

47 Returns a dict-like object containing `form_fields` for this user or None 

48 user_remove(jid, node, ifrom, iq) 

49 Removes a user or raise KeyError in case the user is not found in the user store 

50 make_registration_form(self, jid, node, ifrom, iq) 

51 Returns an iq reply to a registration form request, pre-filled and with 

52 <registered/> in case the requesting entity is already registered to us 

53 user_validate((self, jid, node, ifrom, registration) 

54 Add the user to the user store or raise ValueError(msg) if any problem is encountered 

55 msg is sent back to the XMPP client as an error message. 

56 user_modify(jid, node, ifrom, iq) 

57 Modify the user in the user store or raise ValueError(msg) (similarly to user_validate) 

58 """ 

59 

60 name = "xep_0077" 

61 description = "XEP-0077: In-Band Registration (slidge)" 

62 dependencies = {"xep_0004", "xep_0066"} 

63 stanza = stanza 

64 default_config = { 

65 "create_account": True, 

66 "force_registration": False, 

67 "order": 50, 

68 "form_fields": {"username", "password"}, 

69 "form_instructions": "Enter your credentials", 

70 "enable_subscription": True, 

71 } 

72 _user_store: dict[str, dict[str, str]] 

73 

74 def plugin_init(self): 

75 register_stanza_plugin(StreamFeatures, RegisterFeature) 

76 register_stanza_plugin(Iq, Register) 

77 

78 if self.xmpp.is_component: 

79 self.xmpp["xep_0030"].add_feature("jabber:iq:register") 

80 self.xmpp.register_handler( 

81 CoroutineCallback( 

82 "registration", 

83 StanzaPath(f"/iq@to={self.xmpp.boundjid.bare}/register"), 

84 self._handle_registration, 

85 ) 

86 ) 

87 self._user_store = {} 

88 self.api.register(self._user_get, "user_get") 

89 self.api.register(self._user_remove, "user_remove") 

90 self.api.register(self._user_modify, "user_modify") 

91 self.api.register(self._make_registration_form, "make_registration_form") 

92 self.api.register(self._user_validate, "user_validate") 

93 else: 

94 self.xmpp.register_feature( 

95 "register", 

96 self._handle_register_feature, 

97 restart=False, 

98 order=self.order, 

99 ) 

100 

101 register_stanza_plugin(Register, self.xmpp["xep_0004"].stanza.Form) 

102 register_stanza_plugin(Register, self.xmpp["xep_0066"].stanza.OOB) 

103 

104 self.xmpp.add_event_handler("connected", self._force_registration) 

105 

106 def plugin_end(self): 

107 if not self.xmpp.is_component: 

108 self.xmpp.unregister_feature("register", self.order) 

109 

110 def _user_get(self, _jid, _node, _ifrom, iq): 

111 return self._user_store.get(iq["from"].bare) 

112 

113 def _user_remove(self, _jid, _node, _ifrom, iq): 

114 return self._user_store.pop(iq["from"].bare) 

115 

116 async def _make_registration_form(self, _jid, _node, _ifrom, iq: Iq): 

117 reg = iq["register"] 

118 user = await self.api["user_get"](None, None, iq["from"], iq) 

119 

120 if user is None: 

121 user = {} 

122 else: 

123 reg["registered"] = True 

124 

125 reg["instructions"] = self.form_instructions 

126 

127 for field in self.form_fields: 

128 data = user.get(field, "") 

129 if data: 

130 reg[field] = data 

131 else: 

132 # Add a blank field 

133 reg.add_field(field) 

134 

135 reply = iq.reply() 

136 reply.set_payload(reg.xml) 

137 return reply 

138 

139 def _user_validate(self, _jid, _node, ifrom, registration): 

140 self._user_store[ifrom.bare] = { 

141 key: registration[key] for key in self.form_fields 

142 } 

143 

144 def _user_modify(self, _jid, _node, ifrom, registration): 

145 self._user_store[ifrom.bare] = { 

146 key: registration[key] for key in self.form_fields 

147 } 

148 

149 async def _handle_registration(self, iq: StanzaBase): 

150 if iq["type"] == "get": 

151 if not self.enable_subscription: 

152 raise XMPPError( 

153 "bad-request", 

154 text="You must use adhoc commands to register to this gateway.", 

155 ) 

156 await self._send_form(iq) 

157 elif iq["type"] == "set": 

158 form_dict = iq["register"]["form"].get_values() or iq["register"] 

159 

160 if form_dict.get("remove"): 

161 try: 

162 await self.api["user_remove"](None, None, iq["from"], iq) 

163 except KeyError: 

164 _send_error( 

165 iq, 

166 "404", 

167 "cancel", 

168 "item-not-found", 

169 "User not found", 

170 ) 

171 else: 

172 reply = iq.reply() 

173 reply.send() 

174 self.xmpp.event("user_unregister", iq) 

175 return 

176 

177 if not self.enable_subscription: 

178 raise XMPPError( 

179 "bad-request", 

180 text="You must use adhoc commands to register to this gateway.", 

181 ) 

182 

183 if self.form_fields is not None: 

184 for field in self.form_fields: 

185 if not iq["register"][field]: 

186 # Incomplete Registration 

187 _send_error( 

188 iq, 

189 "406", 

190 "modify", 

191 "not-acceptable", 

192 "Please fill in all fields.", 

193 ) 

194 return 

195 

196 user = await self.api["user_get"](None, None, iq["from"], iq) 

197 

198 try: 

199 if user is None: 

200 await self.api["user_validate"](None, None, iq["from"], form_dict) 

201 else: 

202 await self.api["user_modify"](None, None, iq["from"], form_dict) 

203 except ValueError as e: 

204 _send_error(iq, "406", "modify", "not-acceptable", "\n".join(e.args)) 

205 return 

206 

207 reply = iq.reply() 

208 reply.send() 

209 

210 if user is None: 

211 self.xmpp.event("user_register", iq) 

212 else: 

213 self.xmpp.event("user_modify", iq) 

214 

215 async def _send_form(self, iq): 

216 reply = await self.api["make_registration_form"](None, None, iq["from"], iq) 

217 reply.send() 

218 

219 def _force_registration(self, _event): 

220 if self.force_registration: 

221 self.xmpp.add_filter("in", self._force_stream_feature) 

222 

223 def _force_stream_feature(self, stanza_): 

224 if isinstance(stanza_, StreamFeatures): 

225 if not self.xmpp.disable_starttls: 

226 if "starttls" not in self.xmpp.features: 

227 return stanza_ 

228 elif not isinstance(self.xmpp.socket, ssl.SSLSocket): 

229 return stanza_ 

230 if "mechanisms" not in self.xmpp.features: 

231 log.debug("Forced adding in-band registration stream feature") 

232 stanza_.enable("register") 

233 self.xmpp.del_filter("in", self._force_stream_feature) 

234 return stanza_ 

235 

236 async def _handle_register_feature(self, _features): 

237 if "mechanisms" in self.xmpp.features: 

238 # We have already logged in with an account 

239 return False 

240 

241 if self.create_account and self.xmpp.event_handled("register"): 

242 form = await self.get_registration() 

243 await self.xmpp.event_async("register", form) 

244 return True 

245 return False 

246 

247 def get_registration(self, jid=None, ifrom=None, timeout=None, callback=None): 

248 iq = self.xmpp.Iq() 

249 iq["type"] = "get" 

250 iq["to"] = jid 

251 iq["from"] = ifrom 

252 iq.enable("register") 

253 return iq.send(timeout=timeout, callback=callback) 

254 

255 def cancel_registration(self, jid=None, ifrom=None, timeout=None, callback=None): 

256 iq = self.xmpp.Iq() 

257 iq["type"] = "set" 

258 iq["to"] = jid 

259 iq["from"] = ifrom 

260 iq["register"]["remove"] = True 

261 return iq.send(timeout=timeout, callback=callback) 

262 

263 def change_password( 

264 self, password, jid=None, ifrom=None, timeout=None, callback=None 

265 ): 

266 iq = self.xmpp.Iq() 

267 iq["type"] = "set" 

268 iq["to"] = jid 

269 iq["from"] = ifrom 

270 if self.xmpp.is_component: 

271 ifrom = JID(ifrom) 

272 iq["register"]["username"] = ifrom.user 

273 else: 

274 iq["register"]["username"] = self.xmpp.boundjid.user 

275 iq["register"]["password"] = password 

276 return iq.send(timeout=timeout, callback=callback) 

277 

278 

279def _send_error(iq, code, error_type, name, text=""): 

280 # It would be nice to raise XMPPError but the iq payload 

281 # should include the register info 

282 reply = iq.reply() 

283 reply.set_payload(iq["register"].xml) 

284 reply.error() 

285 reply["error"]["code"] = code 

286 reply["error"]["type"] = error_type 

287 reply["error"]["condition"] = name 

288 reply["error"]["text"] = text 

289 reply.send()