Coverage for slidge/command/register.py: 44%
80 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
1"""
2This module handles the registration :term:`Command`, which is a necessary
3step for a JID to become a slidge :term:`User`.
4"""
6import asyncio
7import functools
8import tempfile
9from enum import IntEnum
10from typing import Any
12import qrcode
13from slixmpp import JID, Iq
14from slixmpp.exceptions import XMPPError
16from ..core import config
17from ..db import GatewayUser
18from ..util.types import UserPreferences
19from .base import Command, CommandAccess, Form, FormField, FormValues
20from .user import Preferences
23class RegistrationType(IntEnum):
24 """
25 An :class:`Enum` to define the registration flow.
26 """
28 SINGLE_STEP_FORM = 0
29 """
30 1 step, 1 form, the only flow compatible with :xep:`0077`.
31 Using this, the whole flow is defined
32 by :attr:`slidge.BaseGateway.REGISTRATION_FIELDS` and
33 :attr:`.REGISTRATION_INSTRUCTIONS`.
34 """
36 QRCODE = 10
37 """
38 The registration requires flashing a QR code in an official client.
39 See :meth:`slidge.BaseGateway.send_qr`, :meth:`.get_qr_text`
40 and :meth:`.confirm_qr`.
41 """
43 TWO_FACTOR_CODE = 20
44 """
45 The registration requires confirming login with a 2FA code,
46 eg something received by email or SMS to finalize the authentication.
47 See :meth:`.validate_two_factor_code`.
48 """
51class TwoFactorNotRequired(Exception):
52 """
53 Should be raised in :meth:`slidge.BaseGateway.validate` if the code is not
54 required after all. This can happen for a :term:`Legacy Network` where 2FA
55 is optional.
56 """
58 pass
61class Register(Command):
62 NAME = "📝 Register to the gateway"
63 HELP = "Link your JID to this gateway"
64 NODE = "jabber:iq:register"
65 CHAT_COMMAND = "register"
66 ACCESS = CommandAccess.NON_USER
68 SUCCESS_MESSAGE = "Success, welcome!"
70 def _finalize(
71 self, form_values: UserPreferences, _session, ifrom: JID, user: GatewayUser, *_
72 ) -> str:
73 user.preferences = form_values # type: ignore
74 self.xmpp.store.users.update(user)
75 self.xmpp.event("user_register", Iq(sfrom=ifrom.bare))
76 return self.SUCCESS_MESSAGE
78 async def run(self, _session, ifrom: JID, *_):
79 self.xmpp.raise_if_not_allowed_jid(ifrom)
80 return Form(
81 title=f"Registration to '{self.xmpp.COMPONENT_NAME}'",
82 instructions=self.xmpp.REGISTRATION_INSTRUCTIONS,
83 fields=self.xmpp.REGISTRATION_FIELDS,
84 handler=self.register,
85 )
87 async def register(self, form_values: dict[str, Any], _session, ifrom: JID):
88 two_fa_needed = True
89 try:
90 data = await self.xmpp.user_prevalidate(ifrom, form_values)
91 except ValueError as e:
92 raise XMPPError("bad-request", str(e))
93 except TwoFactorNotRequired:
94 data = None
95 if self.xmpp.REGISTRATION_TYPE == RegistrationType.TWO_FACTOR_CODE:
96 two_fa_needed = False
97 else:
98 raise
100 user = GatewayUser(
101 jid=JID(ifrom.bare),
102 legacy_module_data=form_values if data is None else data,
103 )
105 if self.xmpp.REGISTRATION_TYPE == RegistrationType.SINGLE_STEP_FORM or (
106 self.xmpp.REGISTRATION_TYPE == RegistrationType.TWO_FACTOR_CODE
107 and not two_fa_needed
108 ):
109 return await self.preferences(user)
111 if self.xmpp.REGISTRATION_TYPE == RegistrationType.TWO_FACTOR_CODE:
112 return Form(
113 title=self.xmpp.REGISTRATION_2FA_TITLE,
114 instructions=self.xmpp.REGISTRATION_2FA_INSTRUCTIONS,
115 fields=[FormField("code", label="Code", required=True)],
116 handler=functools.partial(self.two_fa, user=user),
117 )
119 elif self.xmpp.REGISTRATION_TYPE == RegistrationType.QRCODE:
120 self.xmpp.qr_pending_registrations[ # type:ignore
121 user.jid.bare
122 ] = self.xmpp.loop.create_future()
123 qr_text = await self.xmpp.get_qr_text(user)
124 qr = qrcode.make(qr_text)
125 with tempfile.NamedTemporaryFile(
126 suffix=".png", delete=config.NO_UPLOAD_METHOD != "move"
127 ) as f:
128 qr.save(f.name)
129 img_url, _ = await self.xmpp.send_file(f.name, mto=ifrom)
130 if img_url is None:
131 raise XMPPError(
132 "internal-server-error", "Slidge cannot send attachments"
133 )
134 self.xmpp.send_text(qr_text, mto=ifrom)
135 return Form(
136 title="Flash this",
137 instructions="Flash this QR in the appropriate place",
138 fields=[
139 FormField(
140 "qr_img",
141 type="fixed",
142 value=qr_text,
143 image_url=img_url,
144 ),
145 FormField(
146 "qr_text",
147 type="fixed",
148 value=qr_text,
149 label="Text encoded in the QR code",
150 ),
151 FormField(
152 "qr_img_url",
153 type="fixed",
154 value=img_url,
155 label="URL of the QR code image",
156 ),
157 ],
158 handler=functools.partial(self.qr, user=user),
159 )
161 async def two_fa(
162 self, form_values: FormValues, _session, _ifrom, user: GatewayUser
163 ):
164 assert isinstance(form_values["code"], str)
165 data = await self.xmpp.validate_two_factor_code(user, form_values["code"])
166 if data is not None:
167 user.legacy_module_data.update(data)
168 return await self.preferences(user)
170 async def qr(self, _form_values: FormValues, _session, _ifrom, user: GatewayUser):
171 try:
172 data = await asyncio.wait_for(
173 self.xmpp.qr_pending_registrations[user.jid.bare], # type:ignore
174 config.QR_TIMEOUT,
175 )
176 except asyncio.TimeoutError:
177 raise XMPPError(
178 "remote-server-timeout",
179 (
180 "It does not seem that the QR code was correctly used, "
181 "or you took too much time"
182 ),
183 )
184 if data is not None:
185 user.legacy_module_data.update(data)
186 return await self.preferences(user)
188 async def preferences(self, user: GatewayUser) -> Form:
189 return Form(
190 title="Preferences",
191 instructions=Preferences.HELP,
192 fields=self.xmpp.PREFERENCES,
193 handler=functools.partial(self._finalize, user=user), # type:ignore
194 )