跳转到内容
Francisco Blog
返回

服务器自动配置代理

编辑页面

快速配置服务器代理

为了能够快速完成代理服务在服务器上的配置,我用 AI 写了一个简单的 pipeline 脚本。为了支持一键导入节点,请在脚本所在路径下新建一个文本文件 proxy_sources.txt,其支持的内容格式如下:

# 1) 单条链接:vless:// vmess:// ss:// trojan:// hy2:// tuic://
# 2) 订阅链接:http:// 或 https://
# 3) 本地文本文件路径:文件里可放多条 vless/vmess/ss/trojan 链接
# 4) base64 订阅文本(整段)

在该文件中按上述格式直接添加来源即可;也可以在运行完安装脚本后,使用 proxy add 命令直接添加。安装脚本如下:

#!/usr/bin/env python3
import argparse
import importlib
import json
import os
import platform
import re
import shutil
import signal
import subprocess
import sys
import textwrap
import time
import urllib.request
from pathlib import Path

# --- Filesystem layout -------------------------------------------------------
# Runtime state lives under /etc/mihomo; executables go to /usr/local/bin.
HOME_DIR = Path('/etc/mihomo')
PROVIDER_DIR = HOME_DIR / 'providers'
PROVIDER_FILE = PROVIDER_DIR / 'pool.txt'  # node pool read by mihomo's file provider
CTL_ENV = HOME_DIR / 'ctl.env'  # key=value settings consumed by the installed proxy CLI
CONFIG_FILE = HOME_DIR / 'config.yaml'
PROFILE_FILE = Path('/etc/profile.d/proxy.sh')  # shell wrapper managing proxy env vars
PROXY_BIN = Path('/usr/local/bin/proxy')  # standalone CLI, written from PROXY_CLI below
MIHOMO_BIN = Path('/usr/local/bin/mihomo')
SYSTEMD_SERVICE = Path('/etc/systemd/system/mihomo.service')
LOG_FILE = HOME_DIR / 'mihomo.log'  # used in direct (non-systemd) mode
PID_FILE = HOME_DIR / 'mihomo.pid'  # used in direct (non-systemd) mode
SERVICE_NAME = 'mihomo'

# GitHub API endpoints for the latest mihomo release. Mirror-prefixed entries
# come first (presumably for hosts with restricted GitHub access — the plain
# API is kept as the last fallback).
RELEASE_APIS = [
    'https://gh.llkk.cc/https://api.github.com/repos/MetaCubeX/mihomo/releases/latest',
    'https://ghps.cc/https://api.github.com/repos/MetaCubeX/mihomo/releases/latest',
    'https://gh.sixyin.com/https://api.github.com/repos/MetaCubeX/mihomo/releases/latest',
    'https://gitproxy.click/https://api.github.com/repos/MetaCubeX/mihomo/releases/latest',
    'https://api.github.com/repos/MetaCubeX/mihomo/releases/latest',
]

# Mirror prefixes prepended to a release asset URL when downloading; the final
# empty string means "try the original URL directly".
DOWNLOAD_MIRRORS = [
    'https://gh.llkk.cc/',
    'https://ghps.cc/',
    'https://gh.zwnes.xyz/',
    'https://github.tmby.shop/',
    'https://gitproxy.click/',
    'https://gh.927223.xyz/',
    'https://gh.felicity.ac.cn/',
    'https://gh.sixyin.com/',
    '',
]


def run(cmd, check=True, capture=False, env=None):
    """Run *cmd*; optionally capture combined stdout/stderr and raise on failure.

    Returns the CompletedProcess; raises RuntimeError when *check* is true and
    the command exits non-zero.
    """
    stdout = subprocess.PIPE if capture else None
    stderr = subprocess.STDOUT if capture else None
    proc = subprocess.run(cmd, text=True, stdout=stdout, stderr=stderr, env=env)
    if check and proc.returncode != 0:
        detail = ""
        if capture and proc.stdout:
            detail = f"\n{proc.stdout}"
        raise RuntimeError(f"command failed ({proc.returncode}): {' '.join(cmd)}{detail}")
    return proc


class Pretty:
    """Minimal console reporter with numbered installation steps."""

    def __init__(self):
        # `step` advances on every next(); `total` matches the number of
        # next() calls performed by the installer.
        self.step = 0
        self.total = 9

    def title(self, text):
        bar = '=' * 12
        print(f"\n{bar} {text} {bar}")

    def info(self, text):
        print(f"[INFO] {text}")

    def ok(self, text):
        print(f"[ OK ] {text}")

    def warn(self, text):
        print(f"[WARN] {text}")

    def fail(self, text):
        print(f"[FAIL] {text}")

    def next(self, text):
        # Announce the next numbered step, e.g. "[3/9] ...".
        self.step += 1
        print(f"\n[{self.step}/{self.total}] {text}")

# Shared console reporter used by every installation step below.
P = Pretty()


def ensure_root():
    """Abort with a usage hint unless running with effective UID 0."""
    if os.geteuid() == 0:
        return
    raise SystemExit('请用 root 或 sudo 运行: sudo python3 proxy_install.py')


# Progress-bar class (tqdm.tqdm) when available, else None; set by try_import_tqdm().
TQDM = None


def try_import_tqdm():
    """Bind tqdm's progress-bar class to the module-global TQDM, or None if unavailable."""
    global TQDM
    try:
        TQDM = getattr(importlib.import_module('tqdm'), 'tqdm')
    except Exception:
        TQDM = None


