Files
NeveTimePanel/backend/user_management_endpoints.py

321 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
API эндпоинты для управления пользователями (v1.1.0)
Требуется роль owner или admin
"""
from fastapi import APIRouter, HTTPException, Depends
from pydantic import BaseModel
from typing import Optional, List
import json
from pathlib import Path
router = APIRouter()
# Модели данных
class RoleChange(BaseModel):
role: str
class PermissionsUpdate(BaseModel):
permissions: dict
class ServerAccess(BaseModel):
server_name: str
class BanRequest(BaseModel):
reason: str = "Заблокирован администратором"
# Загрузка пользователей
def load_users():
users_file = Path("users.json")
if not users_file.exists():
return {}
with open(users_file, "r", encoding="utf-8") as f:
return json.load(f)
# Сохранение пользователей
def save_users(users):
with open("users.json", "w", encoding="utf-8") as f:
json.dump(users, f, indent=2, ensure_ascii=False)
# Проверка прав
def require_owner(current_user: dict):
if current_user.get("role") != "owner":
raise HTTPException(status_code=403, detail="Требуется роль владельца")
def require_admin_or_owner(current_user: dict):
if current_user.get("role") not in ["owner", "admin"]:
raise HTTPException(status_code=403, detail="Требуется роль администратора или владельца")
# 1. Получить список пользователей
@router.get("/api/users")
async def get_users(current_user: dict = Depends()):
require_admin_or_owner(current_user)
users = load_users()
# Возвращаем список пользователей (без паролей)
users_list = []
for username, user_data in users.items():
user_copy = user_data.copy()
user_copy.pop("password", None)
users_list.append(user_copy)
return users_list
# 2. Изменить роль пользователя
@router.put("/api/users/{username}/role")
async def change_user_role(username: str, role_data: RoleChange, current_user: dict = Depends()):
require_owner(current_user)
users = load_users()
if username not in users:
raise HTTPException(status_code=404, detail="Пользователь не найден")
if username == current_user.get("username"):
raise HTTPException(status_code=400, detail="Нельзя изменить свою роль")
# Проверка валидности роли
valid_roles = ["owner", "admin", "support", "user", "banned"]
if role_data.role not in valid_roles:
raise HTTPException(status_code=400, detail=f"Неверная роль. Доступные: {', '.join(valid_roles)}")
# Если назначается новый owner, текущий owner становится admin
if role_data.role == "owner":
for user in users.values():
if user.get("role") == "owner":
user["role"] = "admin"
# Изменяем роль
old_role = users[username].get("role", "user")
users[username]["role"] = role_data.role
# Обновляем права в зависимости от роли
if role_data.role == "owner":
users[username]["permissions"] = {
"manage_users": True,
"manage_roles": True,
"manage_servers": True,
"manage_tickets": True,
"manage_files": True,
"delete_users": True,
"view_all_resources": True
}
elif role_data.role == "admin":
users[username]["permissions"] = {
"manage_users": True,
"manage_roles": False,
"manage_servers": True,
"manage_tickets": True,
"manage_files": True,
"delete_users": False,
"view_all_resources": True
}
elif role_data.role == "support":
users[username]["permissions"] = {
"manage_users": False,
"manage_roles": False,
"manage_servers": False,
"manage_tickets": True,
"manage_files": False,
"delete_users": False,
"view_all_resources": False
}
elif role_data.role == "banned":
users[username]["permissions"] = {
"manage_users": False,
"manage_roles": False,
"manage_servers": False,
"manage_tickets": False,
"manage_files": False,
"delete_users": False,
"view_all_resources": False
}
else: # user
users[username]["permissions"] = {
"manage_users": False,
"manage_roles": False,
"manage_servers": True,
"manage_tickets": True,
"manage_files": True,
"delete_users": False,
"view_all_resources": False
}
save_users(users)
return {
"message": f"Роль пользователя {username} изменена с {old_role} на {role_data.role}",
"user": {
"username": username,
"role": role_data.role
}
}
# 3. Изменить права пользователя
@router.put("/api/users/{username}/permissions")
async def update_user_permissions(username: str, perms: PermissionsUpdate, current_user: dict = Depends()):
require_owner(current_user)
users = load_users()
if username not in users:
raise HTTPException(status_code=404, detail="Пользователь не найден")
users[username]["permissions"] = perms.permissions
save_users(users)
return {
"message": f"Права пользователя {username} обновлены",
"permissions": perms.permissions
}
# 4. Выдать доступ к серверу
@router.post("/api/users/{username}/access/servers")
async def grant_server_access(username: str, access: ServerAccess, current_user: dict = Depends()):
require_admin_or_owner(current_user)
users = load_users()
if username not in users:
raise HTTPException(status_code=404, detail="Пользователь не найден")
if "resource_access" not in users[username]:
users[username]["resource_access"] = {"servers": [], "tickets": [], "files": []}
if access.server_name not in users[username]["resource_access"]["servers"]:
users[username]["resource_access"]["servers"].append(access.server_name)
# Также добавляем в старое поле servers для совместимости
if "servers" not in users[username]:
users[username]["servers"] = []
if access.server_name not in users[username]["servers"]:
users[username]["servers"].append(access.server_name)
save_users(users)
return {
"message": f"Доступ к серверу {access.server_name} выдан пользователю {username}",
"server": access.server_name,
"user": username
}
# 5. Забрать доступ к серверу
@router.delete("/api/users/{username}/access/servers/{server_name}")
async def revoke_server_access(username: str, server_name: str, current_user: dict = Depends()):
require_admin_or_owner(current_user)
users = load_users()
if username not in users:
raise HTTPException(status_code=404, detail="Пользователь не найден")
if "resource_access" in users[username] and "servers" in users[username]["resource_access"]:
if server_name in users[username]["resource_access"]["servers"]:
users[username]["resource_access"]["servers"].remove(server_name)
# Также удаляем из старого поля servers
if "servers" in users[username] and server_name in users[username]["servers"]:
users[username]["servers"].remove(server_name)
save_users(users)
return {
"message": f"Доступ к серверу {server_name} отозван у пользователя {username}",
"server": server_name,
"user": username
}
# 6. Удалить пользователя
@router.delete("/api/users/{username}")
async def delete_user(username: str, current_user: dict = Depends()):
require_owner(current_user)
users = load_users()
if username not in users:
raise HTTPException(status_code=404, detail="Пользователь не найден")
if username == current_user.get("username"):
raise HTTPException(status_code=400, detail="Нельзя удалить самого себя")
if users[username].get("role") == "owner":
raise HTTPException(status_code=400, detail="Нельзя удалить владельца")
del users[username]
save_users(users)
return {
"message": f"Пользователь {username} удалён",
"username": username
}
# 7. Заблокировать пользователя
@router.post("/api/users/{username}/ban")
async def ban_user(username: str, ban_data: BanRequest, current_user: dict = Depends()):
require_admin_or_owner(current_user)
users = load_users()
if username not in users:
raise HTTPException(status_code=404, detail="Пользователь не найден")
if username == current_user.get("username"):
raise HTTPException(status_code=400, detail="Нельзя заблокировать самого себя")
if users[username].get("role") == "owner":
raise HTTPException(status_code=400, detail="Нельзя заблокировать владельца")
users[username]["role"] = "banned"
users[username]["permissions"] = {
"manage_users": False,
"manage_roles": False,
"manage_servers": False,
"manage_tickets": False,
"manage_files": False,
"delete_users": False,
"view_all_resources": False
}
users[username]["ban_reason"] = ban_data.reason
save_users(users)
return {
"message": f"Пользователь {username} заблокирован",
"username": username,
"reason": ban_data.reason
}
# 8. Разблокировать пользователя
@router.post("/api/users/{username}/unban")
async def unban_user(username: str, current_user: dict = Depends()):
require_admin_or_owner(current_user)
users = load_users()
if username not in users:
raise HTTPException(status_code=404, detail="Пользователь не найден")
if users[username].get("role") != "banned":
raise HTTPException(status_code=400, detail="Пользователь не заблокирован")
users[username]["role"] = "user"
users[username]["permissions"] = {
"manage_users": False,
"manage_roles": False,
"manage_servers": True,
"manage_tickets": True,
"manage_files": True,
"delete_users": False,
"view_all_resources": False
}
users[username].pop("ban_reason", None)
save_users(users)
return {
"message": f"Пользователь {username} разблокирован",
"username": username
}