Initial commit

This commit is contained in:
2026-01-14 20:23:10 +06:00
commit 954dd473d1
57 changed files with 8854 additions and 0 deletions

152
backend/auth.py Normal file
View File

@@ -0,0 +1,152 @@
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import json
from pathlib import Path
SECRET_KEY = "mc-panel-secret-key-change-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 43200 # 30 дней
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
security = HTTPBearer()
USERS_FILE = Path("data/users.json")
USERS_FILE.parent.mkdir(exist_ok=True)
def load_users():
if USERS_FILE.exists():
with open(USERS_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
return {}
def save_users(users):
with open(USERS_FILE, 'w', encoding='utf-8') as f:
json.dump(users, f, indent=2, ensure_ascii=False)
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def decode_token(token: str):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except JWTError:
return None
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
token = credentials.credentials
payload = decode_token(token)
if payload is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверный токен авторизации"
)
username: str = payload.get("sub")
if username is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверный токен авторизации"
)
users = load_users()
if username not in users:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Пользователь не найден"
)
return {"username": username, "role": users[username].get("role", "user")}
def authenticate_user(username: str, password: str):
users = load_users()
if username not in users:
return False
user = users[username]
if not verify_password(password, user["password"]):
return False
return user
def create_user(username: str, password: str, role: str = "user"):
users = load_users()
if username in users:
return False
users[username] = {
"password": get_password_hash(password),
"role": role,
"created_at": datetime.utcnow().isoformat(),
"servers": [] # Список серверов к которым есть доступ
}
save_users(users)
return True
def get_user_servers(username: str):
"""Получить список серверов пользователя"""
users = load_users()
if username not in users:
return []
return users[username].get("servers", [])
def add_server_to_user(username: str, server_name: str):
"""Добавить сервер пользователю"""
users = load_users()
if username not in users:
return False
if "servers" not in users[username]:
users[username]["servers"] = []
if server_name not in users[username]["servers"]:
users[username]["servers"].append(server_name)
save_users(users)
return True
def remove_server_from_user(username: str, server_name: str):
"""Удалить сервер у пользователя"""
users = load_users()
if username not in users:
return False
if "servers" in users[username] and server_name in users[username]["servers"]:
users[username]["servers"].remove(server_name)
save_users(users)
return True
def get_server_users(server_name: str):
"""Получить список пользователей с доступом к серверу"""
users = load_users()
result = []
for username, user_data in users.items():
if server_name in user_data.get("servers", []):
result.append({
"username": username,
"role": user_data.get("role", "user")
})
return result
def has_server_access(username: str, server_name: str):
"""Проверить есть ли доступ к серверу"""
users = load_users()
if username not in users:
return False
user = users[username]
# Админы имеют доступ ко всем серверам
if user.get("role") == "admin":
return True
return server_name in user.get("servers", [])