def apt_install_packages():
    """Install base packages via apt-get (when present) and try to enable tqdm."""
    apt = shutil.which('apt-get')
    if apt is None:
        P.warn('未检测到 apt-get,跳过依赖安装。')
        try_import_tqdm()
        return
    apt_env = dict(os.environ, DEBIAN_FRONTEND='noninteractive')
    P.info('安装基础依赖(ca-certificates, curl, python3-tqdm)...')
    run([apt, 'update'], env=apt_env)
    run([apt, 'install', '-y', 'ca-certificates', 'curl', 'python3-tqdm'], env=apt_env)
    # python3-tqdm was just installed on disk; refresh import caches so it is importable.
    importlib.invalidate_caches()
    try_import_tqdm()
    if TQDM is not None:
        P.ok('tqdm 已可用,后续下载会显示进度条。')
    else:
        P.warn('tqdm 未启用,继续使用普通输出。')


def has_systemd():
    """Return True when systemctl exists and the systemd runtime directory is present."""
    if not shutil.which('systemctl'):
        return False
    return os.path.isdir('/run/systemd/system')


def stop_existing_runtime():
    """Stop any running mihomo instance, both systemd-managed and direct-mode.

    Teardown order matters: the systemd unit is stopped and disabled first so
    it cannot respawn the process, then any directly-launched process recorded
    in PID_FILE gets SIGTERM, a bounded wait, and finally SIGKILL.
    """
    if has_systemd() and SYSTEMD_SERVICE.exists():
        run(['systemctl', 'stop', SERVICE_NAME], check=False)
        run(['systemctl', 'disable', SERVICE_NAME], check=False)
    if PID_FILE.exists():
        try:
            pid = int(PID_FILE.read_text().strip())
            os.kill(pid, signal.SIGTERM)
            # Poll liveness for up to ~6s (30 * 0.2s); signal 0 only probes.
            for _ in range(30):
                try:
                    os.kill(pid, 0)
                    time.sleep(0.2)
                except OSError:
                    break
            # Force-kill if still alive; OSError here means it already exited.
            try:
                os.kill(pid, signal.SIGKILL)
            except OSError:
                pass
        except Exception:
            pass  # stale or unreadable PID file — nothing to stop
        try:
            PID_FILE.unlink()
        except Exception:
            pass


def cleanup_old_install():
    """Remove artifacts of a previous install, keeping the mihomo binary itself."""
    P.info('清理旧安装痕迹(会保留已安装的 mihomo 二进制)...')
    stop_existing_runtime()
    for stale in (PROFILE_FILE, PROXY_BIN, SYSTEMD_SERVICE):
        try:
            if stale.exists() or stale.is_symlink():
                stale.unlink()
        except Exception:
            pass  # best-effort removal
    if HOME_DIR.exists():
        shutil.rmtree(HOME_DIR, ignore_errors=True)
    if has_systemd():
        # Tell systemd the unit file is gone.
        run(['systemctl', 'daemon-reload'], check=False)
    P.ok('旧安装已清理。')


def detect_asset_suffix(arch=None):
    """Map a machine architecture to the mihomo release asset name suffix.

    Args:
        arch: architecture string such as ``x86_64``; defaults to the current
            machine (``platform.machine()``). Comparison is case-insensitive.

    Returns:
        The ``linux-*`` suffix used in MetaCubeX/mihomo release asset names.

    Raises:
        RuntimeError: if the architecture has no matching release build.
    """
    if arch is None:
        arch = platform.machine()
    arch = arch.lower()
    if arch in ('x86_64', 'amd64'):
        # The "v1" amd64 build has the widest CPU compatibility.
        return 'linux-amd64-v1'
    if arch in ('aarch64', 'arm64'):
        return 'linux-arm64'
    if arch in ('armv7l', 'armv7'):
        return 'linux-armv7'
    raise RuntimeError(f'不支持的架构: {arch}')


def is_mihomo_installed():
    """Return True when the mihomo binary exists and responds to `-v`."""
    if not MIHOMO_BIN.exists():
        return False
    result = run([str(MIHOMO_BIN), '-v'], check=False, capture=True)
    return result.returncode == 0


def urlopen_with_retry(req, timeout=30, retries=3, sleep_sec=2):
    """Open *req* with urllib, retrying transient failures.

    Args:
        req: a urllib Request (or URL string).
        timeout: per-attempt timeout in seconds.
        retries: total number of attempts; values below 1 are treated as 1 so
            at least one attempt is always made (previously ``retries=0``
            would fall through and ``raise None`` — a TypeError).
        sleep_sec: delay between attempts.

    Returns:
        The open response object on the first successful attempt.

    Raises:
        The last network error after all attempts fail.
    """
    attempts = max(1, retries)
    last_err = None
    for i in range(attempts):
        try:
            return urllib.request.urlopen(req, timeout=timeout)
        except Exception as e:
            last_err = e
            if i < attempts - 1:
                P.warn(f'请求失败,第 {i + 1} 次重试: {e}')
                time.sleep(sleep_sec)
    raise last_err


def fetch_latest_release_data():
    """Return the latest-release JSON from the first reachable API endpoint.

    Tries each entry in RELEASE_APIS in order; a response without an `assets`
    field counts as a failure. Raises RuntimeError when all endpoints fail.
    """
    headers = {
        'User-Agent': 'proxy-installer/1.0',
        'Accept': 'application/vnd.github+json',
    }

    last_err = None
    for api in RELEASE_APIS:
        try:
            P.info(f'尝试获取发行版信息: {api}')
            req = urllib.request.Request(api, headers=headers)
            with urlopen_with_retry(req, timeout=30, retries=2) as resp:
                payload = json.loads(resp.read().decode('utf-8', errors='ignore'))
                if isinstance(payload, dict) and payload.get('assets'):
                    return payload
                raise RuntimeError('返回内容中未找到 assets 字段')
        except Exception as e:
            last_err = e
            P.warn(f'获取失败: {e}')
    raise RuntimeError(f'所有发行版信息源均不可用: {last_err}')


