Source code for slidgram.gateway

import logging
import typing

import sqlalchemy as sa
from pyrogram import Client
from pyrogram.errors import AuthKeyUnregistered, SessionPasswordNeeded
from pyrogram.types import User as TGUser
from slidge import BaseGateway, global_config
from slidge.command.register import (
    FormField,
    GatewayUser,
    RegistrationType,
    TwoFactorNotRequired,
)
from slidge.util.util import is_valid_phone_number
from slixmpp import JID
from slixmpp.exceptions import XMPPError

from . import config, reactions

if typing.TYPE_CHECKING:
    from .session import Session

[docs] REGISTRATION_INSTRUCTIONS = ( "You need to create a telegram account in an official telegram client.\n\nThen you" " can enter your phone number here, and you will receive a confirmation code in the" " official telegram client. You can uninstall the telegram client after this if you" " want." )
[docs] class Gateway(BaseGateway):
[docs] REGISTRATION_INSTRUCTIONS = REGISTRATION_INSTRUCTIONS
[docs] REGISTRATION_FIELDS = [ FormField(var="phone", label="Phone number", required=True), FormField( var="password", label="Password (only required if you set up one in Telegram)", required=False, private=True, ), ]
[docs] REGISTRATION_TYPE = RegistrationType.TWO_FACTOR_CODE
[docs] ROSTER_GROUP = "Telegram"
[docs] COMPONENT_NAME = "Telegram (slidge)"
[docs] COMPONENT_TYPE = "telegram"
[docs] COMPONENT_AVATAR = "https://web.telegram.org/img/logo_share.png"
[docs] SEARCH_FIELDS = [ FormField(var="phone", label="Phone number", required=True), FormField( var="first", label="A first name for this contact (you can use whatever you want).", required=True, ), FormField( var="last", label="A last name for this contact (you can use whatever you want).", ), ]
[docs] GROUPS = True
LEGACY_MSG_ID_TYPE = LEGACY_CONTACT_ID_TYPE = LEGACY_ROOM_ID_TYPE = int def __init__(self): super().__init__() if not config.API_ID: self.REGISTRATION_FIELDS.extend( [ FormField( var="info", type="fixed", label="Get API id and hash on https://my.telegram.org/apps", ), FormField(var="api_id", label="API ID", required=True), FormField(var="api_hash", label="API Hash", required=True), ] ) if not logging.getLogger().isEnabledFor(logging.DEBUG): logging.getLogger("pyrogram.connection.connection").setLevel( logging.WARNING ) logging.getLogger("pyrogram.session.session").setLevel(logging.WARNING) logging.getLogger("pyrogram.session.auth").setLevel(logging.WARNING) reactions_db_path = global_config.HOME_DIR / "reacters.sqlite" reactions.engine = sa.create_engine(f"sqlite:///{reactions_db_path}") if not reactions_db_path.exists(): reactions.Base.metadata.create_all(reactions.engine) if config.CONVERT_STICKERS: self.stickers_dir = global_config.HOME_DIR / "stickers" self.stickers_dir.mkdir(exist_ok=True)
[docs] async def validate(self, user_jid: JID, registration_form: dict[str, str | None]): phone = registration_form["phone"] assert isinstance(phone, str) if not is_valid_phone_number(phone): raise ValueError("Not a valid phone number") for u in self.store.users.get_all(): if u.legacy_module_data.get("phone") == phone: raise XMPPError( "not-allowed", text="Someone is already using this phone number on this server.", ) tg_client = Client( str(user_jid.bare), phone_number=phone, api_id=registration_form.get("api_id") or config.API_ID, api_hash=registration_form.get("api_hash") or config.API_HASH, workdir=global_config.HOME_DIR, ) if await tg_client.connect(): await tg_client.disconnect() raise TwoFactorNotRequired sent_code = await tg_client.send_code(phone) log.debug("The confirmation code for has been sent via %s", sent_code) _clients[str(user_jid.bare)] = tg_client return registration_form | { "sent_code_hash": sent_code.phone_code_hash, "api_id": registration_form.get("api_id") or config.API_ID, "api_hash": registration_form.get("api_hash") or config.API_HASH, }
[docs] async def validate_two_factor_code(self, user: GatewayUser, code: str): phone = user.legacy_module_data["phone"] code_hash = user.legacy_module_data["sent_code_hash"] assert isinstance(phone, str) assert isinstance(code_hash, str) tg_client = _clients[str(user.jid)] tg_client.phone_code = code try: tg_user = await tg_client.sign_in(phone, code_hash, code) except SessionPasswordNeeded as e: log.debug("Password needed:", exc_info=e) password = user.legacy_module_data["password"] assert isinstance(password, str) tg_user = await tg_client.check_password(password) tg_client.password = password await tg_client.disconnect() del _clients[str(user.jid)] if not isinstance(tg_user, TGUser): log.error("Not a TG User: %s", tg_user) raise XMPPError( "not-authorized", text=( "Something went wrong when trying to authenticate you on the " "telegram network. Please retry and/or contact your slidge admin." ), )
[docs] async def unregister(self, user: GatewayUser): session: "Session" = self.get_session_from_user(user) # type: ignore try: await session.tg.log_out() except (AuthKeyUnregistered, ConnectionError): # can happen when the session is killed from another tg client await session.tg.storage.delete()
[docs] _clients: dict[str, Client] = {}
[docs] log = logging.getLogger(__name__)