refactor(git): 使用shlex安全执行git命令并移除重复代码

修复图表工具提示内容的安全检查
This commit is contained in:
a2893005741 2026-05-11 09:36:06 +08:00
parent d004f04b00
commit 53e4e3cb3c
3 changed files with 70 additions and 104 deletions

View File

@ -1,5 +1,6 @@
import configparser
import os
import shlex
import shutil
from deploy.Windows.config import DeployConfig
@ -8,6 +9,10 @@ from deploy.Windows.utils import cached_property
from deploy.git_over_cdn.client import GitOverCdnClient
def _cmd(*args):
return ' '.join(shlex.quote(str(arg)) for arg in args)
class GitConfigParser(configparser.ConfigParser):
def check(self, section, option, value):
result = self.get(section, option, fallback=None)
@ -78,7 +83,7 @@ class GitManager(DeployConfig):
logger.warning(f'.git/HEAD is unreadable: {e}')
return False
if not self.execute(f'"{self.git}" status', allow_failure=True, output=False):
if not self.execute(_cmd(self.git, 'status'), allow_failure=True, output=False):
logger.warning('git status failed, repository may be corrupted')
return False
@ -101,13 +106,11 @@ class GitManager(DeployConfig):
raise
logger.info(f'Initializing repository: {repo} branch: {branch}')
self.execute(f'"{self.git}" init')
# Check if remote exists before adding
self.execute(f'"{self.git}" remote add "{source}" "{repo}"', allow_failure=True)
# Set remote URL just in case it already exists
self.execute(f'"{self.git}" remote set-url "{source}" "{repo}"')
self.execute(f'"{self.git}" fetch "{source}" "{branch}"')
self.execute(f'"{self.git}" reset --hard "{source}/{branch}"')
self.execute(_cmd(self.git, 'init'))
self.execute(_cmd(self.git, 'remote', 'add', source, repo), allow_failure=True)
self.execute(_cmd(self.git, 'remote', 'set-url', source, repo))
self.execute(_cmd(self.git, 'fetch', source, branch))
self.execute(_cmd(self.git, 'reset', '--hard', f'{source}/{branch}'))
def git_repository_init(
self, repo, source='origin', branch='master',
@ -117,46 +120,45 @@ class GitManager(DeployConfig):
self.git_repository_repair(repo, source=source, branch=branch)
logger.hr('Git Init', 1)
if not self.execute(f'"{self.git}" init', allow_failure=True):
if not self.execute(_cmd(self.git, 'init'), allow_failure=True):
self.remove('./.git/config')
self.remove('./.git/index')
self.remove('./.git/HEAD')
self.remove('./.git/ORIG_HEAD')
self.execute(f'"{self.git}" init')
self.execute(_cmd(self.git, 'init'))
Progress.GitInit()
logger.hr('Set Git Proxy', 1)
if proxy:
if not self.git_config.check('http', 'proxy', value=proxy):
self.execute(f'"{self.git}" config --local http.proxy "{proxy}"')
self.execute(_cmd(self.git, 'config', '--local', 'http.proxy', proxy))
if not self.git_config.check('https', 'proxy', value=proxy):
self.execute(f'"{self.git}" config --local https.proxy "{proxy}"')
self.execute(_cmd(self.git, 'config', '--local', 'https.proxy', proxy))
else:
if not self.git_config.check('http', 'proxy', value=None):
self.execute(f'"{self.git}" config --local --unset http.proxy', allow_failure=True)
self.execute(_cmd(self.git, 'config', '--local', '--unset', 'http.proxy'), allow_failure=True)
if not self.git_config.check('https', 'proxy', value=None):
self.execute(f'"{self.git}" config --local --unset https.proxy', allow_failure=True)
self.execute(_cmd(self.git, 'config', '--local', '--unset', 'https.proxy'), allow_failure=True)
if ssl_verify:
if not self.git_config.check('http', 'sslVerify', value='true'):
self.execute(f'"{self.git}" config --local http.sslVerify true', allow_failure=True)
self.execute(_cmd(self.git, 'config', '--local', 'http.sslVerify', 'true'), allow_failure=True)
else:
if not self.git_config.check('http', 'sslVerify', value='false'):
self.execute(f'"{self.git}" config --local http.sslVerify false', allow_failure=True)
self.execute(_cmd(self.git, 'config', '--local', 'http.sslVerify', 'false'), allow_failure=True)
Progress.GitSetConfig()
logger.hr('Set Git Repository', 1)
if not self.git_config.check(f'remote "{source}"', 'url', value=repo):
if not self.execute(f'"{self.git}" remote set-url "{source}" "{repo}"', allow_failure=True):
self.execute(f'"{self.git}" remote add "{source}" "{repo}"')
if not self.execute(_cmd(self.git, 'remote', 'set-url', source, repo), allow_failure=True):
self.execute(_cmd(self.git, 'remote', 'add', source, repo))
Progress.GitSetRepo()
logger.hr('Fetch Repository Branch', 1)
self.execute(f'"{self.git}" fetch "{source}" "{branch}"')
self.execute(_cmd(self.git, 'fetch', source, branch))
Progress.GitFetch()
logger.hr('Pull Repository Branch', 1)
# Remove git lock
for lock_file in [
'./.git/index.lock',
'./.git/HEAD.lock',
@ -166,27 +168,25 @@ class GitManager(DeployConfig):
logger.info(f'Lock file {lock_file} exists, removing')
os.remove(lock_file)
if keep_changes:
if self.execute(f'"{self.git}" stash', allow_failure=True):
self.execute(f'"{self.git}" pull --ff-only "{source}" "{branch}"')
if self.execute(f'"{self.git}" stash pop', allow_failure=True):
if self.execute(_cmd(self.git, 'stash'), allow_failure=True):
self.execute(_cmd(self.git, 'pull', '--ff-only', source, branch))
if self.execute(_cmd(self.git, 'stash', 'pop'), allow_failure=True):
pass
else:
# No local changes to existing files, untracked files not included
logger.info('Stash pop failed, there seems to be no local changes, skip instead')
else:
logger.info('Stash failed, this may be the first installation, drop changes instead')
self.execute(f'"{self.git}" reset --hard "{source}/{branch}"')
self.execute(f'"{self.git}" pull --ff-only "{source}" "{branch}"')
self.execute(_cmd(self.git, 'reset', '--hard', f'{source}/{branch}'))
self.execute(_cmd(self.git, 'pull', '--ff-only', source, branch))
else:
self.execute(f'"{self.git}" reset --hard "{source}/{branch}"')
self.execute(_cmd(self.git, 'reset', '--hard', f'{source}/{branch}'))
Progress.GitReset()
# Since `git fetch` is already called, checkout is faster
if not self.execute(f'"{self.git}" checkout "{branch}"', allow_failure=True):
self.execute(f'"{self.git}" pull --ff-only "{source}" "{branch}"')
if not self.execute(_cmd(self.git, 'checkout', branch), allow_failure=True):
self.execute(_cmd(self.git, 'pull', '--ff-only', source, branch))
Progress.GitCheckout()
logger.hr('Show Version', 1)
self.execute(f'"{self.git}" --no-pager log --no-merges -1')
self.execute(_cmd(self.git, '--no-pager', 'log', '--no-merges', '-1'))
Progress.GitShowVersion()
@property

View File

@ -1,3 +1,4 @@
import shlex
import shutil
from deploy.config import DeployConfig
@ -6,6 +7,10 @@ from deploy.logger import logger
from deploy.utils import *
def _cmd(*args):
return ' '.join(shlex.quote(str(arg)) for arg in args)
class GitManager(DeployConfig):
@cached_property
def git(self):
@ -50,7 +55,7 @@ class GitManager(DeployConfig):
logger.warning(f'.git/HEAD is unreadable: {e}')
return False
if not self.execute(f'"{self.git}" status', allow_failure=True, output=False):
if not self.execute(_cmd(self.git, 'status'), allow_failure=True, output=False):
logger.warning('git status failed, repository may be corrupted')
return False
@ -73,13 +78,11 @@ class GitManager(DeployConfig):
raise
logger.info(f'Initializing repository: {repo} branch: {branch}')
self.execute(f'"{self.git}" init')
# Check if remote exists before adding
self.execute(f'"{self.git}" remote add "{source}" "{repo}"', allow_failure=True)
# Set remote URL just in case it already exists
self.execute(f'"{self.git}" remote set-url "{source}" "{repo}"')
self.execute(f'"{self.git}" fetch "{source}" "{branch}"')
self.execute(f'"{self.git}" reset --hard "{source}/{branch}"')
self.execute(_cmd(self.git, 'init'))
self.execute(_cmd(self.git, 'remote', 'add', source, repo), allow_failure=True)
self.execute(_cmd(self.git, 'remote', 'set-url', source, repo))
self.execute(_cmd(self.git, 'fetch', source, branch))
self.execute(_cmd(self.git, 'reset', '--hard', f'{source}/{branch}'))
def git_repository_init(
self, repo, source='origin', branch='master',
@ -89,34 +92,33 @@ class GitManager(DeployConfig):
self.git_repository_repair(repo, source=source, branch=branch)
logger.hr('Git Init', 1)
if not self.execute(f'"{self.git}" init', allow_failure=True):
if not self.execute(_cmd(self.git, 'init'), allow_failure=True):
self.remove('./.git/config')
self.remove('./.git/index')
self.remove('./.git/HEAD')
self.execute(f'"{self.git}" init')
self.execute(_cmd(self.git, 'init'))
logger.hr('Set Git Proxy', 1)
if proxy:
self.execute(f'"{self.git}" config --local http.proxy "{proxy}"')
self.execute(f'"{self.git}" config --local https.proxy "{proxy}"')
self.execute(_cmd(self.git, 'config', '--local', 'http.proxy', proxy))
self.execute(_cmd(self.git, 'config', '--local', 'https.proxy', proxy))
else:
self.execute(f'"{self.git}" config --local --unset http.proxy', allow_failure=True)
self.execute(f'"{self.git}" config --local --unset https.proxy', allow_failure=True)
self.execute(_cmd(self.git, 'config', '--local', '--unset', 'http.proxy'), allow_failure=True)
self.execute(_cmd(self.git, 'config', '--local', '--unset', 'https.proxy'), allow_failure=True)
if ssl_verify:
self.execute(f'"{self.git}" config --local http.sslVerify true', allow_failure=True)
self.execute(_cmd(self.git, 'config', '--local', 'http.sslVerify', 'true'), allow_failure=True)
else:
self.execute(f'"{self.git}" config --local http.sslVerify false', allow_failure=True)
self.execute(_cmd(self.git, 'config', '--local', 'http.sslVerify', 'false'), allow_failure=True)
logger.hr('Set Git Repository', 1)
if not self.execute(f'"{self.git}" remote set-url "{source}" "{repo}"', allow_failure=True):
self.execute(f'"{self.git}" remote add "{source}" "{repo}"')
if not self.execute(_cmd(self.git, 'remote', 'set-url', source, repo), allow_failure=True):
self.execute(_cmd(self.git, 'remote', 'add', source, repo))
logger.hr('Fetch Repository Branch', 1)
self.execute(f'"{self.git}" fetch "{source}" "{branch}"')
self.execute(_cmd(self.git, 'fetch', source, branch))
logger.hr('Pull Repository Branch', 1)
# Remove git lock
for lock_file in [
'./.git/index.lock',
'./.git/HEAD.lock',
@ -126,23 +128,22 @@ class GitManager(DeployConfig):
logger.info(f'Lock file {lock_file} exists, removing')
os.remove(lock_file)
if keep_changes:
if self.execute(f'"{self.git}" stash', allow_failure=True):
self.execute(f'"{self.git}" pull --ff-only "{source}" "{branch}"')
if self.execute(f'"{self.git}" stash pop', allow_failure=True):
if self.execute(_cmd(self.git, 'stash'), allow_failure=True):
self.execute(_cmd(self.git, 'pull', '--ff-only', source, branch))
if self.execute(_cmd(self.git, 'stash', 'pop'), allow_failure=True):
pass
else:
# No local changes to existing files, untracked files not included
logger.info('Stash pop failed, there seems to be no local changes, skip instead')
else:
logger.info('Stash failed, this may be the first installation, drop changes instead')
self.execute(f'"{self.git}" reset --hard "{source}/{branch}"')
self.execute(f'"{self.git}" pull --ff-only "{source}" "{branch}"')
self.execute(_cmd(self.git, 'reset', '--hard', f'{source}/{branch}'))
self.execute(_cmd(self.git, 'pull', '--ff-only', source, branch))
else:
self.execute(f'"{self.git}" reset --hard "{source}/{branch}"')
self.execute(f'"{self.git}" pull --ff-only "{source}" "{branch}"')
self.execute(_cmd(self.git, 'reset', '--hard', f'{source}/{branch}'))
self.execute(_cmd(self.git, 'pull', '--ff-only', source, branch))
logger.hr('Show Version', 1)
self.execute(f'"{self.git}" --no-pager log --no-merges -1')
self.execute(_cmd(self.git, '--no-pager', 'log', '--no-merges', '-1'))
@property
def goc_client(self):

View File

@ -1,21 +1,24 @@
(function() {
function setTooltipContent(tipEl, rows) {
if (!tipEl || !Array.isArray(rows)) return;
while (tipEl.firstChild) {
tipEl.removeChild(tipEl.firstChild);
}
rows.forEach(function(row) {
if (!row) return;
var div = document.createElement('div');
if (row.style) {
if (row.style && typeof row.style === 'object') {
Object.keys(row.style).forEach(function(k) {
div.style[k] = row.style[k];
});
}
if (row.parts) {
if (row.parts && Array.isArray(row.parts)) {
row.parts.forEach(function(part) {
if (!part) return;
if (part.type === 'text') {
var span = document.createElement('span');
span.textContent = part.value;
if (part.style) {
span.textContent = part.value != null ? String(part.value) : '';
if (part.style && typeof part.style === 'object') {
Object.keys(part.style).forEach(function(k) {
span.style[k] = part.style[k];
});
@ -23,8 +26,8 @@
div.appendChild(span);
} else if (part.type === 'bold') {
var b = document.createElement('b');
b.textContent = part.value;
if (part.style) {
b.textContent = part.value != null ? String(part.value) : '';
if (part.style && typeof part.style === 'object') {
Object.keys(part.style).forEach(function(k) {
b.style[k] = part.style[k];
});
@ -357,44 +360,6 @@
})();
(function() {
function setTooltipContent(tipEl, rows) {
while (tipEl.firstChild) {
tipEl.removeChild(tipEl.firstChild);
}
rows.forEach(function(row) {
var div = document.createElement('div');
if (row.style) {
Object.keys(row.style).forEach(function(k) {
div.style[k] = row.style[k];
});
}
if (row.parts) {
row.parts.forEach(function(part) {
if (part.type === 'text') {
var span = document.createElement('span');
span.textContent = part.value;
if (part.style) {
Object.keys(part.style).forEach(function(k) {
span.style[k] = part.style[k];
});
}
div.appendChild(span);
} else if (part.type === 'bold') {
var b = document.createElement('b');
b.textContent = part.value;
if (part.style) {
Object.keys(part.style).forEach(function(k) {
b.style[k] = part.style[k];
});
}
div.appendChild(b);
}
});
}
tipEl.appendChild(div);
});
}
var detailLabels = __DETAIL_LABELS__;
var detailAp = __DETAIL_AP__;
var detailSources = __DETAIL_SOURCES__;