Files
NeveTimePanel/backend/main.py
2026-01-14 22:13:07 +06:00

1027 lines
39 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from fastapi import FastAPI, WebSocket, UploadFile, File, HTTPException, Depends, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import asyncio
import subprocess
import psutil
import os
import shutil
import sys
from pathlib import Path
from typing import Optional
import json
from passlib.context import CryptContext
from jose import JWTError, jwt
from datetime import datetime, timedelta
app = FastAPI(title="MC Panel")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Настройки безопасности
SECRET_KEY = "your-secret-key-change-this-in-production-12345"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # 7 дней
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
security = HTTPBearer(auto_error=False)
SERVERS_DIR = Path("servers")
SERVERS_DIR.mkdir(exist_ok=True)
USERS_FILE = Path("users.json")
TICKETS_FILE = Path("tickets.json")
server_processes: dict[str, subprocess.Popen] = {}
server_logs: dict[str, list[str]] = {}
IS_WINDOWS = sys.platform == 'win32'
# Инициализация файла пользователей
def init_users():
if not USERS_FILE.exists():
admin_user = {
"username": "Sofa12345",
"password": pwd_context.hash("arkonsad123"),
"role": "admin",
"servers": []
}
save_users({"Sofa12345": admin_user})
print("Создан пользователь по умолчанию: none / none")
def load_users() -> dict:
if USERS_FILE.exists():
with open(USERS_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
return {}
def save_users(users: dict):
with open(USERS_FILE, 'w', encoding='utf-8') as f:
json.dump(users, f, indent=2, ensure_ascii=False)
def load_server_config(server_name: str) -> dict:
config_path = SERVERS_DIR / server_name / "panel_config.json"
if config_path.exists():
with open(config_path, 'r', encoding='utf-8') as f:
return json.load(f)
return {
"name": server_name,
"displayName": server_name,
"startCommand": "java -Xmx2G -Xms1G -jar server.jar nogui"
}
def save_server_config(server_name: str, config: dict):
config_path = SERVERS_DIR / server_name / "panel_config.json"
with open(config_path, 'w', encoding='utf-8') as f:
json.dump(config, f, indent=2, ensure_ascii=False)
# Функции для работы с тикетами
def load_tickets() -> dict:
if TICKETS_FILE.exists():
with open(TICKETS_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
return {}
def save_tickets(tickets: dict):
with open(TICKETS_FILE, 'w', encoding='utf-8') as f:
json.dump(tickets, f, indent=2, ensure_ascii=False)
init_users()
# Функции аутентификации
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def create_access_token(data: dict):
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
if not credentials:
raise HTTPException(status_code=401, detail="Требуется авторизация")
token = credentials.credentials
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise HTTPException(status_code=401, detail="Неверный токен")
users = load_users()
if username not in users:
raise HTTPException(status_code=401, detail="Пользователь не найден")
user = users[username]
# Проверка на бан
if user.get("role") == "banned":
raise HTTPException(status_code=403, detail="Ваш аккаунт заблокирован")
return user
except JWTError:
raise HTTPException(status_code=401, detail="Неверный токен")
def check_server_access(user: dict, server_name: str):
if user["role"] == "admin":
return True
return server_name in user.get("servers", [])
# API для аутентификации
@app.post("/api/auth/register")
async def register(data: dict):
users = load_users()
username = data.get("username", "").strip()
password = data.get("password", "").strip()
if not username or not password:
raise HTTPException(400, "Имя пользователя и пароль обязательны")
if username in users:
raise HTTPException(400, "Пользователь уже существует")
role = "admin" if len(users) == 0 else "user"
users[username] = {
"username": username,
"password": get_password_hash(password),
"role": role,
"servers": []
}
save_users(users)
access_token = create_access_token(data={"sub": username})
return {
"access_token": access_token,
"token_type": "bearer",
"username": username,
"role": role
}
@app.post("/api/auth/login")
async def login(data: dict):
users = load_users()
username = data.get("username", "").strip()
password = data.get("password", "").strip()
if username not in users:
raise HTTPException(401, "Неверное имя пользователя или пароль")
user = users[username]
if not verify_password(password, user["password"]):
raise HTTPException(401, "Неверное имя пользователя или пароль")
access_token = create_access_token(data={"sub": username})
return {
"access_token": access_token,
"token_type": "bearer",
"username": username,
"role": user["role"]
}
@app.get("/api/auth/me")
async def get_me(user: dict = Depends(get_current_user)):
return {
"username": user["username"],
"role": user["role"],
"servers": user.get("servers", [])
}
# API для управления пользователями
@app.get("/api/users")
async def get_users(user: dict = Depends(get_current_user)):
# Админы видят всех пользователей
# Обычные пользователи тоже видят всех (для управления доступом к своим серверам)
users = load_users()
return [
{
"username": u["username"],
"role": u["role"],
"servers": u.get("servers", [])
}
for u in users.values()
]
@app.put("/api/users/{username}/servers")
async def update_user_servers(username: str, data: dict, user: dict = Depends(get_current_user)):
users = load_users()
if username not in users:
raise HTTPException(404, "Пользователь не найден")
# Админы могут управлять доступом к любым серверам
if user["role"] == "admin":
users[username]["servers"] = data.get("servers", [])
save_users(users)
return {"message": "Доступ обновлен"}
# Обычные пользователи могут управлять доступом только к своим серверам
requested_servers = data.get("servers", [])
current_servers = users[username].get("servers", [])
# Проверяем, что пользователь пытается изменить доступ только к своим серверам
for server_name in requested_servers:
if server_name not in current_servers:
# Проверяем, является ли текущий пользователь владельцем этого сервера
config = load_server_config(server_name)
if config.get("owner") != user["username"]:
raise HTTPException(403, f"Вы не можете выдать доступ к серверу {server_name}")
users[username]["servers"] = requested_servers
save_users(users)
return {"message": "Доступ обновлен"}
@app.delete("/api/users/{username}")
async def delete_user(username: str, user: dict = Depends(get_current_user)):
if user["role"] != "admin":
raise HTTPException(403, "Доступ запрещен")
if username == user["username"]:
raise HTTPException(400, "Нельзя удалить самого себя")
users = load_users()
if username not in users:
raise HTTPException(404, "Пользователь не найден")
del users[username]
save_users(users)
return {"message": "Пользователь удален"}
@app.put("/api/users/{username}/role")
async def update_user_role(username: str, data: dict, user: dict = Depends(get_current_user)):
if user["role"] != "admin":
raise HTTPException(403, "Доступ запрещен")
if username == user["username"]:
raise HTTPException(400, "Нельзя изменить свою роль")
users = load_users()
if username not in users:
raise HTTPException(404, "Пользователь не найден")
new_role = data.get("role")
if new_role not in ["admin", "user", "support", "banned"]:
raise HTTPException(400, "Неверная роль")
users[username]["role"] = new_role
save_users(users)
return {"message": "Роль обновлена"}
# API для личного кабинета
@app.put("/api/profile/username")
async def update_username(data: dict, user: dict = Depends(get_current_user)):
"""Изменить имя пользователя"""
new_username = data.get("new_username", "").strip()
password = data.get("password", "")
if not new_username:
raise HTTPException(400, "Имя пользователя не может быть пустым")
if len(new_username) < 3:
raise HTTPException(400, "Имя пользователя должно быть не менее 3 символов")
users = load_users()
# Проверяем пароль
if not verify_password(password, users[user["username"]]["password"]):
raise HTTPException(400, "Неверный пароль")
# Проверяем, не занято ли новое имя
if new_username in users and new_username != user["username"]:
raise HTTPException(400, "Это имя пользователя уже занято")
# Сохраняем данные пользователя
old_username = user["username"]
user_data = users[old_username]
# Удаляем старую запись и создаём новую
del users[old_username]
user_data["username"] = new_username
users[new_username] = user_data
# Обновляем владельцев серверов
for server_dir in SERVERS_DIR.iterdir():
if server_dir.is_dir():
config = load_server_config(server_dir.name)
if config.get("owner") == old_username:
config["owner"] = new_username
save_server_config(server_dir.name, config)
# Обновляем доступы к серверам у других пользователей
for username, user_info in users.items():
if "servers" in user_info and old_username in user_info.get("servers", []):
user_info["servers"] = [new_username if s == old_username else s for s in user_info["servers"]]
save_users(users)
# Создаём новый токен
new_token = create_access_token({"sub": new_username, "role": user_data["role"]})
return {
"message": "Имя пользователя изменено",
"access_token": new_token,
"username": new_username
}
@app.put("/api/profile/password")
async def update_password(data: dict, user: dict = Depends(get_current_user)):
"""Изменить пароль"""
old_password = data.get("old_password", "")
new_password = data.get("new_password", "")
if not old_password or not new_password:
raise HTTPException(400, "Заполните все поля")
if len(new_password) < 6:
raise HTTPException(400, "Новый пароль должен быть не менее 6 символов")
users = load_users()
# Проверяем старый пароль
if not verify_password(old_password, users[user["username"]]["password"]):
raise HTTPException(400, "Неверный старый пароль")
# Устанавливаем новый пароль
users[user["username"]]["password"] = get_password_hash(new_password)
save_users(users)
return {"message": "Пароль изменён"}
@app.get("/api/profile/stats")
async def get_profile_stats(user: dict = Depends(get_current_user)):
"""Получить статистику профиля"""
users = load_users()
user_data = users.get(user["username"], {})
# Подсчитываем серверы пользователя
owned_servers = []
accessible_servers = []
for server_dir in SERVERS_DIR.iterdir():
if server_dir.is_dir():
config = load_server_config(server_dir.name)
if config.get("owner") == user["username"]:
owned_servers.append({
"name": server_dir.name,
"displayName": config.get("displayName", server_dir.name)
})
elif user["username"] in user_data.get("servers", []) or user["role"] == "admin":
accessible_servers.append({
"name": server_dir.name,
"displayName": config.get("displayName", server_dir.name)
})
# Подсчитываем тикеты
tickets = load_tickets()
user_tickets = [t for t in tickets.values() if t["author"] == user["username"]]
tickets_stats = {
"total": len(user_tickets),
"pending": len([t for t in user_tickets if t["status"] == "pending"]),
"in_progress": len([t for t in user_tickets if t["status"] == "in_progress"]),
"closed": len([t for t in user_tickets if t["status"] == "closed"])
}
return {
"username": user["username"],
"role": user["role"],
"owned_servers": owned_servers,
"accessible_servers": accessible_servers,
"tickets": tickets_stats,
"total_servers": len(owned_servers) + len(accessible_servers)
}
# API для серверов
@app.get("/api/servers")
async def get_servers(user: dict = Depends(get_current_user)):
servers = []
try:
for server_dir in SERVERS_DIR.iterdir():
if server_dir.is_dir():
if user["role"] != "admin" and server_dir.name not in user.get("servers", []):
continue
config = load_server_config(server_dir.name)
is_running = False
if server_dir.name in server_processes:
process = server_processes[server_dir.name]
if process.poll() is None:
is_running = True
else:
del server_processes[server_dir.name]
servers.append({
"name": server_dir.name,
"displayName": config.get("displayName", server_dir.name),
"status": "running" if is_running else "stopped"
})
print(f"Найдено серверов для {user['username']}: {len(servers)}")
except Exception as e:
print(f"Ошибка загрузки серверов: {e}")
return servers
@app.post("/api/servers/create")
async def create_server(data: dict, user: dict = Depends(get_current_user)):
server_name = data.get("name", "").strip()
if not server_name or not server_name.replace("_", "").replace("-", "").isalnum():
raise HTTPException(400, "Недопустимое имя сервера")
server_path = SERVERS_DIR / server_name
if server_path.exists():
raise HTTPException(400, "Сервер с таким именем уже существует")
server_path.mkdir(parents=True)
config = {
"name": server_name,
"displayName": data.get("displayName", server_name),
"startCommand": data.get("startCommand", "java -Xmx2G -Xms1G -jar server.jar nogui"),
"owner": user["username"] # Сохраняем владельца
}
save_server_config(server_name, config)
# Если пользователь не админ, автоматически выдаем ему доступ
if user["role"] != "admin":
users = load_users()
if user["username"] in users:
if "servers" not in users[user["username"]]:
users[user["username"]]["servers"] = []
if server_name not in users[user["username"]]["servers"]:
users[user["username"]]["servers"].append(server_name)
save_users(users)
return {"message": "Сервер создан", "name": server_name}
@app.get("/api/servers/{server_name}/config")
async def get_server_config(server_name: str, user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
server_path = SERVERS_DIR / server_name
if not server_path.exists():
raise HTTPException(404, "Сервер не найден")
config = load_server_config(server_name)
print(f"Загружена конфигурация для {server_name}: {config}")
return config
@app.put("/api/servers/{server_name}/config")
async def update_server_config(server_name: str, config: dict, user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
server_path = SERVERS_DIR / server_name
if not server_path.exists():
raise HTTPException(404, "Сервер не найден")
if server_name in server_processes:
raise HTTPException(400, "Остановите сервер перед изменением настроек")
save_server_config(server_name, config)
return {"message": "Настройки сохранены"}
@app.delete("/api/servers/{server_name}")
async def delete_server(server_name: str, user: dict = Depends(get_current_user)):
if user["role"] != "admin":
raise HTTPException(403, "Только администраторы могут удалять серверы")
server_path = SERVERS_DIR / server_name
if not server_path.exists():
raise HTTPException(404, "Сервер не найден")
if server_name in server_processes:
raise HTTPException(400, "Остановите сервер перед удалением")
shutil.rmtree(server_path)
return {"message": "Сервер удален"}
# Управление процессами серверов
async def read_server_output(server_name: str, process: subprocess.Popen):
try:
print(f"Начало чтения вывода для сервера {server_name}")
loop = asyncio.get_event_loop()
while True:
if process.poll() is not None:
print(f"Процесс сервера {server_name} завершился с кодом {process.poll()}")
break
try:
line = await loop.run_in_executor(None, process.stdout.readline)
if not line:
break
line = line.strip()
if line:
if server_name not in server_logs:
server_logs[server_name] = []
server_logs[server_name].append(line)
if len(server_logs[server_name]) > 1000:
server_logs[server_name].pop(0)
except Exception as e:
print(f"Ошибка чтения строки для {server_name}: {e}")
await asyncio.sleep(0.1)
except Exception as e:
print(f"Ошибка чтения вывода сервера {server_name}: {e}")
finally:
print(f"Чтение вывода для сервера {server_name} завершено")
if server_name in server_processes and process.poll() is not None:
del server_processes[server_name]
print(f"Сервер {server_name} удален из списка процессов")
@app.post("/api/servers/{server_name}/start")
async def start_server(server_name: str, user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
server_path = SERVERS_DIR / server_name
if not server_path.exists():
raise HTTPException(404, "Сервер не найден")
if server_name in server_processes:
raise HTTPException(400, "Сервер уже запущен")
config = load_server_config(server_name)
start_command = config.get("startCommand", "java -Xmx2G -Xms1G -jar server.jar nogui")
cmd_parts = start_command.split()
try:
if IS_WINDOWS:
process = subprocess.Popen(
cmd_parts,
cwd=server_path,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
creationflags=subprocess.CREATE_NO_WINDOW
)
else:
process = subprocess.Popen(
cmd_parts,
cwd=server_path,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1
)
server_processes[server_name] = process
server_logs[server_name] = []
asyncio.create_task(read_server_output(server_name, process))
print(f"Сервер {server_name} запущен с PID {process.pid}")
return {"message": "Сервер запущен", "pid": process.pid}
except Exception as e:
print(f"Ошибка запуска сервера {server_name}: {e}")
raise HTTPException(500, f"Ошибка запуска сервера: {str(e)}")
@app.post("/api/servers/{server_name}/stop")
async def stop_server(server_name: str, user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
if server_name not in server_processes:
raise HTTPException(400, "Сервер не запущен")
process = server_processes[server_name]
try:
if process.stdin and not process.stdin.closed:
process.stdin.write("stop\n")
process.stdin.flush()
try:
process.wait(timeout=30)
except subprocess.TimeoutExpired:
print(f"Сервер {server_name} не остановился за 30 секунд, принудительное завершение")
process.kill()
process.wait()
except Exception as e:
print(f"Ошибка при остановке сервера {server_name}: {e}")
try:
process.kill()
process.wait()
except:
pass
finally:
if server_name in server_processes:
del server_processes[server_name]
print(f"Сервер {server_name} остановлен")
return {"message": "Сервер остановлен"}
@app.post("/api/servers/{server_name}/command")
async def send_command(server_name: str, command: dict, user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
if server_name not in server_processes:
raise HTTPException(400, "Сервер не запущен")
process = server_processes[server_name]
if process.poll() is not None:
del server_processes[server_name]
raise HTTPException(400, "Сервер не запущен")
try:
cmd = command["command"]
if process.stdin and not process.stdin.closed:
process.stdin.write(cmd + "\n")
process.stdin.flush()
print(f"Команда отправлена серверу {server_name}: {cmd}")
return {"message": "Команда отправлена"}
else:
raise HTTPException(400, "Невозможно отправить команду")
except Exception as e:
print(f"Ошибка отправки команды серверу {server_name}: {e}")
raise HTTPException(500, f"Ошибка отправки команды: {str(e)}")
@app.get("/api/servers/{server_name}/stats")
async def get_server_stats(server_name: str, user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
server_path = SERVERS_DIR / server_name
try:
disk_usage = sum(f.stat().st_size for f in server_path.rglob('*') if f.is_file())
disk_mb = disk_usage / 1024 / 1024
except:
disk_mb = 0
if server_name not in server_processes:
return {
"status": "stopped",
"cpu": 0,
"memory": 0,
"disk": round(disk_mb, 2)
}
process = server_processes[server_name]
try:
if process.poll() is not None:
del server_processes[server_name]
return {
"status": "stopped",
"cpu": 0,
"memory": 0,
"disk": round(disk_mb, 2)
}
proc = psutil.Process(process.pid)
memory_mb = proc.memory_info().rss / 1024 / 1024
cpu_percent = proc.cpu_percent(interval=0.1)
return {
"status": "running",
"cpu": round(cpu_percent, 2),
"memory": round(memory_mb, 2),
"disk": round(disk_mb, 2)
}
except (psutil.NoSuchProcess, psutil.AccessDenied):
if server_name in server_processes:
del server_processes[server_name]
return {
"status": "stopped",
"cpu": 0,
"memory": 0,
"disk": round(disk_mb, 2)
}
except Exception as e:
print(f"Ошибка получения статистики для {server_name}: {e}")
return {
"status": "unknown",
"cpu": 0,
"memory": 0,
"disk": round(disk_mb, 2)
}
@app.websocket("/ws/servers/{server_name}/console")
async def console_websocket(websocket: WebSocket, server_name: str):
await websocket.accept()
print(f"WebSocket подключен для сервера: {server_name}")
if server_name in server_logs:
print(f"Отправка {len(server_logs[server_name])} существующих логов")
for log in server_logs[server_name]:
await websocket.send_text(log)
else:
print(f"Логов для сервера {server_name} пока нет")
await websocket.send_text(f"[Панель] Ожидание логов от сервера {server_name}...")
last_sent_index = len(server_logs.get(server_name, []))
try:
while True:
if server_name in server_logs:
current_logs = server_logs[server_name]
if len(current_logs) > last_sent_index:
for log in current_logs[last_sent_index:]:
await websocket.send_text(log)
last_sent_index = len(current_logs)
await asyncio.sleep(0.1)
except Exception as e:
print(f"WebSocket ошибка: {e}")
pass
# API для файлов
@app.get("/api/servers/{server_name}/files")
async def list_files(server_name: str, path: str = "", user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
server_path = SERVERS_DIR / server_name
if not server_path.exists():
raise HTTPException(404, "Сервер не найден")
target_path = server_path / path if path else server_path
try:
target_path = target_path.resolve()
server_path = server_path.resolve()
if not str(target_path).startswith(str(server_path)):
raise HTTPException(403, "Доступ запрещен")
except:
raise HTTPException(404, "Путь не найден")
if not target_path.exists():
raise HTTPException(404, "Путь не найден")
if not target_path.is_dir():
raise HTTPException(400, "Путь не является директорией")
files = []
try:
for item in target_path.iterdir():
files.append({
"name": item.name,
"type": "directory" if item.is_dir() else "file",
"size": item.stat().st_size if item.is_file() else 0
})
except Exception as e:
print(f"Ошибка чтения директории: {e}")
raise HTTPException(500, f"Ошибка чтения директории: {str(e)}")
return files
@app.get("/api/servers/{server_name}/files/download")
async def download_file(server_name: str, path: str, user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
server_path = SERVERS_DIR / server_name
file_path = server_path / path
if not file_path.exists() or not str(file_path).startswith(str(server_path)):
raise HTTPException(404, "Файл не найден")
return FileResponse(file_path, filename=file_path.name)
@app.post("/api/servers/{server_name}/files/upload")
async def upload_file(server_name: str, path: str, file: UploadFile = File(...), user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
server_path = SERVERS_DIR / server_name
target_path = server_path / path / file.filename
if not str(target_path).startswith(str(server_path)):
raise HTTPException(400, "Недопустимый путь")
target_path.parent.mkdir(parents=True, exist_ok=True)
with open(target_path, "wb") as f:
content = await file.read()
f.write(content)
return {"message": "Файл загружен"}
@app.delete("/api/servers/{server_name}/files")
async def delete_file(server_name: str, path: str, user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
server_path = SERVERS_DIR / server_name
target_path = server_path / path
if not target_path.exists() or not str(target_path).startswith(str(server_path)):
raise HTTPException(404, "Файл не найден")
if target_path.is_dir():
shutil.rmtree(target_path)
else:
target_path.unlink()
return {"message": "Файл удален"}
@app.get("/api/servers/{server_name}/files/content")
async def get_file_content(server_name: str, path: str, user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
server_path = SERVERS_DIR / server_name
file_path = server_path / path
if not file_path.exists() or not file_path.is_file() or not str(file_path).startswith(str(server_path)):
raise HTTPException(404, "Файл не найден")
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
return {"content": content}
except UnicodeDecodeError:
raise HTTPException(400, "Файл не является текстовым")
@app.put("/api/servers/{server_name}/files/content")
async def update_file_content(server_name: str, path: str, data: dict, user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
server_path = SERVERS_DIR / server_name
file_path = server_path / path
if not file_path.exists() or not file_path.is_file() or not str(file_path).startswith(str(server_path)):
raise HTTPException(404, "Файл не найден")
try:
with open(file_path, 'w', encoding='utf-8') as f:
f.write(data.get("content", ""))
return {"message": "Файл сохранен"}
except Exception as e:
raise HTTPException(400, f"Ошибка сохранения файла: {str(e)}")
@app.put("/api/servers/{server_name}/files/rename")
async def rename_file(server_name: str, old_path: str, new_name: str, user: dict = Depends(get_current_user)):
if not check_server_access(user, server_name):
raise HTTPException(403, "Нет доступа к этому серверу")
server_path = SERVERS_DIR / server_name
old_file_path = server_path / old_path
if not old_file_path.exists() or not str(old_file_path).startswith(str(server_path)):
raise HTTPException(404, "Файл не найден")
new_file_path = old_file_path.parent / new_name
if new_file_path.exists():
raise HTTPException(400, "Файл с таким именем уже существует")
if not str(new_file_path).startswith(str(server_path)):
raise HTTPException(400, "Недопустимое имя файла")
old_file_path.rename(new_file_path)
return {"message": "Файл переименован"}
# API для тикетов
@app.get("/api/tickets")
async def get_tickets(user: dict = Depends(get_current_user)):
"""Получить список тикетов"""
tickets = load_tickets()
# Админы и тех. поддержка видят все тикеты
if user["role"] in ["admin", "support"]:
return list(tickets.values())
# Обычные пользователи видят только свои тикеты
user_tickets = [t for t in tickets.values() if t["author"] == user["username"]]
return user_tickets
@app.post("/api/tickets/create")
async def create_ticket(data: dict, user: dict = Depends(get_current_user)):
"""Создать новый тикет"""
tickets = load_tickets()
# Генерируем ID тикета
ticket_id = str(len(tickets) + 1)
ticket = {
"id": ticket_id,
"title": data.get("title", "").strip(),
"description": data.get("description", "").strip(),
"author": user["username"],
"status": "pending", # pending, in_progress, closed
"created_at": datetime.utcnow().isoformat(),
"updated_at": datetime.utcnow().isoformat(),
"messages": [
{
"author": user["username"],
"text": data.get("description", "").strip(),
"timestamp": datetime.utcnow().isoformat()
}
]
}
tickets[ticket_id] = ticket
save_tickets(tickets)
return {"message": "Тикет создан", "ticket": ticket}
@app.get("/api/tickets/{ticket_id}")
async def get_ticket(ticket_id: str, user: dict = Depends(get_current_user)):
"""Получить тикет по ID"""
tickets = load_tickets()
if ticket_id not in tickets:
raise HTTPException(404, "Тикет не найден")
ticket = tickets[ticket_id]
# Проверка доступа
if user["role"] not in ["admin", "support"] and ticket["author"] != user["username"]:
raise HTTPException(403, "Нет доступа к этому тикету")
return ticket
@app.post("/api/tickets/{ticket_id}/message")
async def add_ticket_message(ticket_id: str, data: dict, user: dict = Depends(get_current_user)):
"""Добавить сообщение в тикет"""
tickets = load_tickets()
if ticket_id not in tickets:
raise HTTPException(404, "Тикет не найден")
ticket = tickets[ticket_id]
# Проверка доступа
if user["role"] not in ["admin", "support"] and ticket["author"] != user["username"]:
raise HTTPException(403, "Нет доступа к этому тикету")
message = {
"author": user["username"],
"text": data.get("text", "").strip(),
"timestamp": datetime.utcnow().isoformat()
}
ticket["messages"].append(message)
ticket["updated_at"] = datetime.utcnow().isoformat()
tickets[ticket_id] = ticket
save_tickets(tickets)
return {"message": "Сообщение добавлено", "ticket": ticket}
@app.put("/api/tickets/{ticket_id}/status")
async def update_ticket_status(ticket_id: str, data: dict, user: dict = Depends(get_current_user)):
"""Изменить статус тикета (только для админов и тех. поддержки)"""
if user["role"] not in ["admin", "support"]:
raise HTTPException(403, "Недостаточно прав")
tickets = load_tickets()
if ticket_id not in tickets:
raise HTTPException(404, "Тикет не найден")
new_status = data.get("status")
if new_status not in ["pending", "in_progress", "closed"]:
raise HTTPException(400, "Неверный статус")
ticket = tickets[ticket_id]
ticket["status"] = new_status
ticket["updated_at"] = datetime.utcnow().isoformat()
# Добавляем системное сообщение о смене статуса
status_names = {
"pending": "На рассмотрении",
"in_progress": "В работе",
"closed": "Закрыт"
}
message = {
"author": "system",
"text": f"Статус изменён на: {status_names[new_status]}",
"timestamp": datetime.utcnow().isoformat()
}
ticket["messages"].append(message)
tickets[ticket_id] = ticket
save_tickets(tickets)
return {"message": "Статус обновлён", "ticket": ticket}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)