Files
NeveTimePanel/backend/main.py
arkonsadter d188cec1f0
All checks were successful
continuous-integration/drone/push Build is passing
Added Daemon system and fixed interface
2026-01-16 18:56:21 +06:00

1940 lines
77 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from fastapi import FastAPI, WebSocket, UploadFile, File, HTTPException, Depends, status, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, RedirectResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
import asyncio
import subprocess
import psutil
import os
import shutil
import sys
from pathlib import Path
from typing import Optional
import json
from passlib.context import CryptContext
from jose import JWTError, jwt
from datetime import datetime, timedelta
from authlib.integrations.starlette_client import OAuth
from authlib.common.errors import AuthlibBaseError
import httpx
from dotenv import load_dotenv
from oidc_config import get_enabled_providers, get_redirect_uri, OIDC_PROVIDERS
# Загружаем переменные окружения
load_dotenv()
app = FastAPI(title="MC Panel")
# Инициализация OAuth
oauth = OAuth()
# Регистрация ZITADEL провайдера
enabled_providers = get_enabled_providers()
if "zitadel" in enabled_providers:
config = enabled_providers["zitadel"]
oauth.register(
name="zitadel",
client_id=config["client_id"],
client_secret=config["client_secret"],
server_metadata_url=config["server_metadata_url"],
client_kwargs={"scope": " ".join(config["scopes"])}
)
print(f"✓ ZITADEL провайдер зарегистрирован: {config['issuer']}")
else:
print("⚠ ZITADEL провайдер не настроен. Проверьте .env файл.")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Настройки безопасности
SECRET_KEY = "your-secret-key-change-this-in-production-12345"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # 7 дней
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
security = HTTPBearer(auto_error=False)
SERVERS_DIR = Path("servers")
SERVERS_DIR.mkdir(exist_ok=True)
USERS_FILE = Path("users.json")
TICKETS_FILE = Path("tickets.json")
server_processes: dict[str, subprocess.Popen] = {}
server_logs: dict[str, list[str]] = {}
IS_WINDOWS = sys.platform == 'win32'
# Инициализация файла пользователей
def init_users():
if not USERS_FILE.exists():
admin_user = {
"username": "Root",
"password": pwd_context.hash("Admin"),
"role": "admin",
"servers": []
}
save_users({"Sofa12345": admin_user})
print("Создан пользователь по умолчанию: none / none")
def load_users() -> dict:
if USERS_FILE.exists():
with open(USERS_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
return {}
def save_users(users: dict):
with open(USERS_FILE, 'w', encoding='utf-8') as f:
json.dump(users, f, indent=2, ensure_ascii=False)
def load_server_config(server_name: str) -> dict:
config_path = SERVERS_DIR / server_name / "panel_config.json"
if config_path.exists():
with open(config_path, 'r', encoding='utf-8') as f:
return json.load(f)
return {
"name": server_name,
"displayName": server_name,
"startCommand": "java -Xmx2G -Xms1G -jar server.jar nogui"
}
def save_server_config(server_name: str, config: dict):
config_path = SERVERS_DIR / server_name / "panel_config.json"
with open(config_path, 'w', encoding='utf-8') as f:
json.dump(config, f, indent=2, ensure_ascii=False)
# Функции для работы с тикетами
def load_tickets() -> dict:
if TICKETS_FILE.exists():
with open(TICKETS_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
return {}
def save_tickets(tickets: dict):
with open(TICKETS_FILE, 'w', encoding='utf-8') as f:
json.dump(tickets, f, indent=2, ensure_ascii=False)
init_users()
# Функции аутентификации
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def create_access_token(data: dict):
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
if not credentials:
raise HTTPException(status_code=401, detail="Требуется авторизация")
token = credentials.credentials
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise HTTPException(status_code=401, detail="Неверный токен")
users = load_users()
if username not in users:
raise HTTPException(status_code=401, detail="Пользователь не найден")
user = users[username]
# Проверка на бан
if user.get("role") == "banned":
raise HTTPException(status_code=403, detail="Ваш аккаунт заблокирован")
return user
except JWTError:
raise HTTPException(status_code=401, detail="Неверный токен")
def check_server_access(user: dict, server_name: str):
# Владелец имеет доступ ко всем серверам
if user["role"] == "owner":
return True
# Админы имеют доступ ко всем серверам
if user["role"] == "admin":
return True
# Проверяем права на серверы
if not user.get("permissions", {}).get("servers", True):
return False
return server_name in user.get("servers", [])
# API для аутентификации
# OpenID Connect endpoints
@app.get("/api/auth/oidc/providers")
async def get_oidc_providers():
"""Получить список доступных OpenID Connect провайдеров"""
providers = {}
for provider_id, config in get_enabled_providers().items():
providers[provider_id] = {
"name": config["name"],
"icon": config["icon"],
"color": config["color"]
}
return providers
@app.get("/api/auth/oidc/{provider}/login")
async def oidc_login(provider: str, request: Request):
"""Начать процесс аутентификации через OpenID Connect"""
if provider not in get_enabled_providers():
raise HTTPException(404, f"Провайдер {provider} не найден или не настроен")
try:
redirect_uri = get_redirect_uri(provider, os.getenv("BASE_URL", "http://localhost:8000"))
return await oauth.create_client(provider).authorize_redirect(request, redirect_uri)
except Exception as e:
raise HTTPException(500, f"Ошибка инициализации OAuth: {str(e)}")
@app.get("/api/auth/oidc/{provider}/callback")
async def oidc_callback(provider: str, request: Request):
"""Обработка callback от OpenID Connect провайдера"""
if provider not in get_enabled_providers():
raise HTTPException(404, f"Провайдер {provider} не найден или не настроен")
try:
client = oauth.create_client(provider)
# Получаем токен от провайдера
token = await client.authorize_access_token(request)
# Получаем данные пользователя
user_data = token.get("userinfo")
if not user_data:
# Если userinfo нет в токене, парсим id_token
user_data = token.get("id_token")
if not user_data:
raise HTTPException(400, "Не удалось получить данные пользователя")
# Создаём или обновляем пользователя
username = create_or_update_oidc_user(user_data, provider)
# Создаём JWT токен для нашей системы
users = load_users()
user = users[username]
access_token = create_access_token({"sub": username, "role": user["role"]})
# Перенаправляем на фронтенд с токеном
frontend_url = os.getenv("FRONTEND_URL", "http://localhost:3000")
return RedirectResponse(f"{frontend_url}/?token={access_token}&username={username}")
except AuthlibBaseError as e:
print(f"OAuth ошибка для {provider}: {str(e)}")
raise HTTPException(400, f"OAuth ошибка: {str(e)}")
except Exception as e:
print(f"Ошибка аутентификации для {provider}: {str(e)}")
raise HTTPException(500, f"Ошибка аутентификации: {str(e)}")
def create_or_update_oidc_user(user_data: dict, provider: str) -> str:
"""Создать или обновить пользователя из OpenID Connect данных"""
users = load_users()
# Генерируем уникальное имя пользователя
email = user_data.get("email", "")
name = user_data.get("name", "")
sub = user_data.get("sub", "")
# Пытаемся использовать email как username
if email:
base_username = email.split("@")[0]
elif name:
base_username = name.replace(" ", "_").lower()
else:
base_username = f"{provider}_user"
# Убираем недопустимые символы
import re
base_username = re.sub(r'[^a-zA-Z0-9_-]', '', base_username)
# Ищем существующего пользователя по OIDC ID
oidc_id = f"{provider}:{sub}"
existing_user = None
for username, user_info in users.items():
if user_info.get("oidc_id") == oidc_id:
existing_user = username
break
if existing_user:
# Обновляем существующего пользователя
users[existing_user]["email"] = email
users[existing_user]["name"] = name
users[existing_user]["picture"] = user_data.get("picture")
save_users(users)
return existing_user
else:
# Создаём нового пользователя
username = base_username
counter = 1
while username in users:
username = f"{base_username}_{counter}"
counter += 1
users[username] = {
"username": username,
"password": "", # Пустой пароль для OIDC пользователей
"role": "user",
"servers": [],
"oidc_id": oidc_id,
"email": email,
"name": name,
"picture": user_data.get("picture"),
"provider": provider,
"created_at": datetime.utcnow().isoformat()
}
save_users(users)
return username
@app.post("/api/auth/register")
async def register(data: dict):
users = load_users()
username = data.get("username", "").strip()
password = data.get("password", "").strip()
if not username or not password:
raise HTTPException(400, "Имя пользователя и пароль обязательны")
if username in users:
raise HTTPException(400, "Пользователь уже существует")
# Первый пользователь становится владельцем
role = "owner" if len(users) == 0 else "user"
users[username] = {
"username": username,
"password": get_password_hash(password),
"role": role,
"servers": [],
"permissions": {
"servers": True,
"tickets": True,
"users": True if role == "owner" else False,
"files": True
} if role == "owner" else {
"servers": True,
"tickets": True,
"users": False,
"files": True
}
}
save_users(users)
access_token = create_access_token(data={"sub": username, "role": role})
return {
"access_token": access_token,
"token_type": "bearer",
"username": username,
"role": role
}
@app.post("/api/auth/login")
async def login(data: dict):
users = load_users()
username = data.get("username", "").strip()
password = data.get("password", "").strip()
if username not in users:
raise HTTPException(401, "Неверное имя пользователя или пароль")
user = users[username]
if not verify_password(password, user["password"]):
raise HTTPException(401, "Неверное имя пользователя или пароль")
access_token = create_access_token(data={"sub": username, "role": user["role"]})
return {
"access_token": access_token,
"token_type": "bearer",
"username": username,
"role": user["role"]
}
@app.get("/api/auth/me")
async def get_me(user: dict = Depends(get_current_user)):
users = load_users()
user_data = users.get(user["username"], {})
# Если у пользователя нет прав, создаем дефолтные
if "permissions" not in user_data:
user_data["permissions"] = {
"servers": True,
"tickets": True,
"users": user_data["role"] in ["owner", "admin"],
"files": True
}
users[user["username"]] = user_data
save_users(users)
return {
"username": user["username"],
"role": user["role"],
"servers": user.get("servers", []),
"permissions": user_data.get("permissions", {})
}
# API для управления пользователями
@app.get("/api/users")
async def get_users(user: dict = Depends(get_current_user)):
# Владелец, админы и тех. поддержка видят всех пользователей
users = load_users()
return [
{
"username": u["username"],
"role": u["role"],
"servers": u.get("servers", []),
"permissions": u.get("permissions", {
"servers": True,
"tickets": True,
"users": u["role"] in ["owner", "admin"],
"files": True
})
}
for u in users.values()
]
@app.put("/api/users/{username}/servers")
async def update_user_servers(username: str, data: dict, user: dict = Depends(get_current_user)):
users = load_users()
if username not in users:
raise HTTPException(404, "Пользователь не найден")
# Админы могут управлять доступом к любым серверам
if user["role"] == "admin":
users[username]["servers"] = data.get("servers", [])
save_users(users)
return {"message": "Доступ обновлен"}
# Обычные пользователи могут управлять доступом только к своим серверам
requested_servers = data.get("servers", [])
current_servers = users[username].get("servers", [])
# Проверяем, что пользователь пытается изменить доступ только к своим серверам
for server_name in requested_servers:
if server_name not in current_servers:
# Проверяем, является ли текущий пользователь владельцем этого сервера
config = load_server_config(server_name)
if config.get("owner") != user["username"]:
raise HTTPException(403, f"Вы не можете выдать доступ к серверу {server_name}")
users[username]["servers"] = requested_servers
save_users(users)
return {"message": "Доступ обновлен"}
@app.delete("/api/users/{username}")
async def delete_user(username: str, user: dict = Depends(get_current_user)):
# Только владелец может удалять пользователей
if user["role"] != "owner":
raise HTTPException(403, "Только владелец может удалять пользователей")
if username == user["username"]:
raise HTTPException(400, "Нельзя удалить самого себя")
users = load_users()
if username not in users:
raise HTTPException(404, "Пользователь не найден")
# Проверяем, что не удаляем последнего владельца
if users[username]["role"] == "owner":
owners_count = sum(1 for u in users.values() if u.get("role") == "owner")
if owners_count <= 1:
raise HTTPException(400, "Нельзя удалить последнего владельца")
del users[username]
save_users(users)
return {"message": "Пользователь удален"}
@app.put("/api/users/{username}/role")
async def update_user_role(username: str, data: dict, user: dict = Depends(get_current_user)):
# Только владелец может изменять роли
if user["role"] != "owner":
raise HTTPException(403, "Только владелец может изменять роли")
if username == user["username"]:
raise HTTPException(400, "Нельзя изменить свою роль")
users = load_users()
if username not in users:
raise HTTPException(404, "Пользователь не найден")
new_role = data.get("role")
if new_role not in ["admin", "user", "support", "banned"]:
raise HTTPException(400, "Неверная роль")
# Нельзя назначить роль owner
if new_role == "owner":
raise HTTPException(400, "Нельзя назначить роль владельца")
users[username]["role"] = new_role
save_users(users)
return {"message": "Роль обновлена"}
@app.get("/api/users/{username}/permissions")
async def get_user_permissions(username: str, user: dict = Depends(get_current_user)):
"""Получить права пользователя"""
# Только владелец и админы могут просматривать права
if user["role"] not in ["owner", "admin"]:
raise HTTPException(403, "Недостаточно прав")
users = load_users()
if username not in users:
raise HTTPException(404, "Пользователь не найден")
target_user = users[username]
# Если у пользователя нет прав, создаем дефолтные
if "permissions" not in target_user:
target_user["permissions"] = {
"servers": True,
"tickets": True,
"users": target_user["role"] in ["owner", "admin"],
"files": True
}
users[username] = target_user
save_users(users)
return {
"username": username,
"role": target_user["role"],
"permissions": target_user["permissions"]
}
@app.put("/api/users/{username}/permissions")
async def update_user_permissions(username: str, data: dict, user: dict = Depends(get_current_user)):
"""Обновить права пользователя (только для владельца)"""
if user["role"] != "owner":
raise HTTPException(403, "Только владелец может изменять права")
if username == user["username"]:
raise HTTPException(400, "Нельзя изменить свои права")
users = load_users()
if username not in users:
raise HTTPException(404, "Пользователь не найден")
target_user = users[username]
# Нельзя изменять права владельца
if target_user["role"] == "owner":
raise HTTPException(400, "Нельзя изменять права владельца")
permissions = data.get("permissions", {})
# Валидация прав
valid_permissions = ["servers", "tickets", "users", "files"]
for perm in permissions:
if perm not in valid_permissions:
raise HTTPException(400, f"Неверное право: {perm}")
# Обновляем права
if "permissions" not in target_user:
target_user["permissions"] = {
"servers": True,
"tickets": True,
"users": False,
"files": True
}
target_user["permissions"].update(permissions)
users[username] = target_user
save_users(users)
return {
"message": "Права обновлены",
"permissions": target_user["permissions"]
}
@app.post("/api/users/{username}/revoke-access")
async def revoke_user_access(username: str, data: dict, user: dict = Depends(get_current_user)):
"""Забрать доступ к определенным ресурсам (только для владельца)"""
if user["role"] != "owner":
raise HTTPException(403, "Только владелец может забирать доступ")
users = load_users()
if username not in users:
raise HTTPException(404, "Пользователь не найден")
target_user = users[username]
# Нельзя забирать доступ у владельца
if target_user["role"] == "owner":
raise HTTPException(400, "Нельзя забирать доступ у владельца")
resource_type = data.get("type") # "servers", "tickets", "all"
if resource_type == "servers":
# Забираем доступ ко всем серверам
target_user["servers"] = []
if "permissions" in target_user:
target_user["permissions"]["servers"] = False
elif resource_type == "tickets":
# Забираем доступ к тикетам
if "permissions" in target_user:
target_user["permissions"]["tickets"] = False
elif resource_type == "files":
# Забираем доступ к файлам
if "permissions" in target_user:
target_user["permissions"]["files"] = False
elif resource_type == "all":
# Забираем весь доступ
target_user["servers"] = []
if "permissions" in target_user:
target_user["permissions"] = {
"servers": False,
"tickets": False,
"users": False,
"files": False
}
else:
raise HTTPException(400, "Неверный тип ресурса")
users[username] = target_user
save_users(users)
return {
"message": f"Доступ к {resource_type} забран",
"permissions": target_user.get("permissions", {})
}
@app.post("/api/users/{username}/grant-access")
async def grant_user_access(username: str, data: dict, user: dict = Depends(get_current_user)):
"""Выдать доступ к определенным ресурсам (только для владельца)"""
if user["role"] != "owner":
raise HTTPException(403, "Только владелец может выдавать доступ")
users = load_users()
if username not in users:
raise HTTPException(404, "Пользователь не найден")
target_user = users[username]
resource_type = data.get("type") # "servers", "tickets", "files"
if "permissions" not in target_user:
target_user["permissions"] = {
"servers": False,
"tickets": False,
"users": False,
"files": False
}
if resource_type == "servers":
target_user["permissions"]["servers"] = True
elif resource_type == "tickets":
target_user["permissions"]["tickets"] = True
elif resource_type == "files":
target_user["permissions"]["files"] = True
elif resource_type == "all":
target_user["permissions"] = {
"servers": True,
"tickets": True,
"users": target_user["role"] in ["admin"],
"files": True
}
else:
raise HTTPException(400, "Неверный тип ресурса")
users[username] = target_user
save_users(users)
return {
"message": f"Доступ к {resource_type} выдан",
"permissions": target_user["permissions"]
}
# API для личного кабинета
@app.put("/api/profile/username")
async def update_username(data: dict, user: dict = Depends(get_current_user)):
"""Изменить имя пользователя"""
new_username = data.get("new_username", "").strip()
password = data.get("password", "")
if not new_username:
raise HTTPException(400, "Имя пользователя не может быть пустым")
if len(new_username) < 3:
raise HTTPException(400, "Имя пользователя должно быть не менее 3 символов")
users = load_users()
# Проверяем пароль
if not verify_password(password, users[user["username"]]["password"]):
raise HTTPException(400, "Неверный пароль")
# Проверяем, не занято ли новое имя
if new_username in users and new_username != user["username"]:
raise HTTPException(400, "Это имя пользователя уже занято")
# Сохраняем данные пользователя
old_username = user["username"]
user_data = users[old_username]
# Удаляем старую запись и создаём новую
del users[old_username]
user_data["username"] = new_username
users[new_username] = user_data
# Обновляем владельцев серверов
for server_dir in SERVERS_DIR.iterdir():
if server_dir.is_dir():
config = load_server_config(server_dir.name)
if config.get("owner") == old_username:
config["owner"] = new_username
save_server_config(server_dir.name, config)
# Обновляем доступы к серверам у других пользователей
for username, user_info in users.items():
if "servers" in user_info and old_username in user_info.get("servers", []):
user_info["servers"] = [new_username if s == old_username else s for s in user_info["servers"]]
save_users(users)
# Создаём новый токен
new_token = create_access_token({"sub": new_username, "role": user_data["role"]})
return {
"message": "Имя пользователя изменено",
"access_token": new_token,
"username": new_username
}
@app.put("/api/profile/password")
async def update_password(data: dict, user: dict = Depends(get_current_user)):
"""Изменить пароль"""
old_password = data.get("old_password", "")
new_password = data.get("new_password", "")
if not old_password or not new_password:
raise HTTPException(400, "Заполните все поля")
if len(new_password) < 6:
raise HTTPException(400, "Новый пароль должен быть не менее 6 символов")
users = load_users()
# Проверяем старый пароль
if not verify_password(old_password, users[user["username"]]["password"]):
raise HTTPException(400, "Неверный старый пароль")
# Устанавливаем новый пароль
users[user["username"]]["password"] = get_password_hash(new_password)
save_users(users)
return {"message": "Пароль изменён"}
@app.get("/api/profile/stats")
async def get_profile_stats(user: dict = Depends(get_current_user)):
"""Получить статистику профиля"""
users = load_users()
user_data = users.get(user["username"], {})
# Подсчитываем серверы пользователя
owned_servers = []
accessible_servers = []
for server_dir in SERVERS_DIR.iterdir():
if server_dir.is_dir():
config = load_server_config(server_dir.name)
if config.get("owner") == user["username"]:
owned_servers.append({
"name": server_dir.name,
"displayName": config.get("displayName", server_dir.name)
})
elif user["username"] in user_data.get("servers", []) or user["role"] == "admin":
accessible_servers.append({
"name": server_dir.name,
"displayName": config.get("displayName", server_dir.name)
})
# Подсчитываем тикеты
tickets = load_tickets()
user_tickets = [t for t in tickets.values() if t["author"] == user["username"]]
tickets_stats = {
"total": len(user_tickets),
"pending": len([t for t in user_tickets if t["status"] == "pending"]),
"in_progress": len([t for t in user_tickets if t["status"] == "in_progress"]),
"closed": len([t for t in user_tickets if t["status"] == "closed"])
}
return {
"username": user["username"],
"role": user["role"],
"owned_servers": owned_servers,
"accessible_servers": accessible_servers,
"tickets": tickets_stats,
"total_servers": len(owned_servers) + len(accessible_servers)
}
@app.get("/api/profile/stats/{username}")
async def get_user_profile_stats(username: str, user: dict = Depends(get_current_user)):
"""Получить статистику профиля другого пользователя (только для админов и тех. поддержки)"""
# Проверка прав доступа
if user["role"] not in ["admin", "support"]:
raise HTTPException(403, "Недостаточно прав для просмотра профилей других пользователей")
users = load_users()
# Проверка существования пользователя
if username not in users:
raise HTTPException(404, "Пользователь не найден")
target_user = users[username]
# Подсчитываем серверы пользователя
owned_servers = []
accessible_servers = []
for server_dir in SERVERS_DIR.iterdir():
if server_dir.is_dir():
config = load_server_config(server_dir.name)
if config.get("owner") == username:
owned_servers.append({
"name": server_dir.name,
"displayName": config.get("displayName", server_dir.name)
})
elif username in target_user.get("servers", []) or target_user["role"] == "admin":
accessible_servers.append({
"name": server_dir.name,
"displayName": config.get("displayName", server_dir.name)
})
# Подсчитываем тикеты
tickets = load_tickets()
user_tickets = [t for t in tickets.values() if t["author"] == username]
tickets_stats = {
"total": len(user_tickets),
"pending": len([t for t in user_tickets if t["status"] == "pending"]),
"in_progress": len([t for t in user_tickets if t["status"] == "in_progress"]),
"closed": len([t for t in user_tickets if t["status"] == "closed"])
}
return {
"username": username,
"role": target_user["role"],
"owned_servers": owned_servers,
"accessible_servers": accessible_servers,
"tickets": tickets_stats,
"total_servers": len(owned_servers) + len(accessible_servers),
"is_viewing_other": True # Флаг что это чужой профиль
}
# API для серверов
@app.get("/api/servers")
async def get_servers(user: dict = Depends(get_current_user)):
servers = []
try:
# Владелец и администратор видят все серверы
is_admin_or_owner = user.get("role") in ["owner", "admin"]
for server_dir in SERVERS_DIR.iterdir():
if server_dir.is_dir():
# Проверка доступа: владелец/админ видят всё, остальные только свои
if not is_admin_or_owner and server_dir.name not in user.get("servers", []):
continue
config = load_server_config(server_dir.name)
is_running = False
if server_dir.name in server_processes:
process = server_processes[server_dir.name]
if process.poll() is None:
is_running = True
else:
del server_processes[server_dir.name]
servers.append({
"name": server_dir.name,
"displayName": config.get("displayName", server_dir.name),
"status": "running" if is_running else "stopped",
"owner": config.get("owner", "unknown")
})
print(f"[DEBUG] User: {user['username']} (role: {user.get('role', 'user')})")
print(f"[DEBUG] Is admin/owner: {is_admin_or_owner}")
print(f"[DEBUG] Servers found: {len(servers)}")
except Exception as e:
print(f"Ошибка загрузки серверов: {e}")
return servers
@app.post("/api/servers/create")
async def create_server(data: dict, user: dict = Depends(get_current_user)):
server_name = data.get("name", "").strip()
daemon_id = data.get("daemonId", "local")
if not server_name or not server_name.replace("_", "").replace("-", "").isalnum():
raise HTTPException(400, "Недопустимое имя сервера")
# Если создаем на демоне
if daemon_id != "local":
# Загружаем демоны
from daemons import load_daemons
daemons = load_daemons()
if daemon_id not in daemons:
raise HTTPException(404, "Демон не найден")
daemon = daemons[daemon_id]
# Отправляем запрос на создание сервера на демоне
url = f"http://{daemon['address']}:{daemon['port']}/api/servers/create"
headers = {"Authorization": f"Bearer {daemon['key']}"}
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(url, json={
"name": server_name,
"displayName": data.get("displayName", server_name),
"startCommand": data.get("startCommand", "java -Xmx2G -Xms1G -jar server.jar nogui"),
"owner": user["username"]
}, headers=headers)
if response.status_code != 200:
raise HTTPException(400, f"Ошибка создания сервера на демоне: {response.text}")
except httpx.RequestError as e:
raise HTTPException(400, f"Ошибка подключения к демону: {str(e)}")
# Сохраняем информацию о сервере локально
config = {
"name": server_name,
"displayName": data.get("displayName", server_name),
"startCommand": data.get("startCommand", "java -Xmx2G -Xms1G -jar server.jar nogui"),
"owner": user["username"],
"daemonId": daemon_id,
"daemonName": daemon["name"]
}
# Создаем локальную запись о сервере
server_path = SERVERS_DIR / f"{daemon_id}_{server_name}"
server_path.mkdir(parents=True, exist_ok=True)
save_server_config(f"{daemon_id}_{server_name}", config)
else:
# Создаем локально
server_path = SERVERS_DIR / server_name
if server_path.exists():
raise HTTPException(400, "Сервер с таким именем уже существует")
server_path.mkdir(parents=True)
config = {
"name": server_name,
"displayName": data.get("displayName", server_name),
"startCommand": data.get("startCommand", "java -Xmx2G -Xms1G -jar server.jar nogui"),
"owner": user["username"],
"daemonId": "local"
}
save_server_config(server_name, config)
# Если пользователь не админ/owner, автоматически выдаем ему доступ
if user["role"] not in ["admin", "owner"]:
users = load_users()
if user["username"] in users:
if "servers" not in users[user["username"]]:
users[user["username"]]["servers"] = []
server_key = f"{daemon_id}_{server_name}" if daemon_id != "local" else server_name
if server_key not in users[user["username"]]["servers"]:
users[user["username"]]["servers"].append(server_key)
save_users(users)
return {"message": "Сервер создан", "name": server_name, "daemonId": daemon_id}
@app.get("/api/servers/{server_name}/config")
async def get_server_config(server_name: str, user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
server_path = SERVERS_DIR / server_name
if not server_path.exists():
raise HTTPException(404, "Сервер не найден")
config = load_server_config(server_name)
print(f"Загружена конфигурация для {server_name}: {config}")
return config
@app.put("/api/servers/{server_name}/config")
async def update_server_config(server_name: str, config: dict, user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
server_path = SERVERS_DIR / server_name
if not server_path.exists():
raise HTTPException(404, "Сервер не найден")
if server_name in server_processes:
raise HTTPException(400, "Остановите сервер перед изменением настроек")
save_server_config(server_name, config)
return {"message": "Настройки сохранены"}
@app.delete("/api/servers/{server_name}")
async def delete_server(server_name: str, user: dict = Depends(get_current_user)):
if user["role"] != "admin":
raise HTTPException(403, "Только администраторы могут удалять серверы")
server_path = SERVERS_DIR / server_name
if not server_path.exists():
raise HTTPException(404, "Сервер не найден")
if server_name in server_processes:
raise HTTPException(400, "Остановите сервер перед удалением")
shutil.rmtree(server_path)
return {"message": "Сервер удален"}
# Управление процессами серверов
async def read_server_output(server_name: str, process: subprocess.Popen):
try:
print(f"Начало чтения вывода для сервера {server_name}")
loop = asyncio.get_event_loop()
while True:
if process.poll() is not None:
print(f"Процесс сервера {server_name} завершился с кодом {process.poll()}")
break
try:
line = await loop.run_in_executor(None, process.stdout.readline)
if not line:
break
line = line.strip()
if line:
if server_name not in server_logs:
server_logs[server_name] = []
server_logs[server_name].append(line)
if len(server_logs[server_name]) > 1000:
server_logs[server_name].pop(0)
except Exception as e:
print(f"Ошибка чтения строки для {server_name}: {e}")
await asyncio.sleep(0.1)
except Exception as e:
print(f"Ошибка чтения вывода сервера {server_name}: {e}")
finally:
print(f"Чтение вывода для сервера {server_name} завершено")
if server_name in server_processes and process.poll() is not None:
del server_processes[server_name]
print(f"Сервер {server_name} удален из списка процессов")
@app.post("/api/servers/{server_name}/start")
async def start_server(server_name: str, user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
server_path = SERVERS_DIR / server_name
if not server_path.exists():
raise HTTPException(404, "Сервер не найден")
if server_name in server_processes:
raise HTTPException(400, "Сервер уже запущен")
config = load_server_config(server_name)
start_command = config.get("startCommand", "java -Xmx2G -Xms1G -jar server.jar nogui")
cmd_parts = start_command.split()
try:
if IS_WINDOWS:
process = subprocess.Popen(
cmd_parts,
cwd=server_path,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
creationflags=subprocess.CREATE_NO_WINDOW
)
else:
process = subprocess.Popen(
cmd_parts,
cwd=server_path,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1
)
server_processes[server_name] = process
server_logs[server_name] = []
asyncio.create_task(read_server_output(server_name, process))
print(f"Сервер {server_name} запущен с PID {process.pid}")
return {"message": "Сервер запущен", "pid": process.pid}
except Exception as e:
print(f"Ошибка запуска сервера {server_name}: {e}")
raise HTTPException(500, f"Ошибка запуска сервера: {str(e)}")
@app.post("/api/servers/{server_name}/stop")
async def stop_server(server_name: str, user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
if server_name not in server_processes:
raise HTTPException(400, "Сервер не запущен")
process = server_processes[server_name]
try:
if process.stdin and not process.stdin.closed:
process.stdin.write("stop\n")
process.stdin.flush()
try:
process.wait(timeout=30)
except subprocess.TimeoutExpired:
print(f"Сервер {server_name} не остановился за 30 секунд, принудительное завершение")
process.kill()
process.wait()
except Exception as e:
print(f"Ошибка при остановке сервера {server_name}: {e}")
try:
process.kill()
process.wait()
except:
pass
finally:
if server_name in server_processes:
del server_processes[server_name]
print(f"Сервер {server_name} остановлен")
return {"message": "Сервер остановлен"}
@app.post("/api/servers/{server_name}/command")
async def send_command(server_name: str, command: dict, user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
if server_name not in server_processes:
raise HTTPException(400, "Сервер не запущен")
process = server_processes[server_name]
if process.poll() is not None:
del server_processes[server_name]
raise HTTPException(400, "Сервер не запущен")
try:
cmd = command["command"]
if process.stdin and not process.stdin.closed:
process.stdin.write(cmd + "\n")
process.stdin.flush()
print(f"Команда отправлена серверу {server_name}: {cmd}")
return {"message": "Команда отправлена"}
else:
raise HTTPException(400, "Невозможно отправить команду")
except Exception as e:
print(f"Ошибка отправки команды серверу {server_name}: {e}")
raise HTTPException(500, f"Ошибка отправки команды: {str(e)}")
@app.get("/api/servers/{server_name}/stats")
async def get_server_stats(server_name: str, user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
server_path = SERVERS_DIR / server_name
try:
disk_usage = sum(f.stat().st_size for f in server_path.rglob('*') if f.is_file())
disk_mb = disk_usage / 1024 / 1024
except:
disk_mb = 0
if server_name not in server_processes:
return {
"status": "stopped",
"cpu": 0,
"memory": 0,
"disk": round(disk_mb, 2)
}
process = server_processes[server_name]
try:
if process.poll() is not None:
del server_processes[server_name]
return {
"status": "stopped",
"cpu": 0,
"memory": 0,
"disk": round(disk_mb, 2)
}
proc = psutil.Process(process.pid)
memory_mb = proc.memory_info().rss / 1024 / 1024
cpu_percent = proc.cpu_percent(interval=0.1)
return {
"status": "running",
"cpu": round(cpu_percent, 2),
"memory": round(memory_mb, 2),
"disk": round(disk_mb, 2)
}
except (psutil.NoSuchProcess, psutil.AccessDenied):
if server_name in server_processes:
del server_processes[server_name]
return {
"status": "stopped",
"cpu": 0,
"memory": 0,
"disk": round(disk_mb, 2)
}
except Exception as e:
print(f"Ошибка получения статистики для {server_name}: {e}")
return {
"status": "unknown",
"cpu": 0,
"memory": 0,
"disk": round(disk_mb, 2)
}
@app.websocket("/ws/servers/{server_name}/console")
async def console_websocket(websocket: WebSocket, server_name: str):
await websocket.accept()
print(f"WebSocket подключен для сервера: {server_name}")
if server_name in server_logs:
print(f"Отправка {len(server_logs[server_name])} существующих логов")
for log in server_logs[server_name]:
await websocket.send_text(log)
else:
print(f"Логов для сервера {server_name} пока нет")
await websocket.send_text(f"[Панель] Ожидание логов от сервера {server_name}...")
last_sent_index = len(server_logs.get(server_name, []))
try:
while True:
if server_name in server_logs:
current_logs = server_logs[server_name]
if len(current_logs) > last_sent_index:
for log in current_logs[last_sent_index:]:
await websocket.send_text(log)
last_sent_index = len(current_logs)
await asyncio.sleep(0.1)
except Exception as e:
print(f"WebSocket ошибка: {e}")
pass
# API для файлов
@app.get("/api/servers/{server_name}/files")
async def list_files(server_name: str, path: str = "", user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
server_path = SERVERS_DIR / server_name
if not server_path.exists():
raise HTTPException(404, "Сервер не найден")
target_path = server_path / path if path else server_path
try:
target_path = target_path.resolve()
server_path = server_path.resolve()
if not str(target_path).startswith(str(server_path)):
raise HTTPException(403, "Доступ запрещен")
except:
raise HTTPException(404, "Путь не найден")
if not target_path.exists():
raise HTTPException(404, "Путь не найден")
if not target_path.is_dir():
raise HTTPException(400, "Путь не является директорией")
files = []
try:
for item in target_path.iterdir():
files.append({
"name": item.name,
"type": "directory" if item.is_dir() else "file",
"size": item.stat().st_size if item.is_file() else 0
})
except Exception as e:
print(f"Ошибка чтения директории: {e}")
raise HTTPException(500, f"Ошибка чтения директории: {str(e)}")
return files
@app.get("/api/servers/{server_name}/files/download")
async def download_file(server_name: str, path: str, user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
server_path = SERVERS_DIR / server_name
file_path = server_path / path
if not file_path.exists() or not str(file_path).startswith(str(server_path)):
raise HTTPException(404, "Файл не найден")
return FileResponse(file_path, filename=file_path.name)
@app.post("/api/servers/{server_name}/files/upload")
async def upload_file(server_name: str, path: str, file: UploadFile = File(...), user: dict = Depends(get_current_user)):
print(f"Upload request: server={server_name}, path='{path}', filename='{file.filename}'")
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
server_path = SERVERS_DIR / server_name
target_path = server_path / path / file.filename
print(f"Target path: {target_path}")
print(f"Server path: {server_path}")
print(f"Path starts with server_path: {str(target_path).startswith(str(server_path))}")
if not str(target_path).startswith(str(server_path)):
raise HTTPException(400, "Недопустимый путь")
try:
target_path.parent.mkdir(parents=True, exist_ok=True)
print(f"Created directory: {target_path.parent}")
except Exception as e:
print(f"Error creating directory: {e}")
raise HTTPException(500, f"Ошибка создания директории: {str(e)}")
try:
with open(target_path, "wb") as f:
content = await file.read()
f.write(content)
print(f"File written successfully: {target_path}")
except Exception as e:
print(f"Error writing file: {e}")
raise HTTPException(500, f"Ошибка записи файла: {str(e)}")
return {"message": "Файл загружен"}
@app.post("/api/servers/{server_name}/files/create")
async def create_file_or_folder(server_name: str, data: dict, user: dict = Depends(get_current_user)):
"""Создать новый файл или папку"""
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
item_type = data.get("type") # "file" or "folder"
name = data.get("name", "").strip()
path = data.get("path", "") # Текущая папка
if not name:
raise HTTPException(400, "Имя не может быть пустым")
if item_type not in ["file", "folder"]:
raise HTTPException(400, "Тип должен быть 'file' или 'folder'")
server_path = SERVERS_DIR / server_name
# Формируем полный путь
if path:
full_path = server_path / path / name
else:
full_path = server_path / name
print(f"Creating {item_type}: {full_path}")
# Проверка безопасности
if not str(full_path).startswith(str(server_path)):
raise HTTPException(400, "Недопустимый путь")
try:
if item_type == "folder":
# Создаем папку
full_path.mkdir(parents=True, exist_ok=True)
# Создаем .gitkeep чтобы папка не была пустой
gitkeep = full_path / ".gitkeep"
gitkeep.touch()
print(f"Folder created: {full_path}")
else:
# Создаем файл
full_path.parent.mkdir(parents=True, exist_ok=True)
full_path.touch()
print(f"File created: {full_path}")
return {"message": f"{'Папка' if item_type == 'folder' else 'Файл'} создан(а)", "path": str(full_path)}
except Exception as e:
print(f"Error creating {item_type}: {e}")
raise HTTPException(500, f"Ошибка создания: {str(e)}")
@app.delete("/api/servers/{server_name}/files")
async def delete_file(server_name: str, path: str, user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
server_path = SERVERS_DIR / server_name
target_path = server_path / path
if not target_path.exists() or not str(target_path).startswith(str(server_path)):
raise HTTPException(404, "Файл не найден")
if target_path.is_dir():
shutil.rmtree(target_path)
else:
target_path.unlink()
return {"message": "Файл удален"}
@app.get("/api/servers/{server_name}/files/content")
async def get_file_content(server_name: str, path: str, user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
server_path = SERVERS_DIR / server_name
file_path = server_path / path
if not file_path.exists() or not file_path.is_file() or not str(file_path).startswith(str(server_path)):
raise HTTPException(404, "Файл не найден")
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
return {"content": content}
except UnicodeDecodeError:
raise HTTPException(400, "Файл не является текстовым")
@app.put("/api/servers/{server_name}/files/content")
async def update_file_content(server_name: str, path: str, data: dict, user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
server_path = SERVERS_DIR / server_name
file_path = server_path / path
if not file_path.exists() or not file_path.is_file() or not str(file_path).startswith(str(server_path)):
raise HTTPException(404, "Файл не найден")
try:
with open(file_path, 'w', encoding='utf-8') as f:
f.write(data.get("content", ""))
return {"message": "Файл сохранен"}
except Exception as e:
raise HTTPException(400, f"Ошибка сохранения файла: {str(e)}")
@app.put("/api/servers/{server_name}/files/rename")
async def rename_file(server_name: str, old_path: str, new_name: str, user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
server_path = SERVERS_DIR / server_name
old_file_path = server_path / old_path
if not old_file_path.exists() or not str(old_file_path).startswith(str(server_path)):
raise HTTPException(404, "Файл не найден")
new_file_path = old_file_path.parent / new_name
if new_file_path.exists():
raise HTTPException(400, "Файл с таким именем уже существует")
if not str(new_file_path).startswith(str(server_path)):
raise HTTPException(400, "Недопустимое имя файла")
old_file_path.rename(new_file_path)
return {"message": "Файл переименован"}
@app.post("/api/servers/{server_name}/files/move")
async def move_file(server_name: str, data: dict, user: dict = Depends(get_current_user)):
"""Переместить файл или папку"""
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
source_path = data.get("source", "").strip()
destination_path = data.get("destination", "").strip()
if not source_path:
raise HTTPException(400, "Не указан исходный путь")
server_path = SERVERS_DIR / server_name
source_full = server_path / source_path
# Формируем путь назначения
if destination_path:
# Извлекаем имя файла из source_path
file_name = source_full.name
dest_full = server_path / destination_path / file_name
else:
# Перемещение в корень
file_name = source_full.name
dest_full = server_path / file_name
print(f"Moving: {source_full} -> {dest_full}")
# Проверки безопасности
if not source_full.exists():
raise HTTPException(404, "Исходный файл не найден")
if not str(source_full).startswith(str(server_path)):
raise HTTPException(400, "Недопустимый исходный путь")
if not str(dest_full).startswith(str(server_path)):
raise HTTPException(400, "Недопустимый путь назначения")
if dest_full.exists():
raise HTTPException(400, "Файл с таким именем уже существует в папке назначения")
try:
# Создаем папку назначения если не существует
dest_full.parent.mkdir(parents=True, exist_ok=True)
# Перемещаем файл/папку
import shutil
shutil.move(str(source_full), str(dest_full))
print(f"Moved successfully: {dest_full}")
return {"message": "Файл перемещен", "new_path": str(dest_full)}
except Exception as e:
print(f"Error moving file: {e}")
raise HTTPException(500, f"Ошибка перемещения: {str(e)}")
@app.put("/api/servers/{server_name}/files/rename")
async def rename_file(server_name: str, old_path: str, new_name: str, user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
server_path = SERVERS_DIR / server_name
old_file_path = server_path / old_path
if not old_file_path.exists() or not str(old_file_path).startswith(str(server_path)):
raise HTTPException(404, "Файл не найден")
new_file_path = old_file_path.parent / new_name
if new_file_path.exists():
raise HTTPException(400, "Файл с таким именем уже существует")
if not str(new_file_path).startswith(str(server_path)):
raise HTTPException(400, "Недопустимое имя файла")
old_file_path.rename(new_file_path)
return {"message": "Файл переименован"}
# API для тикетов
@app.get("/api/tickets")
async def get_tickets(user: dict = Depends(get_current_user)):
"""Получить список тикетов"""
# Проверяем права на тикеты
if not user.get("permissions", {}).get("tickets", True):
raise HTTPException(403, "Нет доступа к тикетам")
tickets = load_tickets()
# Владелец, админы и тех. поддержка видят все тикеты
if user["role"] in ["owner", "admin", "support"]:
return list(tickets.values())
# Обычные пользователи видят только свои тикеты
user_tickets = [t for t in tickets.values() if t["author"] == user["username"]]
return user_tickets
@app.post("/api/tickets/create")
async def create_ticket(data: dict, user: dict = Depends(get_current_user)):
"""Создать новый тикет"""
# Проверяем права на тикеты
if not user.get("permissions", {}).get("tickets", True):
raise HTTPException(403, "Нет доступа к тикетам")
tickets = load_tickets()
# Генерируем ID тикета
ticket_id = str(len(tickets) + 1)
ticket = {
"id": ticket_id,
"title": data.get("title", "").strip(),
"description": data.get("description", "").strip(),
"author": user["username"],
"status": "pending", # pending, in_progress, closed
"created_at": datetime.utcnow().isoformat(),
"updated_at": datetime.utcnow().isoformat(),
"messages": [
{
"author": user["username"],
"text": data.get("description", "").strip(),
"timestamp": datetime.utcnow().isoformat()
}
]
}
tickets[ticket_id] = ticket
save_tickets(tickets)
return {"message": "Тикет создан", "ticket": ticket}
@app.get("/api/tickets/{ticket_id}")
async def get_ticket(ticket_id: str, user: dict = Depends(get_current_user)):
"""Получить тикет по ID"""
# Проверяем права на тикеты
if not user.get("permissions", {}).get("tickets", True):
raise HTTPException(403, "Нет доступа к тикетам")
tickets = load_tickets()
if ticket_id not in tickets:
raise HTTPException(404, "Тикет не найден")
ticket = tickets[ticket_id]
# Проверка доступа
if user["role"] not in ["owner", "admin", "support"] and ticket["author"] != user["username"]:
raise HTTPException(403, "Нет доступа к этому тикету")
return ticket
@app.post("/api/tickets/{ticket_id}/message")
async def add_ticket_message(ticket_id: str, data: dict, user: dict = Depends(get_current_user)):
"""Добавить сообщение в тикет"""
# Проверяем права на тикеты
if not user.get("permissions", {}).get("tickets", True):
raise HTTPException(403, "Нет доступа к тикетам")
tickets = load_tickets()
if ticket_id not in tickets:
raise HTTPException(404, "Тикет не найден")
ticket = tickets[ticket_id]
# Проверка доступа
if user["role"] not in ["owner", "admin", "support"] and ticket["author"] != user["username"]:
raise HTTPException(403, "Нет доступа к этому тикету")
message = {
"author": user["username"],
"text": data.get("text", "").strip(),
"timestamp": datetime.utcnow().isoformat()
}
ticket["messages"].append(message)
ticket["updated_at"] = datetime.utcnow().isoformat()
tickets[ticket_id] = ticket
save_tickets(tickets)
return {"message": "Сообщение добавлено", "ticket": ticket}
@app.put("/api/tickets/{ticket_id}/status")
async def update_ticket_status(ticket_id: str, data: dict, user: dict = Depends(get_current_user)):
"""Изменить статус тикета (только для владельца, админов и тех. поддержки)"""
if user["role"] not in ["owner", "admin", "support"]:
raise HTTPException(403, "Недостаточно прав")
# Проверяем права на тикеты
if not user.get("permissions", {}).get("tickets", True):
raise HTTPException(403, "Нет доступа к тикетам")
tickets = load_tickets()
if ticket_id not in tickets:
raise HTTPException(404, "Тикет не найден")
new_status = data.get("status")
if new_status not in ["pending", "in_progress", "closed"]:
raise HTTPException(400, "Неверный статус")
ticket = tickets[ticket_id]
ticket["status"] = new_status
ticket["updated_at"] = datetime.utcnow().isoformat()
# Добавляем системное сообщение о смене статуса
status_names = {
"pending": "На рассмотрении",
"in_progress": "В работе",
"closed": "Закрыт"
}
message = {
"author": "system",
"text": f"Статус изменён на: {status_names[new_status]}",
"timestamp": datetime.utcnow().isoformat()
}
ticket["messages"].append(message)
tickets[ticket_id] = ticket
save_tickets(tickets)
return {"message": "Статус обновлён", "ticket": ticket}
# ============================================
# УПРАВЛЕНИЕ ПОЛЬЗОВАТЕЛЯМИ (v1.1.0)
# ============================================
# Загрузка пользователей
def load_users_dict():
users_file = Path("users.json")
if not users_file.exists():
return {}
with open(users_file, "r", encoding="utf-8") as f:
return json.load(f)
def save_users_dict(users):
with open("users.json", "w", encoding="utf-8") as f:
json.dump(users, f, indent=2, ensure_ascii=False)
# Проверка прав
def require_owner(current_user: dict):
if current_user.get("role") != "owner":
raise HTTPException(status_code=403, detail="Требуется роль владельца")
def require_admin_or_owner(current_user: dict):
if current_user.get("role") not in ["owner", "admin"]:
raise HTTPException(status_code=403, detail="Требуется роль администратора или владельца")
# 1. Получить список пользователей
@app.get("/api/users")
async def get_users(current_user: dict = Depends(get_current_user)):
require_admin_or_owner(current_user)
users = load_users_dict()
users_list = []
for username, user_data in users.items():
user_copy = user_data.copy()
user_copy.pop("password", None)
users_list.append(user_copy)
return users_list
# 2. Изменить роль пользователя
class RoleChange(BaseModel):
role: str
@app.put("/api/users/{username}/role")
async def change_user_role(username: str, role_data: RoleChange, current_user: dict = Depends(get_current_user)):
require_owner(current_user)
users = load_users_dict()
if username not in users:
raise HTTPException(status_code=404, detail="Пользователь не найден")
if username == current_user.get("username"):
raise HTTPException(status_code=400, detail="Нельзя изменить свою роль")
valid_roles = ["owner", "admin", "support", "user", "banned"]
if role_data.role not in valid_roles:
raise HTTPException(status_code=400, detail=f"Неверная роль")
# Разрешаем несколько владельцев (убрано ограничение на одного)
# Теперь можно назначить несколько пользователей с ролью owner
old_role = users[username].get("role", "user")
users[username]["role"] = role_data.role
# Обновляем права
if role_data.role == "owner":
users[username]["permissions"] = {
"manage_users": True, "manage_roles": True, "manage_servers": True,
"manage_tickets": True, "manage_files": True, "delete_users": True,
"view_all_resources": True
}
elif role_data.role == "admin":
users[username]["permissions"] = {
"manage_users": True, "manage_roles": False, "manage_servers": True,
"manage_tickets": True, "manage_files": True, "delete_users": False,
"view_all_resources": True
}
elif role_data.role == "support":
users[username]["permissions"] = {
"manage_users": False, "manage_roles": False, "manage_servers": False,
"manage_tickets": True, "manage_files": False, "delete_users": False,
"view_all_resources": False
}
elif role_data.role == "banned":
users[username]["permissions"] = {
"manage_users": False, "manage_roles": False, "manage_servers": False,
"manage_tickets": False, "manage_files": False, "delete_users": False,
"view_all_resources": False
}
else: # user
users[username]["permissions"] = {
"manage_users": False, "manage_roles": False, "manage_servers": True,
"manage_tickets": True, "manage_files": True, "delete_users": False,
"view_all_resources": False
}
save_users_dict(users)
return {"message": f"Роль изменена с {old_role} на {role_data.role}", "user": {"username": username, "role": role_data.role}}
# 3. Заблокировать пользователя
class BanRequest(BaseModel):
reason: str = "Заблокирован администратором"
@app.post("/api/users/{username}/ban")
async def ban_user(username: str, ban_data: BanRequest, current_user: dict = Depends(get_current_user)):
require_admin_or_owner(current_user)
users = load_users_dict()
if username not in users:
raise HTTPException(status_code=404, detail="Пользователь не найден")
if username == current_user.get("username"):
raise HTTPException(status_code=400, detail="Нельзя заблокировать самого себя")
# Проверяем, что не блокируем последнего владельца
if users[username].get("role") == "owner":
owners_count = sum(1 for u in users.values() if u.get("role") == "owner")
if owners_count <= 1:
raise HTTPException(status_code=400, detail="Нельзя заблокировать последнего владельца. Должен остаться хотя бы один владелец.")
users[username]["role"] = "banned"
users[username]["permissions"] = {
"manage_users": False, "manage_roles": False, "manage_servers": False,
"manage_tickets": False, "manage_files": False, "delete_users": False,
"view_all_resources": False
}
users[username]["ban_reason"] = ban_data.reason
save_users_dict(users)
return {"message": f"Пользователь {username} заблокирован", "username": username, "reason": ban_data.reason}
# 4. Разблокировать пользователя
@app.post("/api/users/{username}/unban")
async def unban_user(username: str, current_user: dict = Depends(get_current_user)):
require_admin_or_owner(current_user)
users = load_users_dict()
if username not in users:
raise HTTPException(status_code=404, detail="Пользователь не найден")
if users[username].get("role") != "banned":
raise HTTPException(status_code=400, detail="Пользователь не заблокирован")
users[username]["role"] = "user"
users[username]["permissions"] = {
"manage_users": False, "manage_roles": False, "manage_servers": True,
"manage_tickets": True, "manage_files": True, "delete_users": False,
"view_all_resources": False
}
users[username].pop("ban_reason", None)
save_users_dict(users)
return {"message": f"Пользователь {username} разблокирован", "username": username}
# 5. Удалить пользователя
@app.delete("/api/users/{username}")
async def delete_user(username: str, current_user: dict = Depends(get_current_user)):
require_owner(current_user)
users = load_users_dict()
if username not in users:
raise HTTPException(status_code=404, detail="Пользователь не найден")
if username == current_user.get("username"):
raise HTTPException(status_code=400, detail="Нельзя удалить самого себя")
# Проверяем, что не удаляем последнего владельца
if users[username].get("role") == "owner":
owners_count = sum(1 for u in users.values() if u.get("role") == "owner")
if owners_count <= 1:
raise HTTPException(status_code=400, detail="Нельзя удалить последнего владельца. Должен остаться хотя бы один владелец.")
del users[username]
save_users_dict(users)
return {"message": f"Пользователь {username} удалён", "username": username}
# 6. Выдать доступ к серверу
class ServerAccess(BaseModel):
server_name: str
@app.post("/api/users/{username}/access/servers")
async def grant_server_access(username: str, access: ServerAccess, current_user: dict = Depends(get_current_user)):
require_admin_or_owner(current_user)
users = load_users_dict()
if username not in users:
raise HTTPException(status_code=404, detail="Пользователь не найден")
if "resource_access" not in users[username]:
users[username]["resource_access"] = {"servers": [], "tickets": [], "files": []}
if access.server_name not in users[username]["resource_access"]["servers"]:
users[username]["resource_access"]["servers"].append(access.server_name)
# Также добавляем в старое поле servers для совместимости
if "servers" not in users[username]:
users[username]["servers"] = []
if access.server_name not in users[username]["servers"]:
users[username]["servers"].append(access.server_name)
save_users_dict(users)
return {"message": f"Доступ к серверу {access.server_name} выдан", "server": access.server_name, "user": username}
# 7. Забрать доступ к серверу
@app.delete("/api/users/{username}/access/servers/{server_name}")
async def revoke_server_access(username: str, server_name: str, current_user: dict = Depends(get_current_user)):
require_admin_or_owner(current_user)
users = load_users_dict()
if username not in users:
raise HTTPException(status_code=404, detail="Пользователь не найден")
if "resource_access" in users[username] and "servers" in users[username]["resource_access"]:
if server_name in users[username]["resource_access"]["servers"]:
users[username]["resource_access"]["servers"].remove(server_name)
# Также удаляем из старого поля servers
if "servers" in users[username] and server_name in users[username]["servers"]:
users[username]["servers"].remove(server_name)
save_users_dict(users)
return {"message": f"Доступ к серверу {server_name} отозван", "server": server_name, "user": username}
# 8. Изменить права пользователя
class PermissionsUpdate(BaseModel):
permissions: dict
@app.put("/api/users/{username}/permissions")
async def update_user_permissions(username: str, perms: PermissionsUpdate, current_user: dict = Depends(get_current_user)):
require_owner(current_user)
users = load_users_dict()
if username not in users:
raise HTTPException(status_code=404, detail="Пользователь не найден")
users[username]["permissions"] = perms.permissions
save_users_dict(users)
return {"message": f"Права пользователя {username} обновлены", "permissions": perms.permissions}
# ============================================
# API для управления демонами
# ============================================
from daemons import router as daemons_router
# Подключаем роутер демонов
app.include_router(daemons_router)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)