Harness 框架

Harness 是 EICPS 的系统集成与执行引擎,负责将脑-脊-体三层架构从”设计图纸”变成”活着的系统”:将 VLA、PINN/HNN 控制器、Φ 接口、EvidencePack 验证器组装为统一运行时,实时监控系统健康状态,驱动双流 V 模型的 Sim2Real 自动化验证循环。

Harness 是 EICPS 的”神经系统”——它感知时间,管理能量,抵御风险,保证系统的每一次心跳都是规律的。

Harness 框架:运行时集成、看门狗矩阵与安全反射弧

Harness 调度器

Harness 维护一个统一事件总线(Unified Event Bus),协调左侧两个子系统的输入——VLA 大脑(~2Hz,接口 A / JSON)和 PINN/HNN 控制器(~1kHz,接口 B / Protobuf)——并向右侧看门狗矩阵持续推送系统指标。

每个控制周期(1ms)的主调度流程:

① ZOH 插值           Brain 指令 (~2Hz) → 脊髓 (~1kHz),ZOH 零阶保持填充空档
② 降采样聚合          脊髓状态 (~1kHz) → Brain 摘要,按事件触发上报
③ 时间戳对齐          滑动窗口缓冲,确保 Brain 与 Spine 时钟一致
④ 看门狗检查          推送指标 → WatchdogMatrix.check_all()
⑤ Φ 投影(CBF-QP)   AI 意图 → 安全可行控制量 u_safe
⑥ Bumpless 混合      若刚发生 Jump,S 型平滑过渡

检测到 verdict=REJECTED 时立即路由到安全反射弧,跳过正常 Flow。


看门狗矩阵

Harness 运行一组层次化的看门狗(Watchdog),按触发优先级排列:

优先级看门狗触发条件响应动作
P0硬件安全狗电机过温、关节超限立即断电,进入 FAILSAFE
P1幻觉检测狗C<0.2\mathcal{C} < 0.2H(π)<0.1H(\pi) < 0.1切断 AI 控制,纯 CBF 守护
P2犹豫检测狗Htopo>0.85H_{topo} > 0.85 持续 3s注入对称破缺噪声,强制选择
P3Zeno 防护狗100ms 内发生 > 10 次 Jump正则化,强制最小驻留时间
P4电量管理狗电池 < 20%降速模式,触发返航
P5延迟超时狗接口 A 超时 > 5s脊髓自主模式(断开大脑)

P0/P1 触发时直接激活安全反射弧;P2–P5 触发时系统进入 DEGRADED 状态,降级运行而非停机。


安全反射弧

当 P0/P1 看门狗触发时,Harness 跳过 VLA 大脑,直接激活安全反射弧:

危险事件<1ms反射弧激活纯 CBF 守护安全停止\text{危险事件} \xrightarrow{< 1\text{ms}} \text{反射弧激活} \xrightarrow{\text{纯 CBF 守护}} \text{安全停止}

反射弧不依赖 AI 推理,只使用已验证的保守控制律(CBF-QP 零输入守护),保证在最坏情况下系统仍能安全停止。反射弧激活后,所有控制量、传感器轨迹和触发原因自动写入 EvidencePack 留痕,供事后审计。


EvidencePack 留痕

安全反射弧触发后,Harness 向 EvidencePack 异步写入三类记录:STL 验证记录(约束鲁棒度序列 ρ(t)\rho(t))、传感器轨迹摘要(编码器、IMU、力觉的关键时间窗口)和安全事件日志(触发看门狗的具体指标值与时间戳)。正常任务完成后同样生成 EvidencePack,作为安全认证的可追溯证据。

EvidencePack 协议完整规范


平滑切换(Bumpless Transfer)

当 Jump 触发控制器参数切换时,若直接替换控制律,电机输出会发生阶跃冲击,损坏硬件。Harness 执行 S 型平滑切换,在旧控制律与新控制律之间平滑过渡:

uswitch(t)=σ ⁣(ttjumpτ)unew+(1σ ⁣(ttjumpτ))uoldu_{switch}(t) = \sigma\!\left(\frac{t - t_{jump}}{\tau}\right) \cdot u_{new} + \left(1 - \sigma\!\left(\frac{t - t_{jump}}{\tau}\right)\right) \cdot u_{old}

σ\sigma 是 sigmoid 函数,τ\tau 是过渡时间常数(通常 50–200ms)。过渡期间电机输出单调递增,不产生冲击电流。


CI 管线:Sim2Sim / Sim2Real 自动化验证

Harness 驱动双流 V 模型的验证循环,实现无人值守的持续集成(CI)

Unity / Python 仿真模型变更

Harness CI Trigger

L4 物理 AI 工厂(重新生成数据 + 训练)