def download_file(url: str, dst: Path):
    """Download *url* to *dst*, trying each mirror in DOWNLOAD_MIRRORS in order.

    Shows a tqdm progress bar when TQDM is available and the server reports a
    Content-Length. A partially-written file is removed before the next mirror
    is tried. Raises RuntimeError when every mirror fails.
    """
    headers = {'User-Agent': 'proxy-installer/1.0'}

    # Empty mirror prefix means "use the original URL directly".
    candidates = [(mirror + url) if mirror else url for mirror in DOWNLOAD_MIRRORS]

    def _stream(resp, f, bar=None):
        # Copy the response body in 128 KiB chunks, updating the optional bar.
        while True:
            chunk = resp.read(1024 * 128)
            if not chunk:
                break
            f.write(chunk)
            if bar is not None:
                bar.update(len(chunk))

    last_err = None
    for real_url in candidates:
        try:
            P.info(f'尝试下载: {real_url}')
            req = urllib.request.Request(real_url, headers=headers)
            with urlopen_with_retry(req, timeout=60, retries=2) as resp:
                total = int(resp.headers.get('Content-Length', '0') or '0')
                with open(dst, 'wb') as f:
                    if TQDM and total > 0:
                        with TQDM(total=total, unit='B', unit_scale=True, desc='下载 mihomo') as bar:
                            _stream(resp, f, bar)
                    else:
                        _stream(resp, f)
                return
        except Exception as e:
            last_err = e
            P.warn(f'下载失败: {e}')
            # Drop any partial file before trying the next mirror.
            try:
                if dst.exists():
                    dst.unlink()
            except Exception:
                pass

    raise RuntimeError(f'所有下载镜像均失败: {last_err}')


def install_mihomo_if_needed():
    """Download and install the newest mihomo release unless one already works."""
    if is_mihomo_installed():
        P.ok('检测到 mihomo 已安装,跳过重复下载。')
        return

    suffix = detect_asset_suffix()
    P.info('获取最新 mihomo 发行版信息...')
    release = fetch_latest_release_data()

    # Pick the first .gz asset whose name matches this machine's suffix.
    url = None
    for asset in release.get('assets', []):
        asset_name = asset.get('name', '')
        if suffix in asset_name and asset_name.endswith('.gz'):
            url = asset.get('browser_download_url')
            break

    if not url:
        raise RuntimeError(f'未找到适合当前架构的 mihomo 发行包: {suffix}')

    tmp_gz = Path('/tmp/mihomo.gz')
    tmp_bin = Path('/tmp/mihomo')

    P.info(f'已找到发行包: {url}')
    download_file(url, tmp_gz)

    if tmp_bin.exists():
        tmp_bin.unlink()

    # gunzip -f leaves /tmp/mihomo; install(1) copies it with exec permissions.
    run(['gunzip', '-f', str(tmp_gz)])
    run(['install', '-m', '0755', str(tmp_bin), str(MIHOMO_BIN)])
    P.ok(f'mihomo 已安装到 {MIHOMO_BIN}')


def write_config(script_dir: Path):
    """Create /etc/mihomo: empty node pool, CLI env file, and mihomo config.

    Args:
        script_dir: directory of this installer; `proxy_sources.txt` next to it
            becomes the default import source recorded in ctl.env.
    """
    HOME_DIR.mkdir(parents=True, exist_ok=True)
    PROVIDER_DIR.mkdir(parents=True, exist_ok=True)
    # Start with an empty pool; `proxy add` fills it later.
    PROVIDER_FILE.write_text('', encoding='utf-8')
    boot_mode = 'systemd' if has_systemd() else 'direct'
    default_sources = (script_dir / 'proxy_sources.txt').resolve()
    # Key=value settings parsed by the installed CLI (see load_env in PROXY_CLI).
    CTL_ENV.write_text(textwrap.dedent(f'''
        MIHOMO_API="http://127.0.0.1:9090"
        MIHOMO_SECRET=""
        MIXED_PORT="7890"
        SOCKS_PORT="7891"
        PROVIDER_NAME="pool"
        PROVIDER_FILE="{PROVIDER_FILE}"
        HOME_DIR="{HOME_DIR}"
        LOG_FILE="{LOG_FILE}"
        PID_FILE="{PID_FILE}"
        MIHOMO_BIN="{MIHOMO_BIN}"
        BOOT_MODE="{boot_mode}"
        DEFAULT_SOURCE_FILE="{default_sources}"
    ''').strip() + '\n', encoding='utf-8')
    # mihomo YAML config: file provider "pool" + url-test AUTO group + PROXY selector.
    CONFIG_FILE.write_text(textwrap.dedent('''
        mixed-port: 7890
        socks-port: 7891
        allow-lan: false
        mode: rule
        log-level: info
        ipv6: false

        external-controller: 127.0.0.1:9090
        secret: ""

        profile:
          store-selected: true
          store-fake-ip: true

        dns:
          enable: true
          listen: 127.0.0.1:1053
          ipv6: false
          enhanced-mode: redir-host
          nameserver:
            - https://1.1.1.1/dns-query
            - https://8.8.8.8/dns-query

        proxy-providers:
          pool:
            type: file
            path: ./providers/pool.txt
            health-check:
              enable: true
              url: https://www.gstatic.com/generate_204
              interval: 300
              timeout: 5000
              lazy: false

        proxy-groups:
          - name: AUTO
            type: url-test
            use:
              - pool
            url: https://www.gstatic.com/generate_204
            interval: 300
            lazy: false
            tolerance: 100

          - name: PROXY
            type: select
            use:
              - pool
            proxies:
              - AUTO
              - DIRECT

        rules:
          - MATCH,PROXY
    ''').lstrip(), encoding='utf-8')
    P.ok('配置文件已生成。')


