Coverage for slidge/command/register.py: 44%

80 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-07 05:11 +0000

1""" 

2This module handles the registration :term:`Command`, which is a necessary 

3step for a JID to become a slidge :term:`User`. 

4""" 

5 

6import asyncio 

7import functools 

8import tempfile 

9from enum import IntEnum 

10from typing import Any 

11 

12import qrcode 

13from slixmpp import JID, Iq 

14from slixmpp.exceptions import XMPPError 

15 

16from ..core import config 

17from ..db import GatewayUser 

18from ..util.types import UserPreferences 

19from .base import Command, CommandAccess, Form, FormField, FormValues 

20from .user import Preferences 

21 

22 

class RegistrationType(IntEnum):
    """
    An :class:`Enum` to define the registration flow.

    The registration command compares the gateway's ``REGISTRATION_TYPE``
    against these members to decide which form(s) to present after the
    initial registration form has been submitted.
    """

    SINGLE_STEP_FORM = 0
    """
    1 step, 1 form, the only flow compatible with :xep:`0077`.
    Using this, the whole flow is defined
    by :attr:`slidge.BaseGateway.REGISTRATION_FIELDS` and
    :attr:`.REGISTRATION_INSTRUCTIONS`.
    """

    QRCODE = 10
    """
    The registration requires flashing a QR code in an official client.
    See :meth:`slidge.BaseGateway.send_qr`, :meth:`.get_qr_text`
    and :meth:`.confirm_qr`.
    """

    TWO_FACTOR_CODE = 20
    """
    The registration requires confirming login with a 2FA code,
    eg something received by email or SMS to finalize the authentication.
    See :meth:`.validate_two_factor_code`.
    """

49 

50 

class TwoFactorNotRequired(Exception):
    """
    Should be raised in :meth:`slidge.BaseGateway.validate` if the code is not
    required after all. This can happen for a :term:`Legacy Network` where 2FA
    is optional.

    The registration command catches this exception while pre-validating the
    registration form: for the ``TWO_FACTOR_CODE`` flow it then skips the code
    form, for any other flow the exception is re-raised.
    """

59 

60 

