Coverage for slidge / command / register.py: 43%

84 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 05:07 +0000

1""" 

2This module handles the registration :term:`Command`, which is a necessary 

3step for a JID to become a slidge :term:`User`. 

4""" 

5 

6import asyncio 

7import functools 

8import tempfile 

9from enum import IntEnum 

10from typing import Any 

11 

12import qrcode 

13from slixmpp import JID, Iq 

14from slixmpp.exceptions import XMPPError 

15 

16from ..core import config 

17from ..db import GatewayUser 

18from ..util.types import AnySession, UserPreferences 

19from .base import Command, CommandAccess, Form, FormField, FormSession, FormValues 

20from .user import Preferences 

21 

22 

class RegistrationType(IntEnum):
    """
    Selects which registration flow a gateway uses.

    The registration command compares the gateway's ``REGISTRATION_TYPE``
    against these members to decide which form, if any, follows the
    initial registration form.
    """

    SINGLE_STEP_FORM = 0
    """
    One step and one form; this is the only flow compatible with :xep:`0077`.
    With this type, the whole flow is defined
    by :attr:`slidge.BaseGateway.REGISTRATION_FIELDS` and
    :attr:`.REGISTRATION_INSTRUCTIONS`.
    """

    QRCODE = 10
    """
    Registration requires flashing a QR code in an official client.
    See :meth:`slidge.BaseGateway.send_qr`, :meth:`.get_qr_text`
    and :meth:`.confirm_qr`.
    """

    TWO_FACTOR_CODE = 20
    """
    Registration requires confirming the login with a 2FA code,
    e.g. something received by email or SMS, to finalize the authentication.
    See :meth:`.validate_two_factor_code`.
    """

49 

50 

class TwoFactorNotRequired(Exception):
    """
    Signals that a 2FA code turned out not to be needed for this user.

    Should be raised in :meth:`slidge.BaseGateway.validate` when the code is
    not required after all, which can happen for a :term:`Legacy Network`
    where 2FA is optional.
    """

59 

60 