# Source of the standalone `proxy` CLI, written verbatim to /usr/local/bin/proxy
# by write_proxy_cli(). It reads its settings from /etc/mihomo/ctl.env and talks
# to mihomo's REST API. Kept as a raw string; the content is runtime data — do
# not edit it here without re-testing the installed CLI.
PROXY_CLI = r'''#!/usr/bin/env python3
import json
import os
import re
import signal
import subprocess
import sys
import time
import base64
import urllib.request
import urllib.parse
from pathlib import Path

ENV_FILE = "/etc/mihomo/ctl.env"
SUPPORTED_SCHEMES = ("vless://", "vmess://", "ss://", "trojan://", "hysteria://", "hy2://", "tuic://")


def load_env():
    env = {}
    with open(ENV_FILE, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            k, v = line.split("=", 1)
            env[k.strip()] = v.strip().strip('"').strip("'")
    return env


CFG = load_env()
API = CFG["MIHOMO_API"].rstrip("/")
SECRET = CFG.get("MIHOMO_SECRET", "")
PROVIDER_NAME = CFG["PROVIDER_NAME"]
PROVIDER_FILE = Path(CFG["PROVIDER_FILE"])
HOME_DIR = Path(CFG["HOME_DIR"])
LOG_FILE = Path(CFG["LOG_FILE"])
PID_FILE = Path(CFG["PID_FILE"])
MIHOMO_BIN = CFG["MIHOMO_BIN"]
BOOT_MODE = CFG.get("BOOT_MODE", "direct")
DEFAULT_SOURCE_FILE = Path(CFG["DEFAULT_SOURCE_FILE"])


def out(msg=""):
    print(msg)


def err(msg):
    print(msg, file=sys.stderr)


def run(cmd, check=True, capture=True):
    p = subprocess.run(cmd, text=True, stdout=subprocess.PIPE if capture else None, stderr=subprocess.STDOUT if capture else None)
    if check and p.returncode != 0:
        tail = f"\n{p.stdout}" if capture and p.stdout else ""
        raise RuntimeError(f"command failed ({p.returncode}): {' '.join(cmd)}{tail}")
    return p


def api_request(path, method="GET", data=None, timeout=10):
    url = API + path
    headers = {"User-Agent": "proxy-cli/1.0"}
    if SECRET:
        headers["Authorization"] = f"Bearer {SECRET}"
    body = None
    if data is not None:
        body = json.dumps(data).encode("utf-8")
        headers["Content-Type"] = "application/json"
    req = urllib.request.Request(url, data=body, headers=headers, method=method)
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read()
            if not raw:
                return {}
            return json.loads(raw.decode("utf-8", errors="ignore"))
    except Exception as e:
        return {"_error": str(e)}


def b64_decode_loose(s: str) -> str:
    s = re.sub(r"\s+", "", s)
    pad = len(s) % 4
    if pad:
        s += "=" * (4 - pad)
    try:
        return base64.urlsafe_b64decode(s.encode()).decode("utf-8", errors="ignore")
    except Exception:
        return ""


def looks_like_base64_blob(text: str) -> bool:
    t = re.sub(r"\s+", "", text)
    if "://" in text:
        return False
    return bool(re.fullmatch(r"[A-Za-z0-9+/=_-]+", t))


def parse_lines(text: str):
    return [x.strip() for x in text.splitlines() if x.strip() and not x.strip().startswith("#")]


def extract_uri_lines(text: str):
    text = text.strip()
    if not text:
        return []
    if looks_like_base64_blob(text):
        decoded = b64_decode_loose(text)
        if decoded and "://" in decoded:
            text = decoded
    lines = []
    for line in parse_lines(text):
        if line.startswith(SUPPORTED_SCHEMES):
            lines.append(line)
    if not lines and text.startswith(SUPPORTED_SCHEMES):
        lines = [text]
    return lines


def fetch_text(url: str) -> str:
    req = urllib.request.Request(url, headers={"User-Agent": "proxy-cli/1.0"})
    with urllib.request.urlopen(req, timeout=25) as resp:
        return resp.read().decode("utf-8", errors="ignore")


def node_name_from_uri(uri: str) -> str:
    try:
        if uri.startswith("vmess://"):
            raw = uri[len("vmess://"):]
            js = b64_decode_loose(raw)
            obj = json.loads(js)
            return (obj.get("ps") or "vmess").strip()
        parsed = urllib.parse.urlsplit(uri)
        frag = urllib.parse.unquote(parsed.fragment or "").strip()
        return frag or parsed.scheme
    except Exception:
        return "unknown"


def set_node_name(uri: str, new_name: str) -> str:
    try:
        if uri.startswith("vmess://"):
            raw = uri[len("vmess://"):]
            js = b64_decode_loose(raw)
            obj = json.loads(js)
            obj["ps"] = new_name
            data = json.dumps(obj, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
            return "vmess://" + base64.b64encode(data).decode()
        parsed = urllib.parse.urlsplit(uri)
        return urllib.parse.urlunsplit((parsed.scheme, parsed.netloc, parsed.path, parsed.query, urllib.parse.quote(new_name, safe="")))
    except Exception:
        return uri


def load_pool_lines():
    if not PROVIDER_FILE.exists():
        return []
    return [x.strip() for x in PROVIDER_FILE.read_text(encoding="utf-8", errors="ignore").splitlines() if x.strip()]


def uniquify_names(lines):
    counts = {}
    out = []
    for line in lines:
        base = node_name_from_uri(line) or "node"
        n = counts.get(base, 0) + 1
        counts[base] = n
        name = base if n == 1 else f"{base} #{n}"
        out.append(set_node_name(line, name))
    return out


def dedupe_by_uri(lines):
    seen = set()
    out_lines = []
    for line in lines:
        if line not in seen:
            seen.add(line)
            out_lines.append(line)
    return uniquify_names(out_lines)


def save_pool_lines(lines):
    PROVIDER_FILE.parent.mkdir(parents=True, exist_ok=True)
    lines = dedupe_by_uri(lines)
    PROVIDER_FILE.write_text("\n".join(lines) + ("\n" if lines else ""), encoding="utf-8")


def resolve_source_to_nodes(source: str, visited=None):
    if visited is None:
        visited = set()
    key = os.path.abspath(source) if os.path.exists(source) else source
    if key in visited:
        return []
    visited.add(key)

    if os.path.exists(source):
        text = Path(source).read_text(encoding="utf-8", errors="ignore")
        nodes = extract_uri_lines(text)
        if nodes:
            return nodes
        merged = []
        for item in parse_lines(text):
            merged.extend(resolve_source_to_nodes(item, visited))
        return merged

    if source.startswith("http://") or source.startswith("https://"):
        text = fetch_text(source)
        nodes = extract_uri_lines(text)
        if nodes:
            return nodes
        merged = []
        for item in parse_lines(text):
            merged.extend(resolve_source_to_nodes(item, visited))
        return merged

    nodes = extract_uri_lines(source)
    if nodes:
        return nodes

    if "\n" in source:
        merged = []
        for item in parse_lines(source):
            merged.extend(resolve_source_to_nodes(item, visited))
        return merged
    return []


def pid_running():
    if not PID_FILE.exists():
        return False
    try:
        pid = int(PID_FILE.read_text().strip())
        os.kill(pid, 0)
        return True
    except Exception:
        try:
            PID_FILE.unlink()
        except Exception:
            pass
        return False


def wait_api(timeout=10):
    end = time.time() + timeout
    while time.time() < end:
        ver = api_request("/version", timeout=2)
        if not ver.get("_error"):
            return True
        time.sleep(0.5)
    return False


def start_core():
    if BOOT_MODE == "systemd":
        p = run(["systemctl", "start", "mihomo"], check=False)
        if p.returncode != 0:
            err("启动失败,请检查 systemd 日志。")
            return 1
    else:
        if pid_running():
            out("mihomo 已在运行。")
            return 0
        HOME_DIR.mkdir(parents=True, exist_ok=True)
        with open(LOG_FILE, "ab") as logf:
            proc = subprocess.Popen([MIHOMO_BIN, "-d", str(HOME_DIR)], stdout=logf, stderr=subprocess.STDOUT, start_new_session=True)
        PID_FILE.write_text(str(proc.pid), encoding="utf-8")
    if wait_api():
        out("代理核心已启动。")
        return 0
    err("代理核心启动超时,请查看日志。")
    return 1


def stop_core():
    if BOOT_MODE == "systemd":
        run(["systemctl", "stop", "mihomo"], check=False)
        out("代理核心已停止。")
        return 0
    if not pid_running():
        out("mihomo 未运行。")
        return 0
    try:
        pid = int(PID_FILE.read_text().strip())
        os.kill(pid, signal.SIGTERM)
        for _ in range(30):
            try:
                os.kill(pid, 0)
                time.sleep(0.2)
            except OSError:
                break
        try:
            os.kill(pid, signal.SIGKILL)
        except OSError:
            pass
    except Exception:
        pass
    try:
        PID_FILE.unlink()
    except Exception:
        pass
    out("代理核心已停止。")
    return 0


def refresh_provider():
    res = api_request(f"/providers/proxies/{PROVIDER_NAME}", method="PUT", timeout=15)
    return "_error" not in res


def healthcheck_provider():
    res = api_request(f"/providers/proxies/{PROVIDER_NAME}/healthcheck", method="GET", timeout=30)
    return "_error" not in res


def get_provider_data():
    return api_request(f"/providers/proxies/{PROVIDER_NAME}", method="GET", timeout=15)


def get_proxy_group():
    return api_request("/proxies/PROXY", method="GET", timeout=10)


def get_version():
    return api_request("/version", method="GET", timeout=5)


def set_current_proxy(name: str):
    res = api_request("/proxies/PROXY", method="PUT", data={"name": name}, timeout=10)
    return "_error" not in res


def parse_provider_proxies(data):
    proxies = data.get("proxies", [])
    if isinstance(proxies, dict):
        proxies = list(proxies.values())
    out_items = []
    for p in proxies:
        if not isinstance(p, dict):
            continue
        name = p.get("name") or ""
        alive = p.get("alive")
        history = p.get("history") or []
        delay = None
        for h in reversed(history):
            if isinstance(h, dict) and isinstance(h.get("delay"), int) and h.get("delay", 0) > 0:
                delay = h["delay"]
                break
        out_items.append({"name": name, "alive": alive, "delay": delay})
    return out_items


def status_icon(item):
    if item.get("delay") is not None:
        return "OK"
    if item.get("alive") is False:
        return "XX"
    return "--"


def local_items():
    items = []
    for idx, line in enumerate(load_pool_lines(), 1):
        items.append({"idx": idx, "name": node_name_from_uri(line), "delay": None, "alive": None, "line": line})
    return items


def remote_items():
    pdata = get_provider_data()
    if pdata.get("_error"):
        return None
    items = []
    for idx, p in enumerate(parse_provider_proxies(pdata), 1):
        p["idx"] = idx
        items.append(p)
    return items


def current_selected_name():
    group = get_proxy_group()
    if group.get("_error"):
        return None
    return group.get("now") or group.get("name")


def show_list(items, current=None):
    for item in items:
        mark = "*" if current and item["name"] == current else " "
        delay = f"{item['delay']} ms" if item.get("delay") is not None else "-"
        out(f"[{item['idx']:>3}] {mark} {status_icon(item):>2}  {delay:>8}  {item['name']}")


def ensure_started():
    ver = get_version()
    return "_error" not in ver


def cmd_help():
    out("""proxy 使用说明:
  proxy on                      启动代理核心,并为当前 shell 打开代理环境变量
  proxy off                     关闭代理环境变量,并停止代理核心
  proxy add                     从默认 proxy_sources.txt 同步导入
  proxy add <来源>              导入订阅链接 / 单节点 URI / 本地 txt 文件
  proxy list                    列出所有节点并显示编号
  proxy select                  交互式选择节点
  proxy select <编号|名称|auto> 选择指定节点,或切回 AUTO
  proxy check                   测试全部节点连通性并显示结果
  proxy fastest                 测试全部节点并切换到最快节点
  proxy status                  查看运行状态、当前节点和数量统计
  proxy remove fail             测试全部节点并删除失效节点
  proxy remove <编号>           删除指定编号的节点
  proxy help                    显示本帮助

说明:
  1. 第一次使用前,请先执行: source /etc/profile.d/proxy.sh
  2. 默认来源文件: {default_source}
  3. check / fastest / remove fail 依赖 mihomo 已启动
""".format(default_source=DEFAULT_SOURCE_FILE))
    return 0


def cmd_on():
    rc = start_core()
    if rc == 0:
        out("如需在当前 shell 生效代理环境变量,请先执行: source /etc/profile.d/proxy.sh")
    return rc


def cmd_off():
    return stop_core()


def cmd_add(source=None):
    specs = [source] if source else [str(DEFAULT_SOURCE_FILE)]
    merged_nodes = []
    for spec in specs:
        nodes = resolve_source_to_nodes(spec)
        if not nodes:
            out(f"跳过空来源: {spec}")
            continue
        merged_nodes.extend(nodes)
    if not merged_nodes:
        err("未识别到任何可导入节点。")
        return 1
    old = load_pool_lines()
    save_pool_lines(old + merged_nodes)
    total = len(load_pool_lines())
    out(f"已导入 {len(merged_nodes)} 条节点,当前共 {total} 条。")
    if ensure_started():
        refresh_provider()
        out("已通知 mihomo 刷新 provider。")
    else:
        out("mihomo 当前未运行,节点已写入本地池。")
    return 0


def cmd_status():
    running = ensure_started()
    out(f"运行模式: {BOOT_MODE}")
    out(f"默认来源: {DEFAULT_SOURCE_FILE}")
    out(f"本地节点池: {PROVIDER_FILE}")
    out(f"核心运行: {'是' if running else '否'}")
    if not running:
        out(f"节点总数: {len(load_pool_lines())}")
        return 0
    ver = get_version()
    current = current_selected_name()
    items = remote_items() or []
    alive = sum(1 for x in items if x.get("delay") is not None)
    out(f"mihomo version: {ver.get('version', 'unknown')}")
    out(f"当前节点: {current}")
    out(f"节点总数: {len(items)}  可用: {alive}  疑似失效: {len(items) - alive}")
    top = sorted([x for x in items if x.get("delay") is not None], key=lambda x: x["delay"])[:5]
    if top:
        out("最快节点前 5:")
        show_list(top, current=current)
    return 0


def cmd_list():
    current = current_selected_name() if ensure_started() else None
    items = remote_items() if ensure_started() else None
    if items is None:
        items = local_items()
    if not items:
        out("当前没有节点。")
        return 0
    show_list(items, current=current)
    return 0


def run_healthcheck(wait_seconds=6):
    if not ensure_started():
        err("mihomo 未运行,请先执行 proxy on")
        return None
    ok = healthcheck_provider()
    if not ok:
        err("无法触发健康检查。")
        return None
    time.sleep(wait_seconds)
    items = remote_items()
    if items is None:
        err("读取健康检查结果失败。")
        return None
    return items


def cmd_check():
    items = run_healthcheck()
    if items is None:
        return 1
    current = current_selected_name()
    show_list(items, current=current)
    alive = sum(1 for x in items if x.get("delay") is not None)
    out(f"测试完成:可用 {alive} / 总数 {len(items)}")
    return 0


def choose_fastest(items):
    alive = [x for x in items if x.get("delay") is not None]
    if not alive:
        return None
    return sorted(alive, key=lambda x: x["delay"])[0]


def cmd_fastest():
    items = run_healthcheck()
    if items is None:
        return 1
    best = choose_fastest(items)
    if not best:
        err("没有可用节点。")
        return 1
    if set_current_proxy(best["name"]):
        out(f"已切换到最快节点: {best['name']} ({best['delay']} ms)")
        return 0
    err("切换最快节点失败。")
    return 1


def get_items_for_indexing():
    items = remote_items() if ensure_started() else None
    return items or local_items()


def cmd_select(arg=None):
    if not ensure_started():
        err("mihomo 未运行,请先执行 proxy on")
        return 1
    items = remote_items() or []
    if not items:
        err("没有可选节点。")
        return 1
    if arg is None:
        current = current_selected_name()
        show_list(items, current=current)
        raw = input("输入节点编号(或 auto): ").strip()
        if not raw:
            out("已取消。")
            return 0
        arg = raw
    if str(arg).lower() == 'auto':
        if set_current_proxy('AUTO'):
            out('已切换到 AUTO。')
            return 0
        err('切换到 AUTO 失败。')
        return 1
    target = None
    if str(arg).isdigit():
        idx = int(arg)
        for item in items:
            if item['idx'] == idx:
                target = item['name']
                break
    else:
        target = str(arg)
    if not target:
        err('未找到对应节点。')
        return 1
    if set_current_proxy(target):
        out(f'已切换到节点: {target}')
        return 0
    err('切换节点失败。')
    return 1


def cmd_remove(arg=None):
    if arg is None:
        err('用法: proxy remove fail | proxy remove <编号>')
        return 1
    if str(arg).lower() == 'fail':
        items = run_healthcheck()
        if items is None:
            return 1
        dead_names = {x['name'] for x in items if x.get('delay') is None or x.get('alive') is False}
        if not dead_names:
            out('没有失效节点。')
            return 0
        old = load_pool_lines()
        kept = []
        removed = []
        for line in old:
            name = node_name_from_uri(line)
            if name in dead_names:
                removed.append(name)
            else:
                kept.append(line)
        save_pool_lines(kept)
        if ensure_started():
            refresh_provider()
        out(f'已删除 {len(removed)} 条失效节点;剩余 {len(kept)} 条。')
        for name in removed[:20]:
            out(f'  - {name}')
        return 0

    if not str(arg).isdigit():
        err('删除指定节点时请传编号,例如: proxy remove 3')
        return 1

    idx = int(arg)
    old = load_pool_lines()
    if idx < 1 or idx > len(old):
        err('编号超出范围。')
        return 1
    target_name = node_name_from_uri(old[idx - 1])
    del old[idx - 1]
    save_pool_lines(old)
    if ensure_started():
        refresh_provider()
    out(f'已删除节点: [{idx}] {target_name}')
    return 0


def main(argv=None):
    argv = argv or sys.argv[1:]
    if not argv:
        return cmd_help()
    cmd = argv[0].lower()
    if cmd == 'help':
        return cmd_help()
    if cmd == '_start':
        return start_core()
    if cmd == '_stop':
        return stop_core()
    if cmd == 'on':
        return cmd_on()
    if cmd == 'off':
        return cmd_off()
    if cmd == 'add':
        return cmd_add(argv[1] if len(argv) > 1 else None)
    if cmd == 'status':
        return cmd_status()
    if cmd == 'list':
        return cmd_list()
    if cmd == 'check':
        return cmd_check()
    if cmd == 'fastest':
        return cmd_fastest()
    if cmd == 'select':
        return cmd_select(argv[1] if len(argv) > 1 else None)
    if cmd == 'remove':
        return cmd_remove(argv[1] if len(argv) > 1 else None)
    err(f'未知命令: {cmd}')
    return cmd_help() or 1


if __name__ == '__main__':
    raise SystemExit(main())
'''


