Added Role Owner and new UI for Owner
This commit is contained in:
320
backend/user_management_endpoints.py
Normal file
320
backend/user_management_endpoints.py
Normal file
@@ -0,0 +1,320 @@
|
||||
"""
|
||||
API эндпоинты для управления пользователями (v1.1.0)
|
||||
Требуется роль owner или admin
|
||||
"""
|
||||
|
||||
from fastapi import APIRouter, HTTPException, Depends
|
||||
from pydantic import BaseModel
|
||||
from typing import Optional, List
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
# Модели данных
|
||||
class RoleChange(BaseModel):
|
||||
role: str
|
||||
|
||||
class PermissionsUpdate(BaseModel):
|
||||
permissions: dict
|
||||
|
||||
class ServerAccess(BaseModel):
|
||||
server_name: str
|
||||
|
||||
class BanRequest(BaseModel):
|
||||
reason: str = "Заблокирован администратором"
|
||||
|
||||
# Загрузка пользователей
|
||||
def load_users():
|
||||
users_file = Path("users.json")
|
||||
if not users_file.exists():
|
||||
return {}
|
||||
|
||||
with open(users_file, "r", encoding="utf-8") as f:
|
||||
return json.load(f)
|
||||
|
||||
# Сохранение пользователей
|
||||
def save_users(users):
|
||||
with open("users.json", "w", encoding="utf-8") as f:
|
||||
json.dump(users, f, indent=2, ensure_ascii=False)
|
||||
|
||||
# Проверка прав
|
||||
def require_owner(current_user: dict):
|
||||
if current_user.get("role") != "owner":
|
||||
raise HTTPException(status_code=403, detail="Требуется роль владельца")
|
||||
|
||||
def require_admin_or_owner(current_user: dict):
|
||||
if current_user.get("role") not in ["owner", "admin"]:
|
||||
raise HTTPException(status_code=403, detail="Требуется роль администратора или владельца")
|
||||
|
||||
# 1. Получить список пользователей
|
||||
@router.get("/api/users")
|
||||
async def get_users(current_user: dict = Depends()):
|
||||
require_admin_or_owner(current_user)
|
||||
|
||||
users = load_users()
|
||||
|
||||
# Возвращаем список пользователей (без паролей)
|
||||
users_list = []
|
||||
for username, user_data in users.items():
|
||||
user_copy = user_data.copy()
|
||||
user_copy.pop("password", None)
|
||||
users_list.append(user_copy)
|
||||
|
||||
return users_list
|
||||
|
||||
# 2. Изменить роль пользователя
|
||||
@router.put("/api/users/{username}/role")
|
||||
async def change_user_role(username: str, role_data: RoleChange, current_user: dict = Depends()):
|
||||
require_owner(current_user)
|
||||
|
||||
users = load_users()
|
||||
|
||||
if username not in users:
|
||||
raise HTTPException(status_code=404, detail="Пользователь не найден")
|
||||
|
||||
if username == current_user.get("username"):
|
||||
raise HTTPException(status_code=400, detail="Нельзя изменить свою роль")
|
||||
|
||||
# Проверка валидности роли
|
||||
valid_roles = ["owner", "admin", "support", "user", "banned"]
|
||||
if role_data.role not in valid_roles:
|
||||
raise HTTPException(status_code=400, detail=f"Неверная роль. Доступные: {', '.join(valid_roles)}")
|
||||
|
||||
# Если назначается новый owner, текущий owner становится admin
|
||||
if role_data.role == "owner":
|
||||
for user in users.values():
|
||||
if user.get("role") == "owner":
|
||||
user["role"] = "admin"
|
||||
|
||||
# Изменяем роль
|
||||
old_role = users[username].get("role", "user")
|
||||
users[username]["role"] = role_data.role
|
||||
|
||||
# Обновляем права в зависимости от роли
|
||||
if role_data.role == "owner":
|
||||
users[username]["permissions"] = {
|
||||
"manage_users": True,
|
||||
"manage_roles": True,
|
||||
"manage_servers": True,
|
||||
"manage_tickets": True,
|
||||
"manage_files": True,
|
||||
"delete_users": True,
|
||||
"view_all_resources": True
|
||||
}
|
||||
elif role_data.role == "admin":
|
||||
users[username]["permissions"] = {
|
||||
"manage_users": True,
|
||||
"manage_roles": False,
|
||||
"manage_servers": True,
|
||||
"manage_tickets": True,
|
||||
"manage_files": True,
|
||||
"delete_users": False,
|
||||
"view_all_resources": True
|
||||
}
|
||||
elif role_data.role == "support":
|
||||
users[username]["permissions"] = {
|
||||
"manage_users": False,
|
||||
"manage_roles": False,
|
||||
"manage_servers": False,
|
||||
"manage_tickets": True,
|
||||
"manage_files": False,
|
||||
"delete_users": False,
|
||||
"view_all_resources": False
|
||||
}
|
||||
elif role_data.role == "banned":
|
||||
users[username]["permissions"] = {
|
||||
"manage_users": False,
|
||||
"manage_roles": False,
|
||||
"manage_servers": False,
|
||||
"manage_tickets": False,
|
||||
"manage_files": False,
|
||||
"delete_users": False,
|
||||
"view_all_resources": False
|
||||
}
|
||||
else: # user
|
||||
users[username]["permissions"] = {
|
||||
"manage_users": False,
|
||||
"manage_roles": False,
|
||||
"manage_servers": True,
|
||||
"manage_tickets": True,
|
||||
"manage_files": True,
|
||||
"delete_users": False,
|
||||
"view_all_resources": False
|
||||
}
|
||||
|
||||
save_users(users)
|
||||
|
||||
return {
|
||||
"message": f"Роль пользователя {username} изменена с {old_role} на {role_data.role}",
|
||||
"user": {
|
||||
"username": username,
|
||||
"role": role_data.role
|
||||
}
|
||||
}
|
||||
|
||||
# 3. Изменить права пользователя
|
||||
@router.put("/api/users/{username}/permissions")
|
||||
async def update_user_permissions(username: str, perms: PermissionsUpdate, current_user: dict = Depends()):
|
||||
require_owner(current_user)
|
||||
|
||||
users = load_users()
|
||||
|
||||
if username not in users:
|
||||
raise HTTPException(status_code=404, detail="Пользователь не найден")
|
||||
|
||||
users[username]["permissions"] = perms.permissions
|
||||
save_users(users)
|
||||
|
||||
return {
|
||||
"message": f"Права пользователя {username} обновлены",
|
||||
"permissions": perms.permissions
|
||||
}
|
||||
|
||||
# 4. Выдать доступ к серверу
|
||||
@router.post("/api/users/{username}/access/servers")
|
||||
async def grant_server_access(username: str, access: ServerAccess, current_user: dict = Depends()):
|
||||
require_admin_or_owner(current_user)
|
||||
|
||||
users = load_users()
|
||||
|
||||
if username not in users:
|
||||
raise HTTPException(status_code=404, detail="Пользователь не найден")
|
||||
|
||||
if "resource_access" not in users[username]:
|
||||
users[username]["resource_access"] = {"servers": [], "tickets": [], "files": []}
|
||||
|
||||
if access.server_name not in users[username]["resource_access"]["servers"]:
|
||||
users[username]["resource_access"]["servers"].append(access.server_name)
|
||||
|
||||
# Также добавляем в старое поле servers для совместимости
|
||||
if "servers" not in users[username]:
|
||||
users[username]["servers"] = []
|
||||
if access.server_name not in users[username]["servers"]:
|
||||
users[username]["servers"].append(access.server_name)
|
||||
|
||||
save_users(users)
|
||||
|
||||
return {
|
||||
"message": f"Доступ к серверу {access.server_name} выдан пользователю {username}",
|
||||
"server": access.server_name,
|
||||
"user": username
|
||||
}
|
||||
|
||||
# 5. Забрать доступ к серверу
|
||||
@router.delete("/api/users/{username}/access/servers/{server_name}")
|
||||
async def revoke_server_access(username: str, server_name: str, current_user: dict = Depends()):
|
||||
require_admin_or_owner(current_user)
|
||||
|
||||
users = load_users()
|
||||
|
||||
if username not in users:
|
||||
raise HTTPException(status_code=404, detail="Пользователь не найден")
|
||||
|
||||
if "resource_access" in users[username] and "servers" in users[username]["resource_access"]:
|
||||
if server_name in users[username]["resource_access"]["servers"]:
|
||||
users[username]["resource_access"]["servers"].remove(server_name)
|
||||
|
||||
# Также удаляем из старого поля servers
|
||||
if "servers" in users[username] and server_name in users[username]["servers"]:
|
||||
users[username]["servers"].remove(server_name)
|
||||
|
||||
save_users(users)
|
||||
|
||||
return {
|
||||
"message": f"Доступ к серверу {server_name} отозван у пользователя {username}",
|
||||
"server": server_name,
|
||||
"user": username
|
||||
}
|
||||
|
||||
# 6. Удалить пользователя
|
||||
@router.delete("/api/users/{username}")
|
||||
async def delete_user(username: str, current_user: dict = Depends()):
|
||||
require_owner(current_user)
|
||||
|
||||
users = load_users()
|
||||
|
||||
if username not in users:
|
||||
raise HTTPException(status_code=404, detail="Пользователь не найден")
|
||||
|
||||
if username == current_user.get("username"):
|
||||
raise HTTPException(status_code=400, detail="Нельзя удалить самого себя")
|
||||
|
||||
if users[username].get("role") == "owner":
|
||||
raise HTTPException(status_code=400, detail="Нельзя удалить владельца")
|
||||
|
||||
del users[username]
|
||||
save_users(users)
|
||||
|
||||
return {
|
||||
"message": f"Пользователь {username} удалён",
|
||||
"username": username
|
||||
}
|
||||
|
||||
# 7. Заблокировать пользователя
|
||||
@router.post("/api/users/{username}/ban")
|
||||
async def ban_user(username: str, ban_data: BanRequest, current_user: dict = Depends()):
|
||||
require_admin_or_owner(current_user)
|
||||
|
||||
users = load_users()
|
||||
|
||||
if username not in users:
|
||||
raise HTTPException(status_code=404, detail="Пользователь не найден")
|
||||
|
||||
if username == current_user.get("username"):
|
||||
raise HTTPException(status_code=400, detail="Нельзя заблокировать самого себя")
|
||||
|
||||
if users[username].get("role") == "owner":
|
||||
raise HTTPException(status_code=400, detail="Нельзя заблокировать владельца")
|
||||
|
||||
users[username]["role"] = "banned"
|
||||
users[username]["permissions"] = {
|
||||
"manage_users": False,
|
||||
"manage_roles": False,
|
||||
"manage_servers": False,
|
||||
"manage_tickets": False,
|
||||
"manage_files": False,
|
||||
"delete_users": False,
|
||||
"view_all_resources": False
|
||||
}
|
||||
users[username]["ban_reason"] = ban_data.reason
|
||||
|
||||
save_users(users)
|
||||
|
||||
return {
|
||||
"message": f"Пользователь {username} заблокирован",
|
||||
"username": username,
|
||||
"reason": ban_data.reason
|
||||
}
|
||||
|
||||
# 8. Разблокировать пользователя
|
||||
@router.post("/api/users/{username}/unban")
|
||||
async def unban_user(username: str, current_user: dict = Depends()):
|
||||
require_admin_or_owner(current_user)
|
||||
|
||||
users = load_users()
|
||||
|
||||
if username not in users:
|
||||
raise HTTPException(status_code=404, detail="Пользователь не найден")
|
||||
|
||||
if users[username].get("role") != "banned":
|
||||
raise HTTPException(status_code=400, detail="Пользователь не заблокирован")
|
||||
|
||||
users[username]["role"] = "user"
|
||||
users[username]["permissions"] = {
|
||||
"manage_users": False,
|
||||
"manage_roles": False,
|
||||
"manage_servers": True,
|
||||
"manage_tickets": True,
|
||||
"manage_files": True,
|
||||
"delete_users": False,
|
||||
"view_all_resources": False
|
||||
}
|
||||
users[username].pop("ban_reason", None)
|
||||
|
||||
save_users(users)
|
||||
|
||||
return {
|
||||
"message": f"Пользователь {username} разблокирован",
|
||||
"username": username
|
||||
}
|
||||
Reference in New Issue
Block a user