class Register(Command[AnySession]):
    """
    Command that links a JID to the gateway.

    The flow is a chain of forms, each one naming the next step as its
    handler: :meth:`run` shows the registration form, :meth:`register`
    validates it and, depending on the gateway's ``REGISTRATION_TYPE``,
    continues with :meth:`two_fa`, :meth:`qr` or directly with
    :meth:`preferences`. The user is only written to the store at the very
    end, in :meth:`_finalize` or :meth:`_preferences_timeout`.
    """

    NAME = "📝 Register to the gateway"
    HELP = "Link your JID to this gateway"
    NODE = "jabber:iq:register"
    CHAT_COMMAND = "register"
    ACCESS = CommandAccess.NON_USER

    SUCCESS_MESSAGE = "Success, welcome!"

    def _finalize(
        self,
        form_values: UserPreferences,
        _session: None,
        ifrom: JID,
        user: GatewayUser,
        *_: Any,  # noqa:ANN401
    ) -> str:
        """
        Last step of the flow: save the chosen preferences and the user.

        :param form_values: Values submitted in the preferences form.
        :param _session: Unused.
        :param ifrom: JID of the entity that ran the command.
        :param user: The user built during the previous steps.
        :return: :attr:`SUCCESS_MESSAGE`
        """
        user.preferences = form_values  # type: ignore
        # Persist first, then announce the registration.
        self.xmpp.store.users.update(user)
        self.xmpp.event("user_register", Iq(sfrom=ifrom.bare))
        return self.SUCCESS_MESSAGE

    async def run(
        self,
        _session: AnySession | None,
        ifrom: JID,
        *_: str,
    ) -> FormSession[AnySession]:
        """
        Entry point of the command: return the initial registration form.

        ``raise_if_not_allowed_jid`` is called on the gateway before the
        form is built.

        :param _session: Unused.
        :param ifrom: JID of the entity that ran the command.
        :return: A form built from the gateway's ``REGISTRATION_FIELDS`` and
            ``REGISTRATION_INSTRUCTIONS``, handled by :meth:`register`.
        """
        self.xmpp.raise_if_not_allowed_jid(ifrom)
        return FormSession(
            title=f"Registration to '{self.xmpp.COMPONENT_NAME}'",
            instructions=self.xmpp.REGISTRATION_INSTRUCTIONS,
            fields=self.xmpp.REGISTRATION_FIELDS,
            handler=self.register,
        )

    async def register(
        self,
        form_values: dict[str, Any],
        _session: None,
        ifrom: JID,
    ) -> Form | None:
        """
        Validate the registration form and return the next step of the flow.

        :param form_values: Values submitted in the registration form.
        :param _session: Unused.
        :param ifrom: JID of the entity that ran the command.
        :return: The preferences form, the 2FA form or the QR code form,
            depending on the gateway's ``REGISTRATION_TYPE``; ``None`` if the
            registration type matches none of the known flows.
        :raises XMPPError: ``bad-request`` if the gateway's
            ``user_prevalidate`` raises :class:`ValueError`;
            ``internal-server-error`` if the QR code image cannot be sent.
        :raises TwoFactorNotRequired: If it is raised by ``user_prevalidate``
            while the registration type is not ``TWO_FACTOR_CODE``.
        """
        registration_type = self.xmpp.REGISTRATION_TYPE
        two_fa_needed = True
        try:
            data = await self.xmpp.user_prevalidate(ifrom, form_values)
        except ValueError as e:
            raise XMPPError("bad-request", str(e)) from e
        except TwoFactorNotRequired:
            # Only meaningful for the 2FA flow; anything else is a misuse
            # of the exception and is propagated unchanged.
            if registration_type != RegistrationType.TWO_FACTOR_CODE:
                raise
            data = None
            two_fa_needed = False

        user = GatewayUser(
            jid=JID(ifrom.bare),
            # Fall back to the raw form values when prevalidation returned
            # nothing to store.
            legacy_module_data=form_values if data is None else data,
        )

        if registration_type == RegistrationType.SINGLE_STEP_FORM:
            return await self.preferences(user)

        if registration_type == RegistrationType.TWO_FACTOR_CODE:
            if not two_fa_needed:
                return await self.preferences(user)
            return self._two_fa_form(user)

        if registration_type == RegistrationType.QRCODE:
            return await self._start_qr_flow(user, ifrom)

        return None

    def _two_fa_form(self, user: GatewayUser) -> Form:
        """Build the form asking for the 2FA code, handled by :meth:`two_fa`."""
        return Form(
            title=self.xmpp.REGISTRATION_2FA_TITLE,
            instructions=self.xmpp.REGISTRATION_2FA_INSTRUCTIONS,
            fields=[FormField("code", label="Code", required=True)],
            handler=functools.partial(self.two_fa, user=user),
        )

    async def _start_qr_flow(self, user: GatewayUser, ifrom: JID) -> Form:
        """
        Send the QR code to ``ifrom`` and build the form handled by :meth:`qr`.

        A future is stored in the gateway's ``qr_pending_registrations``
        under the user's bare JID; :meth:`qr` later waits on it.

        :raises XMPPError: ``internal-server-error`` if ``send_file`` did not
            return a URL for the image.
        """
        self.xmpp.qr_pending_registrations[user.jid.bare] = (
            self.xmpp.loop.create_future()
        )
        qr_text = await self.xmpp.get_qr_text(user)
        qr = qrcode.make(qr_text)
        # The temporary file is kept on exit only when the upload method is
        # "move".
        with tempfile.NamedTemporaryFile(
            suffix=".png", delete=config.NO_UPLOAD_METHOD != "move"
        ) as f:
            qr.save(f.name)
            img_url, _ = await self.xmpp.send_file(f.name, mto=ifrom)
        if img_url is None:
            raise XMPPError(
                "internal-server-error", "Slidge cannot send attachments"
            )
        # Also send the raw text, in addition to the image.
        self.xmpp.send_text(qr_text, mto=ifrom)
        return Form(
            title="Flash this",
            instructions="Flash this QR in the appropriate place",
            fields=[
                FormField(
                    "qr_img",
                    type="fixed",
                    value=qr_text,
                    image_url=img_url,
                ),
                FormField(
                    "qr_text",
                    type="fixed",
                    value=qr_text,
                    label="Text encoded in the QR code",
                ),
                FormField(
                    "qr_img_url",
                    type="fixed",
                    value=img_url,
                    label="URL of the QR code image",
                ),
            ],
            handler=functools.partial(self.qr, user=user),
        )

    async def two_fa(
        self,
        form_values: FormValues,
        _session: None,
        _ifrom: JID,
        user: GatewayUser,
    ) -> Form:
        """
        Handle the 2FA form: check the code, then move on to preferences.

        :param form_values: Values submitted in the 2FA form; must contain
            ``"code"``.
        :param _session: Unused.
        :param _ifrom: Unused.
        :param user: The user built in :meth:`register`.
        :return: The preferences form.
        :raises XMPPError: ``bad-request`` if the submitted code is not a
            string.
        """
        code = form_values["code"]
        # Explicit check instead of ``assert``: assertions are stripped when
        # Python runs with -O, and the requester should get a proper stanza
        # error rather than an AssertionError.
        if not isinstance(code, str):
            raise XMPPError("bad-request", "The 2FA code must be a text value")
        data = await self.xmpp.validate_two_factor_code(user, code)
        if data is not None:
            user.legacy_module_data.update(data)
        return await self.preferences(user)

    async def qr(
        self,
        _form_values: FormValues,
        _session: None,
        _ifrom: JID,
        user: GatewayUser,
    ) -> Form:
        """
        Handle the QR form: wait for the pending registration to resolve.

        Waits at most ``config.QR_TIMEOUT`` on the future that was stored in
        ``qr_pending_registrations`` for this user's bare JID.

        :param _form_values: Unused.
        :param _session: Unused.
        :param _ifrom: Unused.
        :param user: The user built in :meth:`register`.
        :return: The preferences form.
        :raises XMPPError: ``remote-server-timeout`` if the future is not
            resolved in time.
        """
        try:
            data = await asyncio.wait_for(
                self.xmpp.qr_pending_registrations[user.jid.bare],
                config.QR_TIMEOUT,
            )
        except TimeoutError as e:
            raise XMPPError(
                "remote-server-timeout",
                (
                    "It does not seem that the QR code was correctly used, "
                    "or you took too much time"
                ),
            ) from e
        if data is not None:
            user.legacy_module_data.update(data)
        return await self.preferences(user)

    async def preferences(self, user: GatewayUser) -> Form:
        """
        Build the preferences form, the last interactive step of the flow.

        :param user: The user being registered.
        :return: A form built from the gateway's ``PREFERENCES``, handled by
            :meth:`_finalize`, with :meth:`_preferences_timeout` as its
            timeout handler.
        """
        return Form(
            title="Preferences",
            instructions=Preferences.HELP,
            fields=self.xmpp.PREFERENCES,
            handler=functools.partial(self._finalize, user=user),
            timeout_handler=functools.partial(self._preferences_timeout, user=user),
        )

    def _preferences_timeout(self, user: GatewayUser) -> None:
        """
        Complete the registration when the preferences form timed out.

        The user is stored without touching ``user.preferences``, and a
        message is sent to tell them so.

        :param user: The user being registered.
        """
        # Same order as in _finalize: persist first, then announce.
        self.xmpp.store.users.update(user)
        self.xmpp.event("user_register", Iq(sfrom=user.jid.bare))
        self.xmpp.send_message(
            mfrom=self.xmpp.boundjid.bare,
            mto=user.jid,
            mbody="You did not choose your preferences in time, falling back to defaults. "
            "You can change preferences later with the 'preferences' command.\n"
            + self.SUCCESS_MESSAGE,
        )