153 lines
5.0 KiB
Python
153 lines
5.0 KiB
Python
from datetime import datetime, timedelta
|
||
from typing import Optional
|
||
from jose import JWTError, jwt
|
||
from passlib.context import CryptContext
|
||
from fastapi import Depends, HTTPException, status
|
||
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||
import json
|
||
from pathlib import Path
|
||
|
||
# NOTE(review): the JWT signing key is hard-coded in source; the literal itself
# asks to be changed for production.
SECRET_KEY = "mc-panel-secret-key-change-in-production"
ALGORITHM = "HS256"
# Default token lifetime in minutes: 30 days * 24 hours * 60 minutes = 43200.
ACCESS_TOKEN_EXPIRE_MINUTES = 30 * 24 * 60

# bcrypt password-hashing context and the HTTP Bearer credentials extractor.
pwd_context = CryptContext(deprecated="auto", schemes=["bcrypt"])
security = HTTPBearer()

# JSON file holding the user records; its parent directory is created at import.
USERS_FILE = Path("data") / "users.json"
USERS_FILE.parent.mkdir(exist_ok=True)
|
||
|
||
def load_users():
    """Load the user database from ``USERS_FILE``.

    Returns:
        The decoded JSON content of the file, or an empty dict when the
        file does not exist.

    Raises:
        json.JSONDecodeError: If the file exists but does not contain valid
            JSON. This is deliberately not swallowed: treating a corrupt
            file as "no users" would let the next save overwrite it.
    """
    # EAFP: open directly instead of exists() + open(), so a file removed
    # between the check and the open cannot raise unexpectedly.
    try:
        with open(USERS_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (FileNotFoundError, NotADirectoryError):
        return {}
|
||
|
||
def save_users(users):
    """Persist the user database to ``USERS_FILE`` as indented UTF-8 JSON.

    The data is written to a sibling ``<name>.tmp`` file first and then
    renamed over the target, so a serialization error or a crash in the
    middle of the write cannot leave a truncated user database behind.

    Args:
        users: JSON-serializable mapping to store.

    Raises:
        TypeError: If ``users`` contains values ``json.dump`` cannot encode.
        OSError: If the temporary file cannot be written or renamed.
    """
    tmp_path = USERS_FILE.with_name(USERS_FILE.name + ".tmp")
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(users, f, indent=2, ensure_ascii=False)
        # Path.replace overwrites an existing destination.
        tmp_path.replace(USERS_FILE)
    except Exception:
        # Do not leave a partial temp file around; the original is untouched.
        try:
            tmp_path.unlink()
        except OSError:
            pass
        raise
|
||
|
||
def verify_password(plain_password, hashed_password):
    """Check a plaintext password against a stored hash.

    Args:
        plain_password: Password as supplied by the caller.
        hashed_password: Previously stored hash to compare against.

    Returns:
        Whatever ``pwd_context.verify`` returns for the pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
|
||
|
||
def get_password_hash(password):
    """Hash a plaintext password with the module-level ``pwd_context``.

    Args:
        password: Plaintext password to hash.

    Returns:
        The hash produced by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
|
||
|
||
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Create a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.
        expires_delta: Token lifetime. When ``None``, the default of
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes is used. An explicit
            zero (or negative) delta is honoured rather than silently
            replaced by the default.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    from datetime import timezone

    # Compare against None: timedelta(0) is falsy and would otherwise fall
    # through to the 30-day default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    # Timezone-aware "now" in UTC; datetime.utcnow() is deprecated.
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = data.copy()
    to_encode["exp"] = expire
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
||
|
||
def decode_token(token: str):
    """Decode and verify a JWT signed with ``SECRET_KEY``.

    Args:
        token: Encoded token string.

    Returns:
        The decoded payload, or ``None`` when ``jwt.decode`` raises
        ``JWTError``.
    """
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        # Callers treat None as "token not acceptable".
        return None
|
||
|
||
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Resolve the bearer token of the current request to a user.

    Args:
        credentials: Bearer credentials injected through ``Depends(security)``.

    Returns:
        A dict with ``username`` and ``role`` (``"user"`` when the stored
        record has no role).

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, or names a user missing from the user store.
    """
    claims = decode_token(credentials.credentials)

    # An undecodable token and a token without "sub" are rejected with the
    # same response.
    subject = None if claims is None else claims.get("sub")
    if subject is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Неверный токен авторизации",
        )

    known_users = load_users()
    if subject in known_users:
        return {"username": subject, "role": known_users[subject].get("role", "user")}

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Пользователь не найден",
    )
|
||
|
||
def authenticate_user(username: str, password: str):
    """Validate a username/password pair against the user store.

    Args:
        username: Login name to look up.
        password: Plaintext password to verify.

    Returns:
        The stored user record when the credentials match, otherwise
        ``False``.
    """
    users = load_users()
    if username not in users:
        # Spend roughly the time of a real verification so that response
        # timing does not reveal whether the username exists.
        pwd_context.dummy_verify()
        return False

    user = users[username]
    if not verify_password(password, user["password"]):
        return False
    return user
|
||
|
||
def create_user(username: str, password: str, role: str = "user"):
    """Register a new user in the user store.

    Args:
        username: Key under which the record is stored.
        password: Plaintext password; only its hash is stored.
        role: Role string saved with the record.

    Returns:
        ``True`` when the user was created, ``False`` when the username is
        already taken.
    """
    registry = load_users()
    if username in registry:
        return False

    record = {
        "password": get_password_hash(password),
        "role": role,
        "created_at": datetime.utcnow().isoformat(),
        "servers": [],  # servers this user has access to
    }
    registry[username] = record
    save_users(registry)
    return True
|
||
|
||
def get_user_servers(username: str):
    """Return the list of servers assigned to a user.

    Args:
        username: User to look up.

    Returns:
        The user's ``servers`` list, or an empty list when the user is
        unknown or has no ``servers`` entry.
    """
    registry = load_users()
    if username in registry:
        return registry[username].get("servers", [])
    return []
|
||
|
||
def add_server_to_user(username: str, server_name: str):
    """Grant a user access to a server.

    Args:
        username: User whose record is updated.
        server_name: Server to add to the user's ``servers`` list.

    Returns:
        ``False`` when the user does not exist, otherwise ``True`` (also
        when the server was already in the list).
    """
    users = load_users()
    if username not in users:
        return False

    # Older records may lack the "servers" key; create it on demand.
    servers = users[username].setdefault("servers", [])
    if server_name not in servers:
        servers.append(server_name)
        # Only touch the file when something actually changed.
        save_users(users)
    return True
|
||
|
||
def remove_server_from_user(username: str, server_name: str):
    """Revoke a user's access to a server.

    Args:
        username: User whose record is updated.
        server_name: Server to remove from the user's ``servers`` list.

    Returns:
        ``False`` when the user does not exist, otherwise ``True`` (also
        when the server was not in the list).
    """
    users = load_users()
    if username not in users:
        return False

    servers = users[username].get("servers", [])
    if server_name in servers:
        servers.remove(server_name)
        # Only touch the file when something actually changed.
        save_users(users)
    return True
|
||
|
||
def get_server_users(server_name: str):
    """List the users whose ``servers`` list contains the given server.

    Args:
        server_name: Server to search for.

    Returns:
        A list of ``{"username", "role"}`` dicts, in user-store order;
        ``role`` defaults to ``"user"`` when the record has none.
    """
    return [
        {
            "username": name,
            "role": record.get("role", "user"),
        }
        for name, record in load_users().items()
        if server_name in record.get("servers", [])
    ]
|
||
|
||
def has_server_access(username: str, server_name: str):
    """Check whether a user may access a server.

    Args:
        username: User to check.
        server_name: Server being accessed.

    Returns:
        ``False`` for unknown users; ``True`` for users whose role is
        ``"admin"``; otherwise whether the server is in the user's
        ``servers`` list.
    """
    registry = load_users()
    if username not in registry:
        return False

    record = registry[username]
    # Admins have access to every server.
    is_admin = record.get("role") == "admin"
    return is_admin or server_name in record.get("servers", [])
|