def write_proxy_cli():
    """Write the embedded `proxy` CLI source to PROXY_BIN and mark it executable."""
    target = PROXY_BIN
    target.write_text(PROXY_CLI, encoding='utf-8')
    target.chmod(0o755)
    P.ok(f'命令行工具已写入 {target}')


# Shell snippet installed to /etc/profile.d/proxy.sh by write_profile_script().
# Defines a `proxy` shell function wrapping the CLI so that `proxy on`/`proxy off`
# can export/unset proxy env vars in the CURRENT shell (a child process cannot).
# Content is runtime data — keep the string byte-identical.
PROFILE_SCRIPT = r'''[ -f /etc/mihomo/ctl.env ] && . /etc/mihomo/ctl.env

_proxy_set_env() {
  export http_proxy="http://127.0.0.1:${MIXED_PORT}"
  export https_proxy="$http_proxy"
  export HTTP_PROXY="$http_proxy"
  export HTTPS_PROXY="$http_proxy"

  export all_proxy="socks5://127.0.0.1:${SOCKS_PORT}"
  export ALL_PROXY="$all_proxy"

  export no_proxy="localhost,127.0.0.1,::1"
  export NO_PROXY="$no_proxy"
}

_proxy_clear_env() {
  unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY
  unset all_proxy ALL_PROXY
  unset no_proxy NO_PROXY
}

proxy() {
  case "$1" in
    on)
      command proxy _start || return $?
      _proxy_set_env
      echo "代理已开启"
      echo "  http_proxy=$http_proxy"
      echo "  all_proxy=$all_proxy"
      ;;
    off)
      _proxy_clear_env
      command proxy _stop
      echo "代理已关闭"
      ;;
    help|-h|--help|"")
      command proxy help
      ;;
    *)
      command proxy "$@"
      ;;
  esac
}
'''


