M-AzurLaneAutoScript/module/webui/process_manager.py

282 lines
11 KiB
Python

import argparse
import os
import queue
import threading
from multiprocessing import Process
from typing import Dict, List, Union, Optional
import inflection
from rich.console import Console, ConsoleRenderable, Group
from rich.table import Table
from rich.traceback import Traceback
from rich.segment import Segment
from rich.text import Text
# Since this file does not run under the same process or subprocess of app.py
# the following code needs to be repeated
# Import fake module before import pywebio to avoid importing unnecessary module PIL
from module.webui.fake_pil_module import *
import_fake_pil_module()
from module.logger import logger, set_file_logger, set_func_logger
from module.submodule.submodule import load_mod
from module.submodule.utils import get_available_func, get_available_mod, get_available_mod_func, get_config_mod, \
get_func_mod, list_mod_instance
from module.webui.setting import State
from module.webui.performance_timer import critical_timer, slow_timer, measure
class ProcessManager:
_processes: Dict[str, "ProcessManager"] = {}
def __init__(self, config_name: str = "alas") -> None:
self.config_name = config_name
self._renderable_queue: queue.Queue[ConsoleRenderable] = State.manager.Queue()
self.renderables: List[ConsoleRenderable] = []
self.renderables_max_length = 400
self.renderables_reduce_length = 80
self._process: Process = None
self._process_locks: Dict[str, threading.Lock] = {}
self.thd_log_queue_handler: threading.Thread = None
def start(self, func, ev: threading.Event = None) -> None:
if not self.alive:
if func is None:
func = get_config_mod(self.config_name)
self._process = Process(
target=ProcessManager.run_process,
args=(
self.config_name,
func,
self._renderable_queue,
ev,
),
)
self._process.start()
self.start_log_queue_handler()
def start_log_queue_handler(self):
if (
self.thd_log_queue_handler is not None
and self.thd_log_queue_handler.is_alive()
):
return
self.thd_log_queue_handler = threading.Thread(
target=self._thread_log_queue_handler
)
self.thd_log_queue_handler.start()
def stop(self) -> None:
try:
lock = self._process_locks[self.config_name]
except KeyError:
lock = threading.Lock()
self._process_locks[self.config_name] = lock
with lock:
if self.alive:
self._process.kill()
self.renderables.append(
f"[{self.config_name}] exited. Reason: Manual stop\n"
)
if self.thd_log_queue_handler is not None:
self.thd_log_queue_handler.join(timeout=1)
if self.thd_log_queue_handler.is_alive():
logger.warning(
"Log queue handler thread does not stop within 1 seconds"
)
logger.info(f"[{self.config_name}] exited")
def _thread_log_queue_handler(self) -> None:
while self.alive:
try:
with measure("日志队列获取", threshold_ms=1500):
log = self._renderable_queue.get(timeout=1)
except queue.Empty:
continue
with measure("日志添加和清理", threshold_ms=5):
self.renderables.append(log)
if len(self.renderables) > self.renderables_max_length:
old_count = len(self.renderables)
self.renderables = self.renderables[self.renderables_reduce_length :]
# logger.info(f"日志清理: {old_count} -> {len(self.renderables)} 条")
logger.info("End of log queue handler loop")
@property
def alive(self) -> bool:
if self._process is not None:
with measure(f"[{self.config_name}] Process.is_alive系统调用", threshold_ms=1):
return self._process.is_alive()
else:
return False
@property
def state(self) -> int:
with measure(f"[{self.config_name}] alive检查", threshold_ms=1):
alive_status = self.alive
if alive_status:
return 1
elif len(self.renderables) == 0:
return 2
else:
with measure(f"[{self.config_name}] 获取最后一个renderable", threshold_ms=1):
last_renderable = self.renderables[-1]
with measure(f"[{self.config_name}] 检查 Table 对象", threshold_ms=1):
# 检查是否是 Table 对象,并遍历其列结构
if hasattr(last_renderable, 'columns') and hasattr(last_renderable, 'rows'):
for column in last_renderable.columns:
if hasattr(column, '_cells') and column._cells:
cell_content = column._cells[0] # 第一行(当前日志)
# 检查 Renderables 容器中的 Traceback 对象
if hasattr(cell_content, '_renderables'):
for renderable_item in cell_content._renderables:
# 直接检查类型名称是否包含 Traceback
if 'Traceback' in type(renderable_item).__name__:
return 3
# 检查直接的 Traceback 对象
elif 'Traceback' in str(type(cell_content)):
return 3
# 只有在快速检测都失败时,才使用渲染方式
with measure(f"[{self.config_name}] Console创建和渲染", threshold_ms=5):
with measure(f"[{self.config_name}] Console对象创建", threshold_ms=5):
console = Console(no_color=True)
with measure(f"[{self.config_name}] Console.capture上下文", threshold_ms=5):
with console.capture() as capture:
with measure(f"[{self.config_name}] Console.print执行", threshold_ms=5):
console.print(last_renderable)
with measure(f"[{self.config_name}] 获取渲染结果", threshold_ms=5):
s = capture.get().strip()
if s.endswith("Reason: Manual stop"):
return 2
elif s.endswith("Reason: Finish"):
return 2
elif s.endswith("Reason: Update"):
return 4
else:
return 3
@classmethod
@slow_timer("ProcessManager.get_manager")
def get_manager(cls, config_name: str) -> "ProcessManager":
"""
Create a new alas if not exists.
"""
if config_name not in cls._processes:
with measure(f"创建新ProcessManager({config_name})", threshold_ms=5):
cls._processes[config_name] = ProcessManager(config_name)
# logger.info(f"创建新ProcessManager: {config_name}")
return cls._processes[config_name]
@staticmethod
def run_process(
config_name, func: str, q: queue.Queue, e: threading.Event = None
) -> None:
parser = argparse.ArgumentParser()
parser.add_argument(
"--electron", action="store_true", help="Runs by electron client."
)
args, _ = parser.parse_known_args()
State.electron = args.electron
# Setup logger
set_file_logger(name=config_name)
if State.electron:
# https://github.com/LmeSzinc/AzurLaneAutoScript/issues/2051
logger.info("Electron detected, remove log output to stdout")
from module.logger import console_hdlr
logger.removeHandler(console_hdlr)
set_func_logger(func=q.put)
from module.config.config import AzurLaneConfig
# Remove fake PIL module, because subprocess will use it
remove_fake_pil_module()
AzurLaneConfig.stop_event = e
try:
# Run alas
if func == "alas":
from alas import AzurLaneAutoScript
if e is not None:
AzurLaneAutoScript.stop_event = e
AzurLaneAutoScript(config_name=config_name).loop()
elif func in get_available_func():
from alas import AzurLaneAutoScript
AzurLaneAutoScript(config_name=config_name).run(inflection.underscore(func), skip_first_screenshot=True)
elif func in get_available_mod():
mod = load_mod(func)
if e is not None:
mod.set_stop_event(e)
mod.loop(config_name)
elif func in get_available_mod_func():
getattr(load_mod(get_func_mod(func)), inflection.underscore(func))(config_name)
else:
logger.critical(f"No function matched: {func}")
logger.info(f"[{config_name}] exited. Reason: Finish\n")
except Exception as e:
logger.exception(e)
@classmethod
def running_instances(cls) -> List["ProcessManager"]:
l = []
for process in cls._processes.values():
if process.alive:
l.append(process)
return l
@staticmethod
def restart_processes(
instances: List[Union["ProcessManager", str]] = None, ev: threading.Event = None
):
"""
After update and reload, or failed to perform an update,
restart all alas that running before update
"""
logger.hr("Restart alas")
# Load MOD_CONFIG_DICT
list_mod_instance()
if instances is None:
instances = []
_instances = set()
for instance in instances:
if isinstance(instance, str):
_instances.add(ProcessManager.get_manager(instance))
elif isinstance(instance, ProcessManager):
_instances.add(instance)
try:
with open("./config/reloadalas", mode="r") as f:
for line in f.readlines():
line = line.strip()
_instances.add(ProcessManager.get_manager(line))
except FileNotFoundError:
pass
for process in _instances:
logger.info(f"Starting [{process.config_name}]")
process.start(func=get_config_mod(process.config_name), ev=ev)
try:
os.remove("./config/reloadalas")
except:
pass
logger.info("Start alas complete")