diff --git a/mcp_server_sse.py b/mcp_server_sse.py index f958da849..89251c04e 100644 --- a/mcp_server_sse.py +++ b/mcp_server_sse.py @@ -2,7 +2,7 @@ import os import asyncio import logging import json -from datetime import datetime +import datetime from typing import List, Optional, Dict, Any from starlette.applications import Starlette @@ -13,13 +13,27 @@ from mcp.server.models import InitializationOptions from mcp.server.sse import SseServerTransport from mcp.types import ( TextContent, + ImageContent, Tool, ) +import base64 +import cv2 +import time +import subprocess +import threading +import re +from io import BytesIO from module.config.config import AzurLaneConfig from module.config.utils import alas_instance from module.webui.process_manager import ProcessManager from module.config.mcp_helper import McpConfigHelper +from module.webui.setting import State + +try: + from module.webui.fake_pil_module import remove_fake_pil_module +except ImportError: + remove_fake_pil_module = None # Setup logging logging.basicConfig(level=logging.INFO) @@ -142,6 +156,46 @@ async def list_tools() -> List[Tool]: "required": ["instance"] } ), + Tool( + name="get_screenshot", + description="获取指定实例当前模拟器的画面截图。返回Base64编码。", + inputSchema={"type": "object", "properties": {"instance": {"type": "string"}}, "required": ["instance"]} + ), + Tool( + name="get_current_running_task", + description="精确获取当前实例正在执行的具体子任务(例如:正在打 12-4,正在收发远征,或者正在清退役)。", + inputSchema={"type": "object", "properties": {"instance": {"type": "string"}}, "required": ["instance"]} + ), + Tool( + name="get_scheduler_queue", + description="获取当前正在排队等待执行的任务列表及它们的预计执行时间。", + inputSchema={"type": "object", "properties": {"instance": {"type": "string"}}, "required": ["instance"]} + ), + Tool( + name="trigger_task", + description="强制将某个任务(如 Event, Daily)立刻加入调度队列执行。", + inputSchema={"type": "object", "properties": {"instance": {"type": "string"}, "task": {"type": "string"}}, "required": ["instance", "task"]} + ), + Tool( + name="clear_scheduler_queue", + description="清空当前队列,通常用于卡死或需要紧急终止当前所有计划时。", + inputSchema={"type": "object", "properties": {"instance": {"type": "string"}}, "required": ["instance"]} + ), + Tool( + name="restart_emulator", + description="重启指定实例对应的模拟器进程。", + inputSchema={"type": "object", "properties": {"instance": {"type": "string"}}, "required": ["instance"]} + ), + Tool( + name="restart_adb", + description="重启 ADB 服务,解决设备离线 (Device Offline) 的问题。", + inputSchema={"type": "object", "properties": {"instance": {"type": "string", "description": "可选"}}} + ), + Tool( + name="update_alas", + description="触发 ALAS 的 Git Pull 和依赖更新,让大模型能帮你做日常的程序维护。", + inputSchema={"type": "object", "properties": {}} + ), ] @mcp_server.call_tool() @@ -198,7 +252,6 @@ async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]: lines_count = arguments.get("lines", 50) # ALAS 日志命名规则通常是 YYYY-MM-DD_实例名.txt - import datetime date_str = datetime.date.today().strftime("%Y-%m-%d") log_file = f"./log/{date_str}_{inst}.txt" @@ -237,6 +290,162 @@ async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]: return [TextContent(type="text", text=f"Error: {inst} is not running.")] manager.stop() return [TextContent(type="text", text=f"Success: Stopped {inst}")] + elif name == "get_screenshot": + inst = arguments["instance"] + if "ALAS_CONFIG_NAME" not in os.environ: + os.environ["ALAS_CONFIG_NAME"] = inst + if remove_fake_pil_module: + remove_fake_pil_module() + + from module.device.device import Device + from PIL import Image + try: + import PIL.JpegImagePlugin + except ImportError: + pass + + try: + config = AzurLaneConfig(inst) + device = Device(config) + image = device.screenshot() + # ALAS uses BGR (OpenCV format), convert to RGB for PIL + image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) + image_pil = Image.fromarray(image_rgb) + + buffered = BytesIO() + image_pil.save(buffered, format="JPEG") + img_data = base64.b64encode(buffered.getvalue()).decode("utf-8") + return [ImageContent(type="image", data=img_data, mimeType="image/jpeg")] + except Exception as e: + import traceback + error_msg = f"Error getting screenshot: {str(e)}\n{traceback.format_exc()}" + return [TextContent(type="text", text=error_msg)] + + elif name == "get_current_running_task": + inst = arguments["instance"] + manager = ProcessManager.get_manager(inst) + if not manager.alive: + return [TextContent(type="text", text="Error: Instance is not running.")] + task = "Unknown" + + date_str = datetime.date.today().strftime("%Y-%m-%d") + log_file = f"./log/{date_str}_{inst}.txt" + if not os.path.exists(log_file): + log_file = f"./log/{date_str}_alas.txt" + + if os.path.exists(log_file): + try: + with open(log_file, "r", encoding="utf-8", errors="ignore") as f: + lines = f.readlines() + for line in reversed(lines): + import re + # 适配现代 ALAS 日志格式: 调度器: 开始任务 `TaskName` + m = re.search(r"调度器: 开始任务\s*[`'\" ](.*?)[`'\" ]", line) + if not m: + # 适配旧版或特殊格式: <<< Run task TaskName >>> + m = re.search(r"<<<\s*Run task\s*(.*?)\s*>>>", line) + + if m: + task = m.group(1) + break + except: + pass + return [TextContent(type="text", text=task)] + + elif name == "get_scheduler_queue": + inst = arguments["instance"] + config = AzurLaneConfig(inst) + queue_data = [] + for task_name in config.data: + if task_name in ["Alas", "Error", "MUMU", "MumuPlayer12", "EmulatorManagement", "Dashboard"]: + continue + scheduler = config.data.get(task_name, {}).get("Scheduler", {}) + if scheduler.get("Enable", False): + next_run = scheduler.get("NextRun", "2050-01-01 00:00:00") + queue_data.append({"task": task_name, "next_run": str(next_run)}) + queue_data.sort(key=lambda x: str(x["next_run"])) + return [TextContent(type="text", text=json.dumps(queue_data, ensure_ascii=False, indent=2))] + + elif name == "trigger_task": + inst = arguments["instance"] + task = arguments["task"] + config = AzurLaneConfig(inst) + config.cross_set(f"{task}.Scheduler.Enable", True) + now = datetime.datetime.now() + config.cross_set(f"{task}.Scheduler.NextRun", str(now)) + config.save() + return [TextContent(type="text", text=f"Success: Task {task} scheduled for immediately.")] + + elif name == "clear_scheduler_queue": + inst = arguments["instance"] + config = AzurLaneConfig(inst) + cleared = [] + for task_name in config.data: + scheduler = config.data.get(task_name, {}).get("Scheduler", {}) + if scheduler.get("Enable", False): + config.cross_set(f"{task_name}.Scheduler.Enable", False) + cleared.append(task_name) + if cleared: + config.save() + return [TextContent(type="text", text=f"Success: Cleared tasks: {', '.join(cleared)}")] + + elif name == "restart_emulator": + inst = arguments["instance"] + if "ALAS_CONFIG_NAME" not in os.environ: + os.environ["ALAS_CONFIG_NAME"] = inst + manager = ProcessManager.get_manager(inst) + if remove_fake_pil_module: + remove_fake_pil_module() + + from module.device.device import Device + try: + config = AzurLaneConfig(inst) + device = Device(config) + device.emulator_stop() + time.sleep(60) + device.emulator_start() + return [TextContent(type="text", text=f"Success: Restarted emulator for {inst}")] + except Exception as e: + import traceback + error_msg = f"Error restarting emulator: {str(e)}\n{traceback.format_exc()}" + return [TextContent(type="text", text=error_msg)] + + elif name == "restart_adb": + inst = arguments.get("instance", "alas") + try: + # Try adb from deploy.yaml + adb_path = State.deploy_config.AdbExecutable + if adb_path: + adb_path = adb_path.replace('\\', '/') + + if not adb_path or not os.path.exists(adb_path): + # Fallback to connection_attr logic + adb_search_list = [ + './bin/adb/adb.exe', + './toolkit/Lib/site-packages/adbutils/binaries/adb.exe', + ] + for path in adb_search_list: + if os.path.exists(path): + adb_path = os.path.abspath(path) + break + else: + adb_path = "adb" + + subprocess.run([adb_path, "kill-server"], check=False) + subprocess.run([adb_path, "start-server"], check=False) + return [TextContent(type="text", text=f"Success: Restarted ADB service using {adb_path}.")] + except Exception as e: + return [TextContent(type="text", text=f"Error: {str(e)}")] + + elif name == "update_alas": + try: + from module.webui.updater import updater + def do_update(): + updater.update() + threading.Thread(target=do_update).start() + return [TextContent(type="text", text="Success: Triggered ALAS update in background.")] + except Exception as e: + return [TextContent(type="text", text=f"Error: {str(e)}")] else: return [TextContent(type="text", text=f"Unknown tool: {name}")]