L5 Sim2Sim 验证(200 轮仿真,成功率 ≥ 95%)
       ↓  若未通过 → 回溯到 L3
L6 Sim2Real 检验(d_GH < ε)
       ↓  若未通过 → 回溯到 L4
部署就绪(Deploy Ready)

每次 L5 或 L6 验证失败,Harness 自动回溯到对应阶段,无需人工干预。


算法实现

import time
import threading
import numpy as np
from dataclasses import dataclass, field
from enum import Enum, auto
from collections import deque
from typing import Callable

# ── 系统健康状态 ──────────────────────────────────────────────

class SystemHealth(Enum):
    NOMINAL   = auto()    # 正常运行
    DEGRADED  = auto()    # 性能降级(轻微异常)
    CRITICAL  = auto()    # 关键告警(准备切换)
    FAILSAFE  = auto()    # 安全反射弧激活

@dataclass
class HealthReport:
    timestamp:   float
    status:      SystemHealth
    watchdog_id: str
    value:       float
    threshold:   float
    action:      str

# ── 看门狗矩阵 ────────────────────────────────────────────────

@dataclass
class WatchdogConfig:
    name:      str
    priority:  int
    check:     Callable       # (metrics) → (triggered, value)
    threshold: float
    action:    str            # 触发时执行的动作标识

class WatchdogMatrix:
    """层次化看门狗矩阵"""

    def __init__(self):
        self.watchdogs: list[WatchdogConfig] = []
        self._setup_default_watchdogs()

    def _setup_default_watchdogs(self) -> None:
        self.watchdogs = [
            WatchdogConfig(
                name="hardware_safety", priority=0,
                check=lambda m: (m.get("motor_temp", 0) > 80,
                                  m.get("motor_temp", 0)),
                threshold=80.0, action="EMERGENCY_STOP"
            ),
            WatchdogConfig(
                name="hallucination", priority=1,
                check=lambda m: (m.get("C_score", 1) < 0.2 and
                                  m.get("H_policy", 1) < 0.1,
                                  m.get("C_score", 1)),
                threshold=0.2, action="OVERRIDE_AI"
            ),
            WatchdogConfig(
                name="hesitation", priority=2,
                check=lambda m: (m.get("H_topo", 0) > 0.85,
                                  m.get("H_topo", 0)),
                threshold=0.85, action="SYMMETRY_BREAK"
            ),
            WatchdogConfig(
                name="battery_low", priority=4,
                check=lambda m: (m.get("battery", 100) < 20,
                                  m.get("battery", 100)),
                threshold=20.0, action="RETURN_TO_BASE"
            ),
            WatchdogConfig(
                name="interface_timeout", priority=5,
                check=lambda m: (m.get("brain_latency", 0) > 5.0,
                                  m.get("brain_latency", 0)),
                threshold=5.0, action="SPINE_AUTONOMOUS"
            ),
        ]

    def check_all(self, metrics: dict) -> list[HealthReport]:
        """按优先级检查所有看门狗,返回触发的告警"""
        alerts = []
        for wd in sorted(self.watchdogs, key=lambda w: w.priority):
            triggered, value = wd.check(metrics)
            if triggered:
                alerts.append(HealthReport(
                    timestamp=time.time(),
                    status=SystemHealth.CRITICAL if wd.priority <= 1
                           else SystemHealth.DEGRADED,
                    watchdog_id=wd.name,
                    value=float(value),
                    threshold=wd.threshold,
                    action=wd.action,
                ))
        return alerts


# ── 平滑切换(Bumpless Transfer)────────────────────────────

class BumplessTransfer:
    """控制律切换时的 S 型平滑过渡"""

    def __init__(self, tau: float = 0.1):
        self.tau = tau            # 过渡时间常数(秒)
        self._t_jump: float | None = None
        self._u_old: np.ndarray | None = None

    def trigger(self, t_now: float, u_old: np.ndarray) -> None:
        """触发平滑切换"""
        self._t_jump = t_now
        self._u_old  = u_old.copy()

    def blend(self, t_now: float, u_new: np.ndarray) -> np.ndarray:
        """S 型混合:α × u_new + (1-α) × u_old"""
        if self._t_jump is None or self._u_old is None:
            return u_new
        dt = t_now - self._t_jump
        alpha = 1.0 / (1.0 + np.exp(-10 * (dt / self.tau - 0.5)))
        if dt >= self.tau * 2:
            self._t_jump = None  # 切换完成,清除缓存
        return alpha * u_new + (1.0 - alpha) * self._u_old


# ── Harness 主调度器 ──────────────────────────────────────────