def write_profile_script():
    """Install the proxy shell-function wrapper under /etc/profile.d."""
    target = PROFILE_FILE
    target.write_text(PROFILE_SCRIPT, encoding='utf-8')
    target.chmod(0o644)
    P.ok(f'shell 函数已写入 {target}')


# systemd unit installed by write_service_if_needed(); content is runtime data.
SERVICE_TEXT = textwrap.dedent('''
    [Unit]
    Description=mihomo proxy daemon
    After=network-online.target
    Wants=network-online.target

    [Service]
    Type=simple
    ExecStart=/usr/local/bin/mihomo -d /etc/mihomo
    Restart=always
    RestartSec=3
    LimitNOFILE=1048576

    [Install]
    WantedBy=multi-user.target
''').lstrip()


def write_service_if_needed():
    """Install and enable the mihomo systemd unit, or warn when systemd is absent."""
    if not has_systemd():
        P.warn('当前环境无 systemd,已切换为 direct 后台模式。')
        return
    SYSTEMD_SERVICE.write_text(SERVICE_TEXT, encoding='utf-8')
    for action in (['systemctl', 'daemon-reload'], ['systemctl', 'enable', SERVICE_NAME]):
        run(action, check=False)
    P.ok('systemd 服务已安装。')


def ensure_bashrc_source():
    """Make the invoking user's ~/.bashrc source the proxy profile script.

    The original hard-coded `/home/<user>` for non-root users, which breaks
    for accounts with custom home directories; we now resolve the home from
    the passwd database and keep the old layout only as a fallback.
    Failures are reported as warnings — this step is best-effort.
    """
    target_user = os.environ.get('SUDO_USER') or os.environ.get('USER') or 'root'
    try:
        import pwd
        home = Path(pwd.getpwnam(target_user).pw_dir)
    except (ImportError, KeyError):
        # Fallback: conventional locations.
        home = Path('/root') if target_user == 'root' else Path('/home') / target_user
    bashrc = home / '.bashrc'
    line = 'source /etc/profile.d/proxy.sh'
    try:
        content = bashrc.read_text(encoding='utf-8', errors='ignore') if bashrc.exists() else ''
        if line not in content:
            with open(bashrc, 'a', encoding='utf-8') as f:
                # Keep the file newline-terminated before appending our line.
                if content and not content.endswith('\n'):
                    f.write('\n')
                f.write(line + '\n')
        P.ok(f'已确保 {bashrc} 会加载 proxy 命令。')
    except Exception as e:
        P.warn(f'写入 {bashrc} 失败: {e}')


