mirror of
https://github.com/wess09/AzurLaneAutoScript.git
synced 2026-05-14 06:58:22 +08:00
214 lines
8.1 KiB
Python
214 lines
8.1 KiB
Python
import shutil
|
||
|
||
from deploy.config import DeployConfig
|
||
from deploy.git_over_cdn.client import GitOverCdnClient
|
||
from deploy.logger import logger
|
||
from deploy.utils import *
|
||
|
||
|
||
class GitManager(DeployConfig):
|
||
@cached_property
|
||
def git(self):
|
||
exe = self.filepath('GitExecutable')
|
||
if os.path.exists(exe):
|
||
return exe
|
||
|
||
logger.warning(f'GitExecutable: {exe} does not exist, use `git` instead')
|
||
return 'git'
|
||
|
||
@staticmethod
|
||
def remove(file):
|
||
try:
|
||
os.remove(file)
|
||
logger.info(f'Removed file: {file}')
|
||
except FileNotFoundError:
|
||
logger.info(f'File not found: {file}')
|
||
|
||
def git_repository_check(self):
|
||
"""
|
||
检查 .git 目录是否存在且未损坏。
|
||
|
||
Returns:
|
||
bool: True 表示仓库正常,False 表示缺失或损坏需要修复。
|
||
"""
|
||
if not os.path.isdir('./.git'):
|
||
logger.warning('.git directory does not exist')
|
||
return False
|
||
|
||
head_file = './.git/HEAD'
|
||
if not os.path.exists(head_file):
|
||
logger.warning('.git/HEAD does not exist, repository may be corrupted')
|
||
return False
|
||
|
||
try:
|
||
with open(head_file, 'r', encoding='utf-8') as f:
|
||
content = f.read().strip()
|
||
if not content:
|
||
logger.warning('.git/HEAD is empty, repository may be corrupted')
|
||
return False
|
||
except Exception as e:
|
||
logger.warning(f'.git/HEAD is unreadable: {e}')
|
||
return False
|
||
|
||
if not self.execute(f'"{self.git}" status', allow_failure=True, output=False):
|
||
logger.warning('git status failed, repository may be corrupted')
|
||
return False
|
||
|
||
return True
|
||
|
||
def git_repository_repair(self, repo, source='origin', branch='master'):
|
||
"""
|
||
.git 缺失或损坏时,重建 Git 元数据,保留现有工作区文件。
|
||
"""
|
||
logger.hr('Git Repository Repair', 1)
|
||
logger.warning('Attempting to repair git repository metadata without overwriting files')
|
||
|
||
if os.path.isdir('./.git'):
|
||
logger.info('Removing corrupted .git directory')
|
||
try:
|
||
shutil.rmtree('./.git')
|
||
logger.info('Removed .git directory')
|
||
except Exception as e:
|
||
logger.error(f'Failed to remove .git directory: {e}')
|
||
raise
|
||
|
||
logger.info(f'Initializing repository: {repo} branch: {branch}')
|
||
self.execute(f'"{self.git}" init')
|
||
# Check if remote exists before adding
|
||
self.execute(f'"{self.git}" remote add "{source}" "{repo}"', allow_failure=True)
|
||
# Set remote URL just in case it already exists
|
||
self.execute(f'"{self.git}" remote set-url "{source}" "{repo}"')
|
||
self.execute(f'"{self.git}" fetch "{source}" "{branch}"')
|
||
self.execute(f'"{self.git}" reset --mixed "{source}/{branch}"')
|
||
self.execute(f'"{self.git}" branch -M "{branch}"', allow_failure=True)
|
||
|
||
def git_repository_init(
|
||
self, repo, source='origin', branch='master',
|
||
proxy='', ssl_verify=True, keep_changes=False
|
||
):
|
||
repaired = False
|
||
if not self.git_repository_check():
|
||
self.git_repository_repair(repo, source=source, branch=branch)
|
||
repaired = True
|
||
|
||
logger.hr('Git Init', 1)
|
||
if not self.execute(f'"{self.git}" init', allow_failure=True):
|
||
self.remove('./.git/config')
|
||
self.remove('./.git/index')
|
||
self.remove('./.git/HEAD')
|
||
self.execute(f'"{self.git}" init')
|
||
|
||
logger.hr('Set Git Proxy', 1)
|
||
if proxy:
|
||
self.execute(f'"{self.git}" config --local http.proxy "{proxy}"')
|
||
self.execute(f'"{self.git}" config --local https.proxy "{proxy}"')
|
||
else:
|
||
self.execute(f'"{self.git}" config --local --unset http.proxy', allow_failure=True)
|
||
self.execute(f'"{self.git}" config --local --unset https.proxy', allow_failure=True)
|
||
|
||
if ssl_verify:
|
||
self.execute(f'"{self.git}" config --local http.sslVerify true', allow_failure=True)
|
||
else:
|
||
self.execute(f'"{self.git}" config --local http.sslVerify false', allow_failure=True)
|
||
|
||
logger.hr('Set Git Repository', 1)
|
||
if not self.execute(f'"{self.git}" remote set-url "{source}" "{repo}"', allow_failure=True):
|
||
self.execute(f'"{self.git}" remote add "{source}" "{repo}"')
|
||
|
||
logger.hr('Fetch Repository Branch', 1)
|
||
self.execute(f'"{self.git}" fetch "{source}" "{branch}"')
|
||
|
||
logger.hr('Pull Repository Branch', 1)
|
||
# Remove git lock
|
||
for lock_file in [
|
||
'./.git/index.lock',
|
||
'./.git/HEAD.lock',
|
||
'./.git/refs/heads/master.lock',
|
||
]:
|
||
if os.path.exists(lock_file):
|
||
logger.info(f'Lock file {lock_file} exists, removing')
|
||
os.remove(lock_file)
|
||
if repaired:
|
||
logger.info('Repository metadata repaired; skip destructive reset and keep existing files')
|
||
self.execute(f'"{self.git}" pull --ff-only "{source}" "{branch}"', allow_failure=True)
|
||
elif keep_changes:
|
||
if self.execute(f'"{self.git}" stash', allow_failure=True):
|
||
self.execute(f'"{self.git}" pull --ff-only "{source}" "{branch}"')
|
||
if self.execute(f'"{self.git}" stash pop', allow_failure=True):
|
||
pass
|
||
else:
|
||
# No local changes to existing files, untracked files not included
|
||
logger.info('Stash pop failed, there seems to be no local changes, skip instead')
|
||
else:
|
||
logger.info('Stash failed, this may be the first installation, drop changes instead')
|
||
self.execute(f'"{self.git}" reset --hard "{source}/{branch}"')
|
||
self.execute(f'"{self.git}" pull --ff-only "{source}" "{branch}"')
|
||
else:
|
||
self.execute(f'"{self.git}" reset --hard "{source}/{branch}"')
|
||
self.execute(f'"{self.git}" pull --ff-only "{source}" "{branch}"')
|
||
|
||
logger.hr('Show Version', 1)
|
||
self.execute(f'"{self.git}" --no-pager log --no-merges -1')
|
||
|
||
@property
|
||
def goc_client(self):
|
||
client = GitOverCdnClient(
|
||
url=[
|
||
'https://vip.123pan.cn/1818706573/pack/LmeSzinc_AzurLaneAutoScript_master',
|
||
'https://1818706573.v.123yx.com/1818706573/pack/LmeSzinc_AzurLaneAutoScript_master',
|
||
],
|
||
folder=self.root_filepath,
|
||
source='origin',
|
||
branch='master',
|
||
git=self.git,
|
||
)
|
||
client.logger = logger
|
||
return client
|
||
|
||
def git_install(self):
|
||
logger.hr('Update Alas', 0)
|
||
|
||
# 检查云端更新端点
|
||
cloud_allow = False
|
||
try:
|
||
import requests
|
||
resp = requests.get("https://alas-apiv2.nanoda.work/api/updata", timeout=5)
|
||
if resp.status_code == 200:
|
||
data = resp.text.strip().lower()
|
||
if data == 'true':
|
||
cloud_allow = True
|
||
elif data == 'false':
|
||
cloud_allow = False
|
||
else:
|
||
try:
|
||
import json
|
||
res = json.loads(data)
|
||
if isinstance(res, bool):
|
||
cloud_allow = res
|
||
except:
|
||
pass
|
||
except Exception as e:
|
||
logger.warning(f"Failed to fetch cloud update flag: {e}")
|
||
|
||
if not cloud_allow:
|
||
logger.info("Cloud update flag is false, skip update")
|
||
return
|
||
|
||
if self.GitOverCdn:
|
||
if self.goc_client.update(keep_changes=self.KeepLocalChanges):
|
||
return
|
||
|
||
self.git_repository_init(
|
||
repo=self.Repository,
|
||
source='origin',
|
||
branch=self.Branch,
|
||
proxy=self.GitProxy,
|
||
ssl_verify=self.SSLVerify,
|
||
keep_changes=self.KeepLocalChanges,
|
||
)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
self = GitManager()
|
||
self.goc_client.get_status()
|