class EICPSHarness:
    """
    EICPS Harness:系统集成与执行引擎
    协调 VLA → Φ → PINN/HNN → Body 的完整数据流
    """

    def __init__(self, interface_a, interface_b, phi_interface,
                 spine_controller, evidence_validator):
        self.iface_a     = interface_a
        self.iface_b     = interface_b
        self.phi         = phi_interface
        self.spine       = spine_controller
        self.validator   = evidence_validator

        self.watchdog    = WatchdogMatrix()
        self.bumpless    = BumplessTransfer(tau=0.1)
        self.mode        = "NOMINAL"
        self._jump_times = deque(maxlen=20)
        self._metrics: dict = {}

    def spine_cycle(self, t: float, obs) -> np.ndarray:
        """主循环:1kHz 脊髓级别调度,返回最终控制输出 u_safe"""
        brain_cmd = self.iface_a.get_current_command()
        self._update_metrics(t, obs, brain_cmd)

        alerts = self.watchdog.check_all(self._metrics)
        if alerts:
            return self._handle_alerts(t, alerts, obs)

        if brain_cmd is not None:
            u_ai = self.spine.predict_next(obs.q, brain_cmd.waypoint)
        else:
            u_ai = np.zeros(self.spine.m)

        A_cbf, b_cbf = self._build_cbf_constraints(obs)
        u_safe = self.phi.project(u_ai, A_cbf, b_cbf)
        return self.bumpless.blend(t, u_safe)

    def _update_metrics(self, t: float, obs, brain_cmd) -> None:
        self._metrics.update({
            "t":             t,
            "motor_temp":    getattr(obs, "motor_temp", 25.0),
            "battery":       getattr(obs, "battery", 100.0),
            "brain_latency": t - getattr(brain_cmd, "timestamp", t),
            "H_topo":        self._metrics.get("H_topo", 0.0),
            "C_score":       self._metrics.get("C_score", 1.0),
            "H_policy":      self._metrics.get("H_policy", 1.0),
        })

    def inject_spectral_metrics(self, H_topo: float,
                                 C_score: float,
                                 H_policy: float) -> None:
        """ESP 层注入谱诊断指标(由 spectral_health_check 调用)"""
        self._metrics.update({"H_topo": H_topo,
                               "C_score": C_score,
                               "H_policy": H_policy})

    def _handle_alerts(self, t: float,
                        alerts: list[HealthReport],
                        obs) -> np.ndarray:
        top = alerts[0]
        if top.action == "EMERGENCY_STOP":
            self.mode = "FAILSAFE"
            return np.zeros(len(obs.q))
        elif top.action == "OVERRIDE_AI":
            A_cbf, b_cbf = self._build_cbf_constraints(obs)
            return self.phi.project(np.zeros(len(obs.q)), A_cbf, b_cbf)
        elif top.action == "SYMMETRY_BREAK":
            return self._metrics.get("last_u", np.zeros(len(obs.q))) \
                   + np.random.randn(len(obs.q)) * 0.05
        else:
            return self._metrics.get("last_u", np.zeros(len(obs.q)))

    def _build_cbf_constraints(self, obs) -> tuple:
        n = len(obs.q)
        return np.eye(n), np.ones(n) * 0.5

与 AI Agent Harness 的关系

EICPS Harness 与当前 LLM Agent 领域的 Harness Engineering(LangChain、AutoGen、CrewAI 等框架的底层逻辑)共享同一个核心思路:编排层(Orchestration Layer)——解决”如何把多个 AI 组件粘合成一个可运行的系统”这个问题。两者都在处理多 Agent 协调调度、工具调用接口规范、异常时的降级路由,以及对 Agent 行为的监控与评估。

关键差异在约束等级:LLM Agent Harness 活在软件世界,延迟容忍度高,失败的代价是”回答错了”;EICPS Harness 活在物理世界,1ms 超时触发硬件停机,失败的代价是”机器人摔落高压线”。这使得 EICPS Harness 多出了三个 LLM Agent 框架基本不存在的机制:硬实时调度(jitter < 0.1ms)、分级看门狗矩阵(P0 直接断电)、以及形式化安全证据链(EvidencePack)。

两个领域的概念存在直接对应关系:

LLM Agent HarnessEICPS Harness
Agent Router / PlannerHarness 调度器 + ZOH
Guardrails / Safety Filter看门狗矩阵 P0–P5
Fallback / Error Handler安全反射弧
Trace / LoggingEvidencePack
Eval PipelineCI 管线 Sim2Sim / Sim2Real

可以这样理解:EICPS Harness 是 LLM Agent Harness 的具身化版本——把语言模型换成物理系统后,同样的编排问题变成了实时控制问题,同样的安全过滤变成了 CBF 硬约束,同样的日志记录变成了形式化可审计证据。两个领域正在从不同的起点向同一个方向收敛。


延伸阅读