def first_sync_sources(script_dir: Path):
    """Run `proxy add` once against the default sources file, if present."""
    source_file = script_dir / 'proxy_sources.txt'
    if not source_file.exists():
        P.warn(f'未找到默认来源文件: {source_file}')
        return
    P.info(f'首次同步来源文件: {source_file}')
    result = run([str(PROXY_BIN), 'add', str(source_file)], check=False, capture=True)
    if result.stdout:
        print(result.stdout.rstrip())
    if result.returncode != 0:
        P.warn('默认来源同步未成功,你稍后可以手动执行: proxy add')
    else:
        P.ok('默认来源已同步。')


def write_template_sources(script_dir: Path):
    """Create a commented proxy_sources.txt template unless one already exists."""
    template_path = script_dir / 'proxy_sources.txt'
    if template_path.exists():
        return
    template_path.write_text(textwrap.dedent('''
        # 每行一个来源;支持以下任一形式:
        # 1) 订阅链接
        # https://example.com/sub?token=xxxx
        #
        # 2) 单条节点 URI
        # vless://uuid@example.com:443?encryption=none&security=tls&type=ws&host=example.com&path=%2Fws#tokyo
        #
        # 3) 本地 txt 文件路径(文件里可放多条 vless/vmess/ss/trojan...)
        # /root/my_nodes.txt
    ''').lstrip(), encoding='utf-8')
    P.ok(f'已生成模板来源文件: {template_path}')