class Register(Command):
    """
    Command that links a JID to the gateway.

    The flow starts with :meth:`run` (the registration form), continues in
    :meth:`register` and, depending on the gateway's ``REGISTRATION_TYPE``,
    goes through :meth:`two_fa` or :meth:`qr` before ending with the
    preferences form, whose handler is :meth:`_finalize`.
    """

    NAME = "📝 Register to the gateway"
    HELP = "Link your JID to this gateway"
    NODE = "jabber:iq:register"
    CHAT_COMMAND = "register"
    ACCESS = CommandAccess.NON_USER

    SUCCESS_MESSAGE = "Success, welcome!"

    def _finalize(
        self, form_values: UserPreferences, _session, ifrom: JID, user: GatewayUser, *_
    ) -> str:
        """
        Last step of the flow: store the preferences and announce the user.

        :param form_values: Values submitted in the preferences form.
        :param ifrom: JID of the entity that is registering.
        :param user: The user built in :meth:`register`, bound to this handler
            through :func:`functools.partial` in :meth:`preferences`.
        :return: :attr:`SUCCESS_MESSAGE`
        """
        user.preferences = form_values  # type: ignore
        self.xmpp.store.users.update(user)
        self.xmpp.event("user_register", Iq(sfrom=ifrom.bare))
        return self.SUCCESS_MESSAGE

    async def run(self, _session, ifrom: JID, *_):
        """
        Entry point: return the registration form.

        ``raise_if_not_allowed_jid`` is called first, so nothing is shown to a
        JID it rejects.

        :param ifrom: JID of the entity that requested the command.
        :return: A :class:`Form` built from the gateway's registration
            instructions and fields, handled by :meth:`register`.
        """
        self.xmpp.raise_if_not_allowed_jid(ifrom)
        return Form(
            title=f"Registration to '{self.xmpp.COMPONENT_NAME}'",
            instructions=self.xmpp.REGISTRATION_INSTRUCTIONS,
            fields=self.xmpp.REGISTRATION_FIELDS,
            handler=self.register,
        )

    async def register(self, form_values: dict[str, Any], _session, ifrom: JID):
        """
        Handle the submitted registration form and pick the next step.

        :param form_values: Values submitted in the registration form.
        :param ifrom: JID of the entity that is registering.
        :return: The preferences form (single-step flow, or 2FA flow where the
            code turned out not to be needed), the 2FA code form, or the QR
            code form. Falls through (returns ``None``) for any other
            registration type.
        :raises XMPPError: ``bad-request`` if pre-validation raises
            :class:`ValueError`; ``internal-server-error`` if the QR image
            could not be sent.
        :raises TwoFactorNotRequired: if raised by pre-validation while the
            gateway does not use the two-factor flow.
        """
        two_fa_needed = True
        try:
            data = await self.xmpp.user_prevalidate(ifrom, form_values)
        except ValueError as e:
            # Keep the original exception as the explicit cause for debugging.
            raise XMPPError("bad-request", str(e)) from e
        except TwoFactorNotRequired:
            data = None
            if self.xmpp.REGISTRATION_TYPE == RegistrationType.TWO_FACTOR_CODE:
                two_fa_needed = False
            else:
                # This exception is only meaningful for the two-factor flow.
                raise

        user = GatewayUser(
            jid=JID(ifrom.bare),
            # Fall back to the raw form values when pre-validation returned
            # nothing (or was short-circuited by TwoFactorNotRequired).
            legacy_module_data=form_values if data is None else data,
        )

        if self.xmpp.REGISTRATION_TYPE == RegistrationType.SINGLE_STEP_FORM or (
            self.xmpp.REGISTRATION_TYPE == RegistrationType.TWO_FACTOR_CODE
            and not two_fa_needed
        ):
            return await self.preferences(user)

        if self.xmpp.REGISTRATION_TYPE == RegistrationType.TWO_FACTOR_CODE:
            return Form(
                title=self.xmpp.REGISTRATION_2FA_TITLE,
                instructions=self.xmpp.REGISTRATION_2FA_INSTRUCTIONS,
                fields=[FormField("code", label="Code", required=True)],
                handler=functools.partial(self.two_fa, user=user),
            )

        elif self.xmpp.REGISTRATION_TYPE == RegistrationType.QRCODE:
            # The future is awaited later, in qr(), once the form is submitted.
            pending = self.xmpp.loop.create_future()
            self.xmpp.qr_pending_registrations[user.jid.bare] = pending  # type:ignore
            qr_text = await self.xmpp.get_qr_text(user)
            qr = qrcode.make(qr_text)
            # NOTE(review): the temp file is kept on close only when the
            # upload method is "move" -- presumably because send_file then
            # relocates it; confirm against send_file.
            with tempfile.NamedTemporaryFile(
                suffix=".png", delete=config.NO_UPLOAD_METHOD != "move"
            ) as f:
                qr.save(f.name)
                img_url, _ = await self.xmpp.send_file(f.name, mto=ifrom)
            if img_url is None:
                raise XMPPError(
                    "internal-server-error", "Slidge cannot send attachments"
                )
            # Also send the encoded text as a plain message.
            self.xmpp.send_text(qr_text, mto=ifrom)
            return Form(
                title="Flash this",
                instructions="Flash this QR in the appropriate place",
                fields=[
                    FormField(
                        "qr_img",
                        type="fixed",
                        value=qr_text,
                        image_url=img_url,
                    ),
                    FormField(
                        "qr_text",
                        type="fixed",
                        value=qr_text,
                        label="Text encoded in the QR code",
                    ),
                    FormField(
                        "qr_img_url",
                        type="fixed",
                        value=img_url,
                        label="URL of the QR code image",
                    ),
                ],
                handler=functools.partial(self.qr, user=user),
            )

    async def two_fa(
        self, form_values: FormValues, _session, _ifrom, user: GatewayUser
    ):
        """
        Handle the submitted two-factor code form.

        :param form_values: Values of the 2FA form; ``"code"`` must be a string.
        :param user: The user built in :meth:`register`.
        :return: The preferences form.
        :raises XMPPError: ``bad-request`` if the submitted code is not a string.
        """
        code = form_values["code"]
        # Explicit check instead of ``assert``, which is stripped under ``-O``.
        if not isinstance(code, str):
            raise XMPPError("bad-request", "The two-factor code must be a string")
        data = await self.xmpp.validate_two_factor_code(user, code)
        if data is not None:
            user.legacy_module_data.update(data)
        return await self.preferences(user)

    async def qr(self, _form_values: FormValues, _session, _ifrom, user: GatewayUser):
        """
        Wait for the pending QR registration of ``user`` to complete.

        Awaits the future stored for the user's bare JID in
        ``qr_pending_registrations``, for at most ``config.QR_TIMEOUT``.

        :param user: The user built in :meth:`register`.
        :return: The preferences form.
        :raises XMPPError: ``remote-server-timeout`` if the future is not
            resolved in time.
        """
        try:
            data = await asyncio.wait_for(
                self.xmpp.qr_pending_registrations[user.jid.bare],  # type:ignore
                config.QR_TIMEOUT,
            )
        except asyncio.TimeoutError as e:
            raise XMPPError(
                "remote-server-timeout",
                (
                    "It does not seem that the QR code was correctly used, "
                    "or you took too much time"
                ),
            ) from e
        if data is not None:
            user.legacy_module_data.update(data)
        return await self.preferences(user)

    async def preferences(self, user: GatewayUser) -> Form:
        """
        Build the preferences form shown right before registration completes.

        :param user: The user being registered; bound to :meth:`_finalize`,
            which handles the submitted form.
        :return: A :class:`Form` with the gateway's ``PREFERENCES`` fields.
        """
        return Form(
            title="Preferences",
            instructions=Preferences.HELP,
            fields=self.xmpp.PREFERENCES,
            handler=functools.partial(self._finalize, user=user),  # type:ignore
        )
196 )