# Authentication helpers: bcrypt password hashing, JWT access tokens, and a
# JSON-file user store that also tracks which servers each user may access.
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Union
import json
import os
from pathlib import Path

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from passlib.context import CryptContext

# Key used to sign and verify tokens. The literal below is a placeholder;
# set the MC_PANEL_SECRET_KEY environment variable to override it without
# editing the source. An unset or empty variable falls back to the placeholder.
SECRET_KEY = (
    os.environ.get("MC_PANEL_SECRET_KEY")
    or "mc-panel-secret-key-change-in-production"
)
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 43200  # 30 days

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
security = HTTPBearer()

USERS_FILE = Path("data/users.json")
USERS_FILE.parent.mkdir(parents=True, exist_ok=True)


def _utcnow() -> datetime:
    """Return the current UTC time as a naive datetime.

    Produces the same value as the deprecated ``datetime.utcnow()``, so token
    expiry and the stored ``created_at`` format stay unchanged.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def load_users() -> Dict[str, Any]:
    """Return the parsed contents of ``USERS_FILE``, or ``{}`` if it is absent."""
    if not USERS_FILE.exists():
        return {}
    with open(USERS_FILE, "r", encoding="utf-8") as fh:
        return json.load(fh)


def save_users(users) -> None:
    """Write *users* to ``USERS_FILE`` as indented UTF-8 JSON.

    The data is serialized before any file is opened and written to a sibling
    temporary file that is then moved over the target, so a serialization
    error or a failed write leaves the previous file intact.

    Raises:
        TypeError: if *users* contains values JSON cannot represent.
        OSError: if writing or replacing the file fails.
    """
    serialized = json.dumps(users, indent=2, ensure_ascii=False)
    tmp_path = USERS_FILE.with_name(USERS_FILE.name + ".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as fh:
            fh.write(serialized)
        os.replace(tmp_path, USERS_FILE)
    except OSError:
        # Best-effort cleanup of the temp file; the real file was not touched.
        try:
            tmp_path.unlink()
        except OSError:
            pass
        raise


def verify_password(plain_password, hashed_password):
    """Return whether *plain_password* matches *hashed_password*."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    """Return the hash of *password* produced by ``pwd_context``."""
    return pwd_context.hash(password)


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Return a signed JWT carrying *data* plus an ``exp`` claim.

    Args:
        data: Claims to embed; the dict is copied, not modified.
        expires_delta: Token lifetime. When omitted (or falsy, e.g. a zero
            timedelta) ``ACCESS_TOKEN_EXPIRE_MINUTES`` is used instead.
    """
    if expires_delta:
        lifetime = expires_delta
    else:
        lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    claims = data.copy()
    claims["exp"] = _utcnow() + lifetime
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)


def decode_token(token: str):
    """Return the decoded payload of *token*, or ``None`` on ``JWTError``."""
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
):
    """Resolve the bearer token to a ``{"username", "role"}`` dict.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, or names a user that is not in the user store.
    """
    payload = decode_token(credentials.credentials)
    username: Optional[str] = payload.get("sub") if payload is not None else None
    if username is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Неверный токен авторизации",
        )

    users = load_users()
    if username not in users:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Пользователь не найден",
        )
    return {"username": username, "role": users[username].get("role", "user")}


def authenticate_user(username: str, password: str) -> Union[Dict[str, Any], bool]:
    """Return the stored user record if the credentials match, else ``False``.

    A record without a stored password hash is treated as a failed login
    rather than raising ``KeyError``.
    """
    user = load_users().get(username)
    if user is None:
        return False
    hashed = user.get("password")
    if not hashed or not verify_password(password, hashed):
        return False
    return user


def create_user(username: str, password: str, role: str = "user") -> bool:
    """Create and persist a new user; return ``False`` if the name is taken."""
    users = load_users()
    if username in users:
        return False
    users[username] = {
        "password": get_password_hash(password),
        "role": role,
        "created_at": _utcnow().isoformat(),
        "servers": [],  # servers this user has been granted access to
    }
    save_users(users)
    return True


def get_user_servers(username: str) -> List[Any]:
    """Return the user's server list, or ``[]`` for an unknown user."""
    users = load_users()
    if username not in users:
        return []
    return users[username].get("servers", [])


def add_server_to_user(username: str, server_name: str) -> bool:
    """Grant *username* access to *server_name*.

    Returns ``False`` for an unknown user, otherwise ``True``. The user store
    is rewritten only when the server was not already in the list.
    """
    users = load_users()
    if username not in users:
        return False
    servers = users[username].setdefault("servers", [])
    if server_name not in servers:
        servers.append(server_name)
        save_users(users)
    return True


def remove_server_from_user(username: str, server_name: str) -> bool:
    """Revoke *username*'s access to *server_name*.

    Returns ``False`` for an unknown user, otherwise ``True``. The user store
    is rewritten only when the server was actually in the list.
    """
    users = load_users()
    if username not in users:
        return False
    servers = users[username].get("servers", [])
    if server_name in servers:
        servers.remove(server_name)
        save_users(users)
    return True


def get_server_users(server_name: str) -> List[Dict[str, Any]]:
    """Return ``{"username", "role"}`` dicts for users listing *server_name*."""
    return [
        {"username": name, "role": record.get("role", "user")}
        for name, record in load_users().items()
        if server_name in record.get("servers", [])
    ]


def has_server_access(username: str, server_name: str) -> bool:
    """Return whether *username* may access *server_name*."""
    users = load_users()
    if username not in users:
        return False
    user = users[username]
    # Admins have access to every server.
    if user.get("role") == "admin":
        return True
    return server_name in user.get("servers", [])