321 lines
11 KiB
Python
321 lines
11 KiB
Python
"""
|
||
API эндпоинты для управления пользователями (v1.1.0)
|
||
Требуется роль owner или admin
|
||
"""
|
||
|
||
from fastapi import APIRouter, HTTPException, Depends
|
||
from pydantic import BaseModel
|
||
from typing import Optional, List
|
||
import json
|
||
from pathlib import Path
|
||
|
||
router = APIRouter()
|
||
|
||
# Модели данных
|
||
class RoleChange(BaseModel):
|
||
role: str
|
||
|
||
class PermissionsUpdate(BaseModel):
|
||
permissions: dict
|
||
|
||
class ServerAccess(BaseModel):
|
||
server_name: str
|
||
|
||
class BanRequest(BaseModel):
|
||
reason: str = "Заблокирован администратором"
|
||
|
||
# Загрузка пользователей
|
||
def load_users():
|
||
users_file = Path("users.json")
|
||
if not users_file.exists():
|
||
return {}
|
||
|
||
with open(users_file, "r", encoding="utf-8") as f:
|
||
return json.load(f)
|
||
|
||
# Сохранение пользователей
|
||
def save_users(users):
|
||
with open("users.json", "w", encoding="utf-8") as f:
|
||
json.dump(users, f, indent=2, ensure_ascii=False)
|
||
|
||
# Проверка прав
|
||
def require_owner(current_user: dict):
|
||
if current_user.get("role") != "owner":
|
||
raise HTTPException(status_code=403, detail="Требуется роль владельца")
|
||
|
||
def require_admin_or_owner(current_user: dict):
|
||
if current_user.get("role") not in ["owner", "admin"]:
|
||
raise HTTPException(status_code=403, detail="Требуется роль администратора или владельца")
|
||
|
||
# 1. Получить список пользователей
|
||
@router.get("/api/users")
|
||
async def get_users(current_user: dict = Depends()):
|
||
require_admin_or_owner(current_user)
|
||
|
||
users = load_users()
|
||
|
||
# Возвращаем список пользователей (без паролей)
|
||
users_list = []
|
||
for username, user_data in users.items():
|
||
user_copy = user_data.copy()
|
||
user_copy.pop("password", None)
|
||
users_list.append(user_copy)
|
||
|
||
return users_list
|
||
|
||
# 2. Изменить роль пользователя
|
||
@router.put("/api/users/{username}/role")
|
||
async def change_user_role(username: str, role_data: RoleChange, current_user: dict = Depends()):
|
||
require_owner(current_user)
|
||
|
||
users = load_users()
|
||
|
||
if username not in users:
|
||
raise HTTPException(status_code=404, detail="Пользователь не найден")
|
||
|
||
if username == current_user.get("username"):
|
||
raise HTTPException(status_code=400, detail="Нельзя изменить свою роль")
|
||
|
||
# Проверка валидности роли
|
||
valid_roles = ["owner", "admin", "support", "user", "banned"]
|
||
if role_data.role not in valid_roles:
|
||
raise HTTPException(status_code=400, detail=f"Неверная роль. Доступные: {', '.join(valid_roles)}")
|
||
|
||
# Если назначается новый owner, текущий owner становится admin
|
||
if role_data.role == "owner":
|
||
for user in users.values():
|
||
if user.get("role") == "owner":
|
||
user["role"] = "admin"
|
||
|
||
# Изменяем роль
|
||
old_role = users[username].get("role", "user")
|
||
users[username]["role"] = role_data.role
|
||
|
||
# Обновляем права в зависимости от роли
|
||
if role_data.role == "owner":
|
||
users[username]["permissions"] = {
|
||
"manage_users": True,
|
||
"manage_roles": True,
|
||
"manage_servers": True,
|
||
"manage_tickets": True,
|
||
"manage_files": True,
|
||
"delete_users": True,
|
||
"view_all_resources": True
|
||
}
|
||
elif role_data.role == "admin":
|
||
users[username]["permissions"] = {
|
||
"manage_users": True,
|
||
"manage_roles": False,
|
||
"manage_servers": True,
|
||
"manage_tickets": True,
|
||
"manage_files": True,
|
||
"delete_users": False,
|
||
"view_all_resources": True
|
||
}
|
||
elif role_data.role == "support":
|
||
users[username]["permissions"] = {
|
||
"manage_users": False,
|
||
"manage_roles": False,
|
||
"manage_servers": False,
|
||
"manage_tickets": True,
|
||
"manage_files": False,
|
||
"delete_users": False,
|
||
"view_all_resources": False
|
||
}
|
||
elif role_data.role == "banned":
|
||
users[username]["permissions"] = {
|
||
"manage_users": False,
|
||
"manage_roles": False,
|
||
"manage_servers": False,
|
||
"manage_tickets": False,
|
||
"manage_files": False,
|
||
"delete_users": False,
|
||
"view_all_resources": False
|
||
}
|
||
else: # user
|
||
users[username]["permissions"] = {
|
||
"manage_users": False,
|
||
"manage_roles": False,
|
||
"manage_servers": True,
|
||
"manage_tickets": True,
|
||
"manage_files": True,
|
||
"delete_users": False,
|
||
"view_all_resources": False
|
||
}
|
||
|
||
save_users(users)
|
||
|
||
return {
|
||
"message": f"Роль пользователя {username} изменена с {old_role} на {role_data.role}",
|
||
"user": {
|
||
"username": username,
|
||
"role": role_data.role
|
||
}
|
||
}
|
||
|
||
# 3. Изменить права пользователя
|
||
@router.put("/api/users/{username}/permissions")
|
||
async def update_user_permissions(username: str, perms: PermissionsUpdate, current_user: dict = Depends()):
|
||
require_owner(current_user)
|
||
|
||
users = load_users()
|
||
|
||
if username not in users:
|
||
raise HTTPException(status_code=404, detail="Пользователь не найден")
|
||
|
||
users[username]["permissions"] = perms.permissions
|
||
save_users(users)
|
||
|
||
return {
|
||
"message": f"Права пользователя {username} обновлены",
|
||
"permissions": perms.permissions
|
||
}
|
||
|
||
# 4. Выдать доступ к серверу
|
||
@router.post("/api/users/{username}/access/servers")
|
||
async def grant_server_access(username: str, access: ServerAccess, current_user: dict = Depends()):
|
||
require_admin_or_owner(current_user)
|
||
|
||
users = load_users()
|
||
|
||
if username not in users:
|
||
raise HTTPException(status_code=404, detail="Пользователь не найден")
|
||
|
||
if "resource_access" not in users[username]:
|
||
users[username]["resource_access"] = {"servers": [], "tickets": [], "files": []}
|
||
|
||
if access.server_name not in users[username]["resource_access"]["servers"]:
|
||
users[username]["resource_access"]["servers"].append(access.server_name)
|
||
|
||
# Также добавляем в старое поле servers для совместимости
|
||
if "servers" not in users[username]:
|
||
users[username]["servers"] = []
|
||
if access.server_name not in users[username]["servers"]:
|
||
users[username]["servers"].append(access.server_name)
|
||
|
||
save_users(users)
|
||
|
||
return {
|
||
"message": f"Доступ к серверу {access.server_name} выдан пользователю {username}",
|
||
"server": access.server_name,
|
||
"user": username
|
||
}
|
||
|
||
# 5. Забрать доступ к серверу
|
||
@router.delete("/api/users/{username}/access/servers/{server_name}")
|
||
async def revoke_server_access(username: str, server_name: str, current_user: dict = Depends()):
|
||
require_admin_or_owner(current_user)
|
||
|
||
users = load_users()
|
||
|
||
if username not in users:
|
||
raise HTTPException(status_code=404, detail="Пользователь не найден")
|
||
|
||
if "resource_access" in users[username] and "servers" in users[username]["resource_access"]:
|
||
if server_name in users[username]["resource_access"]["servers"]:
|
||
users[username]["resource_access"]["servers"].remove(server_name)
|
||
|
||
# Также удаляем из старого поля servers
|
||
if "servers" in users[username] and server_name in users[username]["servers"]:
|
||
users[username]["servers"].remove(server_name)
|
||
|
||
save_users(users)
|
||
|
||
return {
|
||
"message": f"Доступ к серверу {server_name} отозван у пользователя {username}",
|
||
"server": server_name,
|
||
"user": username
|
||
}
|
||
|
||
# 6. Удалить пользователя
|
||
@router.delete("/api/users/{username}")
|
||
async def delete_user(username: str, current_user: dict = Depends()):
|
||
require_owner(current_user)
|
||
|
||
users = load_users()
|
||
|
||
if username not in users:
|
||
raise HTTPException(status_code=404, detail="Пользователь не найден")
|
||
|
||
if username == current_user.get("username"):
|
||
raise HTTPException(status_code=400, detail="Нельзя удалить самого себя")
|
||
|
||
if users[username].get("role") == "owner":
|
||
raise HTTPException(status_code=400, detail="Нельзя удалить владельца")
|
||
|
||
del users[username]
|
||
save_users(users)
|
||
|
||
return {
|
||
"message": f"Пользователь {username} удалён",
|
||
"username": username
|
||
}
|
||
|
||
# 7. Заблокировать пользователя
|
||
@router.post("/api/users/{username}/ban")
|
||
async def ban_user(username: str, ban_data: BanRequest, current_user: dict = Depends()):
|
||
require_admin_or_owner(current_user)
|
||
|
||
users = load_users()
|
||
|
||
if username not in users:
|
||
raise HTTPException(status_code=404, detail="Пользователь не найден")
|
||
|
||
if username == current_user.get("username"):
|
||
raise HTTPException(status_code=400, detail="Нельзя заблокировать самого себя")
|
||
|
||
if users[username].get("role") == "owner":
|
||
raise HTTPException(status_code=400, detail="Нельзя заблокировать владельца")
|
||
|
||
users[username]["role"] = "banned"
|
||
users[username]["permissions"] = {
|
||
"manage_users": False,
|
||
"manage_roles": False,
|
||
"manage_servers": False,
|
||
"manage_tickets": False,
|
||
"manage_files": False,
|
||
"delete_users": False,
|
||
"view_all_resources": False
|
||
}
|
||
users[username]["ban_reason"] = ban_data.reason
|
||
|
||
save_users(users)
|
||
|
||
return {
|
||
"message": f"Пользователь {username} заблокирован",
|
||
"username": username,
|
||
"reason": ban_data.reason
|
||
}
|
||
|
||
# 8. Разблокировать пользователя
|
||
@router.post("/api/users/{username}/unban")
|
||
async def unban_user(username: str, current_user: dict = Depends()):
|
||
require_admin_or_owner(current_user)
|
||
|
||
users = load_users()
|
||
|
||
if username not in users:
|
||
raise HTTPException(status_code=404, detail="Пользователь не найден")
|
||
|
||
if users[username].get("role") != "banned":
|
||
raise HTTPException(status_code=400, detail="Пользователь не заблокирован")
|
||
|
||
users[username]["role"] = "user"
|
||
users[username]["permissions"] = {
|
||
"manage_users": False,
|
||
"manage_roles": False,
|
||
"manage_servers": True,
|
||
"manage_tickets": True,
|
||
"manage_files": True,
|
||
"delete_users": False,
|
||
"view_all_resources": False
|
||
}
|
||
users[username].pop("ban_reason", None)
|
||
|
||
save_users(users)
|
||
|
||
return {
|
||
"message": f"Пользователь {username} разблокирован",
|
||
"username": username
|
||
}
|