from fastapi import FastAPI, WebSocket, UploadFile, File, HTTPException, Depends, status from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials import asyncio import subprocess import psutil import os import shutil import sys from pathlib import Path from typing import Optional import json from passlib.context import CryptContext from jose import JWTError, jwt from datetime import datetime, timedelta app = FastAPI(title="MC Panel") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Настройки безопасности SECRET_KEY = "your-secret-key-change-this-in-production-12345" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # 7 дней pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") security = HTTPBearer(auto_error=False) SERVERS_DIR = Path("servers") SERVERS_DIR.mkdir(exist_ok=True) USERS_FILE = Path("users.json") TICKETS_FILE = Path("tickets.json") server_processes: dict[str, subprocess.Popen] = {} server_logs: dict[str, list[str]] = {} IS_WINDOWS = sys.platform == 'win32' # Инициализация файла пользователей def init_users(): if not USERS_FILE.exists(): admin_user = { "username": "Sofa12345", "password": pwd_context.hash("arkonsad123"), "role": "admin", "servers": [] } save_users({"Sofa12345": admin_user}) print("Создан пользователь по умолчанию: none / none") def load_users() -> dict: if USERS_FILE.exists(): with open(USERS_FILE, 'r', encoding='utf-8') as f: return json.load(f) return {} def save_users(users: dict): with open(USERS_FILE, 'w', encoding='utf-8') as f: json.dump(users, f, indent=2, ensure_ascii=False) def load_server_config(server_name: str) -> dict: config_path = SERVERS_DIR / server_name / "panel_config.json" if config_path.exists(): with open(config_path, 'r', encoding='utf-8') as f: return json.load(f) return { "name": server_name, "displayName": server_name, "startCommand": "java -Xmx2G -Xms1G -jar server.jar nogui" } def save_server_config(server_name: str, config: dict): config_path = SERVERS_DIR / server_name / "panel_config.json" with open(config_path, 'w', encoding='utf-8') as f: json.dump(config, f, indent=2, ensure_ascii=False) # Функции для работы с тикетами def load_tickets() -> dict: if TICKETS_FILE.exists(): with open(TICKETS_FILE, 'r', encoding='utf-8') as f: return json.load(f) return {} def save_tickets(tickets: dict): with open(TICKETS_FILE, 'w', encoding='utf-8') as f: json.dump(tickets, f, indent=2, ensure_ascii=False) init_users() # Функции аутентификации def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password): return pwd_context.hash(password) def create_access_token(data: dict): to_encode = data.copy() expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)): if not credentials: raise HTTPException(status_code=401, detail="Требуется авторизация") token = credentials.credentials try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise HTTPException(status_code=401, detail="Неверный токен") users = load_users() if username not in users: raise HTTPException(status_code=401, detail="Пользователь не найден") user = users[username] # Проверка на бан if user.get("role") == "banned": raise HTTPException(status_code=403, detail="Ваш аккаунт заблокирован") return user except JWTError: raise HTTPException(status_code=401, detail="Неверный токен") def check_server_access(user: dict, server_name: str): if user["role"] == "admin": return True return server_name in user.get("servers", []) # API для аутентификации @app.post("/api/auth/register") async def register(data: dict): users = load_users() username = data.get("username", "").strip() password = data.get("password", "").strip() if not username or not password: raise HTTPException(400, "Имя пользователя и пароль обязательны") if username in users: raise HTTPException(400, "Пользователь уже существует") role = "admin" if len(users) == 0 else "user" users[username] = { "username": username, "password": get_password_hash(password), "role": role, "servers": [] } save_users(users) access_token = create_access_token(data={"sub": username}) return { "access_token": access_token, "token_type": "bearer", "username": username, "role": role } @app.post("/api/auth/login") async def login(data: dict): users = load_users() username = data.get("username", "").strip() password = data.get("password", "").strip() if username not in users: raise HTTPException(401, "Неверное имя пользователя или пароль") user = users[username] if not verify_password(password, user["password"]): raise HTTPException(401, "Неверное имя пользователя или пароль") access_token = create_access_token(data={"sub": username}) return { "access_token": access_token, "token_type": "bearer", "username": username, "role": user["role"] } @app.get("/api/auth/me") async def get_me(user: dict = Depends(get_current_user)): return { "username": user["username"], "role": user["role"], "servers": user.get("servers", []) } # API для управления пользователями @app.get("/api/users") async def get_users(user: dict = Depends(get_current_user)): # Админы видят всех пользователей # Обычные пользователи тоже видят всех (для управления доступом к своим серверам) users = load_users() return [ { "username": u["username"], "role": u["role"], "servers": u.get("servers", []) } for u in users.values() ] @app.put("/api/users/{username}/servers") async def update_user_servers(username: str, data: dict, user: dict = Depends(get_current_user)): users = load_users() if username not in users: raise HTTPException(404, "Пользователь не найден") # Админы могут управлять доступом к любым серверам if user["role"] == "admin": users[username]["servers"] = data.get("servers", []) save_users(users) return {"message": "Доступ обновлен"} # Обычные пользователи могут управлять доступом только к своим серверам requested_servers = data.get("servers", []) current_servers = users[username].get("servers", []) # Проверяем, что пользователь пытается изменить доступ только к своим серверам for server_name in requested_servers: if server_name not in current_servers: # Проверяем, является ли текущий пользователь владельцем этого сервера config = load_server_config(server_name) if config.get("owner") != user["username"]: raise HTTPException(403, f"Вы не можете выдать доступ к серверу {server_name}") users[username]["servers"] = requested_servers save_users(users) return {"message": "Доступ обновлен"} @app.delete("/api/users/{username}") async def delete_user(username: str, user: dict = Depends(get_current_user)): if user["role"] != "admin": raise HTTPException(403, "Доступ запрещен") if username == user["username"]: raise HTTPException(400, "Нельзя удалить самого себя") users = load_users() if username not in users: raise HTTPException(404, "Пользователь не найден") del users[username] save_users(users) return {"message": "Пользователь удален"} @app.put("/api/users/{username}/role") async def update_user_role(username: str, data: dict, user: dict = Depends(get_current_user)): if user["role"] != "admin": raise HTTPException(403, "Доступ запрещен") if username == user["username"]: raise HTTPException(400, "Нельзя изменить свою роль") users = load_users() if username not in users: raise HTTPException(404, "Пользователь не найден") new_role = data.get("role") if new_role not in ["admin", "user", "support", "banned"]: raise HTTPException(400, "Неверная роль") users[username]["role"] = new_role save_users(users) return {"message": "Роль обновлена"} # API для личного кабинета @app.put("/api/profile/username") async def update_username(data: dict, user: dict = Depends(get_current_user)): """Изменить имя пользователя""" new_username = data.get("new_username", "").strip() password = data.get("password", "") if not new_username: raise HTTPException(400, "Имя пользователя не может быть пустым") if len(new_username) < 3: raise HTTPException(400, "Имя пользователя должно быть не менее 3 символов") users = load_users() # Проверяем пароль if not verify_password(password, users[user["username"]]["password"]): raise HTTPException(400, "Неверный пароль") # Проверяем, не занято ли новое имя if new_username in users and new_username != user["username"]: raise HTTPException(400, "Это имя пользователя уже занято") # Сохраняем данные пользователя old_username = user["username"] user_data = users[old_username] # Удаляем старую запись и создаём новую del users[old_username] user_data["username"] = new_username users[new_username] = user_data # Обновляем владельцев серверов for server_dir in SERVERS_DIR.iterdir(): if server_dir.is_dir(): config = load_server_config(server_dir.name) if config.get("owner") == old_username: config["owner"] = new_username save_server_config(server_dir.name, config) # Обновляем доступы к серверам у других пользователей for username, user_info in users.items(): if "servers" in user_info and old_username in user_info.get("servers", []): user_info["servers"] = [new_username if s == old_username else s for s in user_info["servers"]] save_users(users) # Создаём новый токен new_token = create_access_token({"sub": new_username, "role": user_data["role"]}) return { "message": "Имя пользователя изменено", "access_token": new_token, "username": new_username } @app.put("/api/profile/password") async def update_password(data: dict, user: dict = Depends(get_current_user)): """Изменить пароль""" old_password = data.get("old_password", "") new_password = data.get("new_password", "") if not old_password or not new_password: raise HTTPException(400, "Заполните все поля") if len(new_password) < 6: raise HTTPException(400, "Новый пароль должен быть не менее 6 символов") users = load_users() # Проверяем старый пароль if not verify_password(old_password, users[user["username"]]["password"]): raise HTTPException(400, "Неверный старый пароль") # Устанавливаем новый пароль users[user["username"]]["password"] = get_password_hash(new_password) save_users(users) return {"message": "Пароль изменён"} @app.get("/api/profile/stats") async def get_profile_stats(user: dict = Depends(get_current_user)): """Получить статистику профиля""" users = load_users() user_data = users.get(user["username"], {}) # Подсчитываем серверы пользователя owned_servers = [] accessible_servers = [] for server_dir in SERVERS_DIR.iterdir(): if server_dir.is_dir(): config = load_server_config(server_dir.name) if config.get("owner") == user["username"]: owned_servers.append({ "name": server_dir.name, "displayName": config.get("displayName", server_dir.name) }) elif user["username"] in user_data.get("servers", []) or user["role"] == "admin": accessible_servers.append({ "name": server_dir.name, "displayName": config.get("displayName", server_dir.name) }) # Подсчитываем тикеты tickets = load_tickets() user_tickets = [t for t in tickets.values() if t["author"] == user["username"]] tickets_stats = { "total": len(user_tickets), "pending": len([t for t in user_tickets if t["status"] == "pending"]), "in_progress": len([t for t in user_tickets if t["status"] == "in_progress"]), "closed": len([t for t in user_tickets if t["status"] == "closed"]) } return { "username": user["username"], "role": user["role"], "owned_servers": owned_servers, "accessible_servers": accessible_servers, "tickets": tickets_stats, "total_servers": len(owned_servers) + len(accessible_servers) } @app.get("/api/profile/stats/{username}") async def get_user_profile_stats(username: str, user: dict = Depends(get_current_user)): """Получить статистику профиля другого пользователя (только для админов и тех. поддержки)""" # Проверка прав доступа if user["role"] not in ["admin", "support"]: raise HTTPException(403, "Недостаточно прав для просмотра профилей других пользователей") users = load_users() # Проверка существования пользователя if username not in users: raise HTTPException(404, "Пользователь не найден") target_user = users[username] # Подсчитываем серверы пользователя owned_servers = [] accessible_servers = [] for server_dir in SERVERS_DIR.iterdir(): if server_dir.is_dir(): config = load_server_config(server_dir.name) if config.get("owner") == username: owned_servers.append({ "name": server_dir.name, "displayName": config.get("displayName", server_dir.name) }) elif username in target_user.get("servers", []) or target_user["role"] == "admin": accessible_servers.append({ "name": server_dir.name, "displayName": config.get("displayName", server_dir.name) }) # Подсчитываем тикеты tickets = load_tickets() user_tickets = [t for t in tickets.values() if t["author"] == username] tickets_stats = { "total": len(user_tickets), "pending": len([t for t in user_tickets if t["status"] == "pending"]), "in_progress": len([t for t in user_tickets if t["status"] == "in_progress"]), "closed": len([t for t in user_tickets if t["status"] == "closed"]) } return { "username": username, "role": target_user["role"], "owned_servers": owned_servers, "accessible_servers": accessible_servers, "tickets": tickets_stats, "total_servers": len(owned_servers) + len(accessible_servers), "is_viewing_other": True # Флаг что это чужой профиль } # API для серверов @app.get("/api/servers") async def get_servers(user: dict = Depends(get_current_user)): servers = [] try: for server_dir in SERVERS_DIR.iterdir(): if server_dir.is_dir(): if user["role"] != "admin" and server_dir.name not in user.get("servers", []): continue config = load_server_config(server_dir.name) is_running = False if server_dir.name in server_processes: process = server_processes[server_dir.name] if process.poll() is None: is_running = True else: del server_processes[server_dir.name] servers.append({ "name": server_dir.name, "displayName": config.get("displayName", server_dir.name), "status": "running" if is_running else "stopped" }) print(f"Найдено серверов для {user['username']}: {len(servers)}") except Exception as e: print(f"Ошибка загрузки серверов: {e}") return servers @app.post("/api/servers/create") async def create_server(data: dict, user: dict = Depends(get_current_user)): server_name = data.get("name", "").strip() if not server_name or not server_name.replace("_", "").replace("-", "").isalnum(): raise HTTPException(400, "Недопустимое имя сервера") server_path = SERVERS_DIR / server_name if server_path.exists(): raise HTTPException(400, "Сервер с таким именем уже существует") server_path.mkdir(parents=True) config = { "name": server_name, "displayName": data.get("displayName", server_name), "startCommand": data.get("startCommand", "java -Xmx2G -Xms1G -jar server.jar nogui"), "owner": user["username"] # Сохраняем владельца } save_server_config(server_name, config) # Если пользователь не админ, автоматически выдаем ему доступ if user["role"] != "admin": users = load_users() if user["username"] in users: if "servers" not in users[user["username"]]: users[user["username"]]["servers"] = [] if server_name not in users[user["username"]]["servers"]: users[user["username"]]["servers"].append(server_name) save_users(users) return {"message": "Сервер создан", "name": server_name} @app.get("/api/servers/{server_name}/config") async def get_server_config(server_name: str, user: dict = Depends(get_current_user)): if not check_server_access(user, server_name): raise HTTPException(403, "Нет доступа к этому серверу") server_path = SERVERS_DIR / server_name if not server_path.exists(): raise HTTPException(404, "Сервер не найден") config = load_server_config(server_name) print(f"Загружена конфигурация для {server_name}: {config}") return config @app.put("/api/servers/{server_name}/config") async def update_server_config(server_name: str, config: dict, user: dict = Depends(get_current_user)): if not check_server_access(user, server_name): raise HTTPException(403, "Нет доступа к этому серверу") server_path = SERVERS_DIR / server_name if not server_path.exists(): raise HTTPException(404, "Сервер не найден") if server_name in server_processes: raise HTTPException(400, "Остановите сервер перед изменением настроек") save_server_config(server_name, config) return {"message": "Настройки сохранены"} @app.delete("/api/servers/{server_name}") async def delete_server(server_name: str, user: dict = Depends(get_current_user)): if user["role"] != "admin": raise HTTPException(403, "Только администраторы могут удалять серверы") server_path = SERVERS_DIR / server_name if not server_path.exists(): raise HTTPException(404, "Сервер не найден") if server_name in server_processes: raise HTTPException(400, "Остановите сервер перед удалением") shutil.rmtree(server_path) return {"message": "Сервер удален"} # Управление процессами серверов async def read_server_output(server_name: str, process: subprocess.Popen): try: print(f"Начало чтения вывода для сервера {server_name}") loop = asyncio.get_event_loop() while True: if process.poll() is not None: print(f"Процесс сервера {server_name} завершился с кодом {process.poll()}") break try: line = await loop.run_in_executor(None, process.stdout.readline) if not line: break line = line.strip() if line: if server_name not in server_logs: server_logs[server_name] = [] server_logs[server_name].append(line) if len(server_logs[server_name]) > 1000: server_logs[server_name].pop(0) except Exception as e: print(f"Ошибка чтения строки для {server_name}: {e}") await asyncio.sleep(0.1) except Exception as e: print(f"Ошибка чтения вывода сервера {server_name}: {e}") finally: print(f"Чтение вывода для сервера {server_name} завершено") if server_name in server_processes and process.poll() is not None: del server_processes[server_name] print(f"Сервер {server_name} удален из списка процессов") @app.post("/api/servers/{server_name}/start") async def start_server(server_name: str, user: dict = Depends(get_current_user)): if not check_server_access(user, server_name): raise HTTPException(403, "Нет доступа к этому серверу") server_path = SERVERS_DIR / server_name if not server_path.exists(): raise HTTPException(404, "Сервер не найден") if server_name in server_processes: raise HTTPException(400, "Сервер уже запущен") config = load_server_config(server_name) start_command = config.get("startCommand", "java -Xmx2G -Xms1G -jar server.jar nogui") cmd_parts = start_command.split() try: if IS_WINDOWS: process = subprocess.Popen( cmd_parts, cwd=server_path, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, creationflags=subprocess.CREATE_NO_WINDOW ) else: process = subprocess.Popen( cmd_parts, cwd=server_path, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1 ) server_processes[server_name] = process server_logs[server_name] = [] asyncio.create_task(read_server_output(server_name, process)) print(f"Сервер {server_name} запущен с PID {process.pid}") return {"message": "Сервер запущен", "pid": process.pid} except Exception as e: print(f"Ошибка запуска сервера {server_name}: {e}") raise HTTPException(500, f"Ошибка запуска сервера: {str(e)}") @app.post("/api/servers/{server_name}/stop") async def stop_server(server_name: str, user: dict = Depends(get_current_user)): if not check_server_access(user, server_name): raise HTTPException(403, "Нет доступа к этому серверу") if server_name not in server_processes: raise HTTPException(400, "Сервер не запущен") process = server_processes[server_name] try: if process.stdin and not process.stdin.closed: process.stdin.write("stop\n") process.stdin.flush() try: process.wait(timeout=30) except subprocess.TimeoutExpired: print(f"Сервер {server_name} не остановился за 30 секунд, принудительное завершение") process.kill() process.wait() except Exception as e: print(f"Ошибка при остановке сервера {server_name}: {e}") try: process.kill() process.wait() except: pass finally: if server_name in server_processes: del server_processes[server_name] print(f"Сервер {server_name} остановлен") return {"message": "Сервер остановлен"} @app.post("/api/servers/{server_name}/command") async def send_command(server_name: str, command: dict, user: dict = Depends(get_current_user)): if not check_server_access(user, server_name): raise HTTPException(403, "Нет доступа к этому серверу") if server_name not in server_processes: raise HTTPException(400, "Сервер не запущен") process = server_processes[server_name] if process.poll() is not None: del server_processes[server_name] raise HTTPException(400, "Сервер не запущен") try: cmd = command["command"] if process.stdin and not process.stdin.closed: process.stdin.write(cmd + "\n") process.stdin.flush() print(f"Команда отправлена серверу {server_name}: {cmd}") return {"message": "Команда отправлена"} else: raise HTTPException(400, "Невозможно отправить команду") except Exception as e: print(f"Ошибка отправки команды серверу {server_name}: {e}") raise HTTPException(500, f"Ошибка отправки команды: {str(e)}") @app.get("/api/servers/{server_name}/stats") async def get_server_stats(server_name: str, user: dict = Depends(get_current_user)): if not check_server_access(user, server_name): raise HTTPException(403, "Нет доступа к этому серверу") server_path = SERVERS_DIR / server_name try: disk_usage = sum(f.stat().st_size for f in server_path.rglob('*') if f.is_file()) disk_mb = disk_usage / 1024 / 1024 except: disk_mb = 0 if server_name not in server_processes: return { "status": "stopped", "cpu": 0, "memory": 0, "disk": round(disk_mb, 2) } process = server_processes[server_name] try: if process.poll() is not None: del server_processes[server_name] return { "status": "stopped", "cpu": 0, "memory": 0, "disk": round(disk_mb, 2) } proc = psutil.Process(process.pid) memory_mb = proc.memory_info().rss / 1024 / 1024 cpu_percent = proc.cpu_percent(interval=0.1) return { "status": "running", "cpu": round(cpu_percent, 2), "memory": round(memory_mb, 2), "disk": round(disk_mb, 2) } except (psutil.NoSuchProcess, psutil.AccessDenied): if server_name in server_processes: del server_processes[server_name] return { "status": "stopped", "cpu": 0, "memory": 0, "disk": round(disk_mb, 2) } except Exception as e: print(f"Ошибка получения статистики для {server_name}: {e}") return { "status": "unknown", "cpu": 0, "memory": 0, "disk": round(disk_mb, 2) } @app.websocket("/ws/servers/{server_name}/console") async def console_websocket(websocket: WebSocket, server_name: str): await websocket.accept() print(f"WebSocket подключен для сервера: {server_name}") if server_name in server_logs: print(f"Отправка {len(server_logs[server_name])} существующих логов") for log in server_logs[server_name]: await websocket.send_text(log) else: print(f"Логов для сервера {server_name} пока нет") await websocket.send_text(f"[Панель] Ожидание логов от сервера {server_name}...") last_sent_index = len(server_logs.get(server_name, [])) try: while True: if server_name in server_logs: current_logs = server_logs[server_name] if len(current_logs) > last_sent_index: for log in current_logs[last_sent_index:]: await websocket.send_text(log) last_sent_index = len(current_logs) await asyncio.sleep(0.1) except Exception as e: print(f"WebSocket ошибка: {e}") pass # API для файлов @app.get("/api/servers/{server_name}/files") async def list_files(server_name: str, path: str = "", user: dict = Depends(get_current_user)): if not check_server_access(user, server_name): raise HTTPException(403, "Нет доступа к этому серверу") server_path = SERVERS_DIR / server_name if not server_path.exists(): raise HTTPException(404, "Сервер не найден") target_path = server_path / path if path else server_path try: target_path = target_path.resolve() server_path = server_path.resolve() if not str(target_path).startswith(str(server_path)): raise HTTPException(403, "Доступ запрещен") except: raise HTTPException(404, "Путь не найден") if not target_path.exists(): raise HTTPException(404, "Путь не найден") if not target_path.is_dir(): raise HTTPException(400, "Путь не является директорией") files = [] try: for item in target_path.iterdir(): files.append({ "name": item.name, "type": "directory" if item.is_dir() else "file", "size": item.stat().st_size if item.is_file() else 0 }) except Exception as e: print(f"Ошибка чтения директории: {e}") raise HTTPException(500, f"Ошибка чтения директории: {str(e)}") return files @app.get("/api/servers/{server_name}/files/download") async def download_file(server_name: str, path: str, user: dict = Depends(get_current_user)): if not check_server_access(user, server_name): raise HTTPException(403, "Нет доступа к этому серверу") server_path = SERVERS_DIR / server_name file_path = server_path / path if not file_path.exists() or not str(file_path).startswith(str(server_path)): raise HTTPException(404, "Файл не найден") return FileResponse(file_path, filename=file_path.name) @app.post("/api/servers/{server_name}/files/upload") async def upload_file(server_name: str, path: str, file: UploadFile = File(...), user: dict = Depends(get_current_user)): if not check_server_access(user, server_name): raise HTTPException(403, "Нет доступа к этому серверу") server_path = SERVERS_DIR / server_name target_path = server_path / path / file.filename if not str(target_path).startswith(str(server_path)): raise HTTPException(400, "Недопустимый путь") target_path.parent.mkdir(parents=True, exist_ok=True) with open(target_path, "wb") as f: content = await file.read() f.write(content) return {"message": "Файл загружен"} @app.delete("/api/servers/{server_name}/files") async def delete_file(server_name: str, path: str, user: dict = Depends(get_current_user)): if not check_server_access(user, server_name): raise HTTPException(403, "Нет доступа к этому серверу") server_path = SERVERS_DIR / server_name target_path = server_path / path if not target_path.exists() or not str(target_path).startswith(str(server_path)): raise HTTPException(404, "Файл не найден") if target_path.is_dir(): shutil.rmtree(target_path) else: target_path.unlink() return {"message": "Файл удален"} @app.get("/api/servers/{server_name}/files/content") async def get_file_content(server_name: str, path: str, user: dict = Depends(get_current_user)): if not check_server_access(user, server_name): raise HTTPException(403, "Нет доступа к этому серверу") server_path = SERVERS_DIR / server_name file_path = server_path / path if not file_path.exists() or not file_path.is_file() or not str(file_path).startswith(str(server_path)): raise HTTPException(404, "Файл не найден") try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() return {"content": content} except UnicodeDecodeError: raise HTTPException(400, "Файл не является текстовым") @app.put("/api/servers/{server_name}/files/content") async def update_file_content(server_name: str, path: str, data: dict, user: dict = Depends(get_current_user)): if not check_server_access(user, server_name): raise HTTPException(403, "Нет доступа к этому серверу") server_path = SERVERS_DIR / server_name file_path = server_path / path if not file_path.exists() or not file_path.is_file() or not str(file_path).startswith(str(server_path)): raise HTTPException(404, "Файл не найден") try: with open(file_path, 'w', encoding='utf-8') as f: f.write(data.get("content", "")) return {"message": "Файл сохранен"} except Exception as e: raise HTTPException(400, f"Ошибка сохранения файла: {str(e)}") @app.put("/api/servers/{server_name}/files/rename") async def rename_file(server_name: str, old_path: str, new_name: str, user: dict = Depends(get_current_user)): if not check_server_access(user, server_name): raise HTTPException(403, "Нет доступа к этому серверу") server_path = SERVERS_DIR / server_name old_file_path = server_path / old_path if not old_file_path.exists() or not str(old_file_path).startswith(str(server_path)): raise HTTPException(404, "Файл не найден") new_file_path = old_file_path.parent / new_name if new_file_path.exists(): raise HTTPException(400, "Файл с таким именем уже существует") if not str(new_file_path).startswith(str(server_path)): raise HTTPException(400, "Недопустимое имя файла") old_file_path.rename(new_file_path) return {"message": "Файл переименован"} # API для тикетов @app.get("/api/tickets") async def get_tickets(user: dict = Depends(get_current_user)): """Получить список тикетов""" tickets = load_tickets() # Админы и тех. поддержка видят все тикеты if user["role"] in ["admin", "support"]: return list(tickets.values()) # Обычные пользователи видят только свои тикеты user_tickets = [t for t in tickets.values() if t["author"] == user["username"]] return user_tickets @app.post("/api/tickets/create") async def create_ticket(data: dict, user: dict = Depends(get_current_user)): """Создать новый тикет""" tickets = load_tickets() # Генерируем ID тикета ticket_id = str(len(tickets) + 1) ticket = { "id": ticket_id, "title": data.get("title", "").strip(), "description": data.get("description", "").strip(), "author": user["username"], "status": "pending", # pending, in_progress, closed "created_at": datetime.utcnow().isoformat(), "updated_at": datetime.utcnow().isoformat(), "messages": [ { "author": user["username"], "text": data.get("description", "").strip(), "timestamp": datetime.utcnow().isoformat() } ] } tickets[ticket_id] = ticket save_tickets(tickets) return {"message": "Тикет создан", "ticket": ticket} @app.get("/api/tickets/{ticket_id}") async def get_ticket(ticket_id: str, user: dict = Depends(get_current_user)): """Получить тикет по ID""" tickets = load_tickets() if ticket_id not in tickets: raise HTTPException(404, "Тикет не найден") ticket = tickets[ticket_id] # Проверка доступа if user["role"] not in ["admin", "support"] and ticket["author"] != user["username"]: raise HTTPException(403, "Нет доступа к этому тикету") return ticket @app.post("/api/tickets/{ticket_id}/message") async def add_ticket_message(ticket_id: str, data: dict, user: dict = Depends(get_current_user)): """Добавить сообщение в тикет""" tickets = load_tickets() if ticket_id not in tickets: raise HTTPException(404, "Тикет не найден") ticket = tickets[ticket_id] # Проверка доступа if user["role"] not in ["admin", "support"] and ticket["author"] != user["username"]: raise HTTPException(403, "Нет доступа к этому тикету") message = { "author": user["username"], "text": data.get("text", "").strip(), "timestamp": datetime.utcnow().isoformat() } ticket["messages"].append(message) ticket["updated_at"] = datetime.utcnow().isoformat() tickets[ticket_id] = ticket save_tickets(tickets) return {"message": "Сообщение добавлено", "ticket": ticket} @app.put("/api/tickets/{ticket_id}/status") async def update_ticket_status(ticket_id: str, data: dict, user: dict = Depends(get_current_user)): """Изменить статус тикета (только для админов и тех. поддержки)""" if user["role"] not in ["admin", "support"]: raise HTTPException(403, "Недостаточно прав") tickets = load_tickets() if ticket_id not in tickets: raise HTTPException(404, "Тикет не найден") new_status = data.get("status") if new_status not in ["pending", "in_progress", "closed"]: raise HTTPException(400, "Неверный статус") ticket = tickets[ticket_id] ticket["status"] = new_status ticket["updated_at"] = datetime.utcnow().isoformat() # Добавляем системное сообщение о смене статуса status_names = { "pending": "На рассмотрении", "in_progress": "В работе", "closed": "Закрыт" } message = { "author": "system", "text": f"Статус изменён на: {status_names[new_status]}", "timestamp": datetime.utcnow().isoformat() } ticket["messages"].append(message) tickets[ticket_id] = ticket save_tickets(tickets) return {"message": "Статус обновлён", "ticket": ticket} if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)