def main():
    """Run the full installation pipeline (9 steps, matching Pretty.total)."""
    ensure_root()
    # argparse is used only so `--help` works; there are no options.
    parser = argparse.ArgumentParser(description='安装一体化 proxy 命令')
    parser.parse_args()
    script_dir = Path(__file__).resolve().parent

    P.title('proxy 一键安装')

    P.next('安装依赖并准备进度显示')
    apt_install_packages()

    P.next('清理旧安装并恢复到初始状态')
    cleanup_old_install()

    P.next('检查并安装 mihomo')
    install_mihomo_if_needed()

    P.next('写入 mihomo 配置')
    write_config(script_dir)

    P.next('安装 proxy 主命令')
    write_proxy_cli()

    P.next('安装 shell 命令入口')
    write_profile_script()

    P.next('安装 systemd 服务或 direct 模式')
    write_service_if_needed()

    P.next('准备来源文件并首次同步')
    write_template_sources(script_dir)
    first_sync_sources(script_dir)

    P.next('设置登录自动加载')
    ensure_bashrc_source()

    P.title('安装完成')
    print(textwrap.dedent(f'''
        已完成安装。现在请执行:
          source /etc/profile.d/proxy.sh

        常用命令:
          proxy on
          proxy off
          proxy add
          proxy list
          proxy select
          proxy check
          proxy fastest
          proxy remove fail
          proxy remove 3
          proxy status
          proxy help

        默认来源文件:
          {script_dir / 'proxy_sources.txt'}
    ''').strip())


# Entry point: run the installer when executed directly (requires root).
if __name__ == '__main__':
    main()

编辑页面
分享到:

上一篇
力扣3546 等和矩阵分割 题解
下一篇
欢迎使用 AstroPaper