Harness 框架
Harness 是 EICPS 的系统集成与执行引擎,负责将脑-脊-体三层架构从”设计图纸”变成”活着的系统”:将 VLA、PINN/HNN 控制器、Φ 接口、EvidencePack 验证器组装为统一运行时,实时监控系统健康状态,驱动双流 V 模型的 Sim2Real 自动化验证循环。
Harness 是 EICPS 的”神经系统”——它感知时间,管理能量,抵御风险,保证系统的每一次心跳都是规律的。
Harness 调度器
Harness 维护一个统一事件总线(Unified Event Bus),协调左侧两个子系统的输入——VLA 大脑(~2Hz,接口 A / JSON)和 PINN/HNN 控制器(~1kHz,接口 B / Protobuf)——并向右侧看门狗矩阵持续推送系统指标。
每个控制周期(1ms)的主调度流程:
① ZOH 插值 Brain 指令 (~2Hz) → 脊髓 (~1kHz),ZOH 零阶保持填充空档
② 降采样聚合 脊髓状态 (~1kHz) → Brain 摘要,按事件触发上报
③ 时间戳对齐 滑动窗口缓冲,确保 Brain 与 Spine 时钟一致
④ 看门狗检查 推送指标 → WatchdogMatrix.check_all()
⑤ Φ 投影(CBF-QP) AI 意图 → 安全可行控制量 u_safe
⑥ Bumpless 混合 若刚发生 Jump,S 型平滑过渡
检测到 verdict=REJECTED 时立即路由到安全反射弧,跳过正常 Flow。
看门狗矩阵
Harness 运行一组层次化的看门狗(Watchdog),按触发优先级排列:
| 优先级 | 看门狗 | 触发条件 | 响应动作 |
|---|---|---|---|
| P0 | 硬件安全狗 | 电机过温、关节超限 | 立即断电,进入 FAILSAFE |
| P1 | 幻觉检测狗 | 且 | 切断 AI 控制,纯 CBF 守护 |
| P2 | 犹豫检测狗 | 持续 3s | 注入对称破缺噪声,强制选择 |
| P3 | Zeno 防护狗 | 100ms 内发生 > 10 次 Jump | 正则化,强制最小驻留时间 |
| P4 | 电量管理狗 | 电池 < 20% | 降速模式,触发返航 |
| P5 | 延迟超时狗 | 接口 A 超时 > 5s | 脊髓自主模式(断开大脑) |
P0/P1 触发时直接激活安全反射弧;P2–P5 触发时系统进入 DEGRADED 状态,降级运行而非停机。
安全反射弧
当 P0/P1 看门狗触发时,Harness 跳过 VLA 大脑,直接激活安全反射弧:
反射弧不依赖 AI 推理,只使用已验证的保守控制律(CBF-QP 零输入守护),保证在最坏情况下系统仍能安全停止。反射弧激活后,所有控制量、传感器轨迹和触发原因自动写入 EvidencePack 留痕,供事后审计。
EvidencePack 留痕
安全反射弧触发后,Harness 向 EvidencePack 异步写入三类记录:STL 验证记录(约束鲁棒度序列 )、传感器轨迹摘要(编码器、IMU、力觉的关键时间窗口)和安全事件日志(触发看门狗的具体指标值与时间戳)。正常任务完成后同样生成 EvidencePack,作为安全认证的可追溯证据。
平滑切换(Bumpless Transfer)
当 Jump 触发控制器参数切换时,若直接替换控制律,电机输出会发生阶跃冲击,损坏硬件。Harness 执行 S 型平滑切换,在旧控制律与新控制律之间平滑过渡:
是 sigmoid 函数, 是过渡时间常数(通常 50–200ms)。过渡期间电机输出单调递增,不产生冲击电流。
CI 管线:Sim2Sim / Sim2Real 自动化验证
Harness 驱动双流 V 模型的验证循环,实现无人值守的持续集成(CI):
Unity / Python 仿真模型变更
↓
Harness CI Trigger
↓
L4 物理 AI 工厂(重新生成数据 + 训练)
↓
L5 Sim2Sim 验证(200 轮仿真,成功率 ≥ 95%)
↓ 若未通过 → 回溯到 L3
L6 Sim2Real 检验(d_GH < ε)
↓ 若未通过 → 回溯到 L4
部署就绪(Deploy Ready)
每次 L5 或 L6 验证失败,Harness 自动回溯到对应阶段,无需人工干预。
算法实现
import time
import threading
import numpy as np
from dataclasses import dataclass, field
from enum import Enum, auto
from collections import deque
from typing import Callable
# ── 系统健康状态 ──────────────────────────────────────────────
class SystemHealth(Enum):
NOMINAL = auto() # 正常运行
DEGRADED = auto() # 性能降级(轻微异常)
CRITICAL = auto() # 关键告警(准备切换)
FAILSAFE = auto() # 安全反射弧激活
@dataclass
class HealthReport:
timestamp: float
status: SystemHealth
watchdog_id: str
value: float
threshold: float
action: str
# ── 看门狗矩阵 ────────────────────────────────────────────────
@dataclass
class WatchdogConfig:
name: str
priority: int
check: Callable # (metrics) → (triggered, value)
threshold: float
action: str # 触发时执行的动作标识
class WatchdogMatrix:
"""层次化看门狗矩阵"""
def __init__(self):
self.watchdogs: list[WatchdogConfig] = []
self._setup_default_watchdogs()
def _setup_default_watchdogs(self) -> None:
self.watchdogs = [
WatchdogConfig(
name="hardware_safety", priority=0,
check=lambda m: (m.get("motor_temp", 0) > 80,
m.get("motor_temp", 0)),
threshold=80.0, action="EMERGENCY_STOP"
),
WatchdogConfig(
name="hallucination", priority=1,
check=lambda m: (m.get("C_score", 1) < 0.2 and
m.get("H_policy", 1) < 0.1,
m.get("C_score", 1)),
threshold=0.2, action="OVERRIDE_AI"
),
WatchdogConfig(
name="hesitation", priority=2,
check=lambda m: (m.get("H_topo", 0) > 0.85,
m.get("H_topo", 0)),
threshold=0.85, action="SYMMETRY_BREAK"
),
WatchdogConfig(
name="battery_low", priority=4,
check=lambda m: (m.get("battery", 100) < 20,
m.get("battery", 100)),
threshold=20.0, action="RETURN_TO_BASE"
),
WatchdogConfig(
name="interface_timeout", priority=5,
check=lambda m: (m.get("brain_latency", 0) > 5.0,
m.get("brain_latency", 0)),
threshold=5.0, action="SPINE_AUTONOMOUS"
),
]
def check_all(self, metrics: dict) -> list[HealthReport]:
"""按优先级检查所有看门狗,返回触发的告警"""
alerts = []
for wd in sorted(self.watchdogs, key=lambda w: w.priority):
triggered, value = wd.check(metrics)
if triggered:
alerts.append(HealthReport(
timestamp=time.time(),
status=SystemHealth.CRITICAL if wd.priority <= 1
else SystemHealth.DEGRADED,
watchdog_id=wd.name,
value=float(value),
threshold=wd.threshold,
action=wd.action,
))
return alerts
# ── 平滑切换(Bumpless Transfer)────────────────────────────
class BumplessTransfer:
"""控制律切换时的 S 型平滑过渡"""
def __init__(self, tau: float = 0.1):
self.tau = tau # 过渡时间常数(秒)
self._t_jump: float | None = None
self._u_old: np.ndarray | None = None
def trigger(self, t_now: float, u_old: np.ndarray) -> None:
"""触发平滑切换"""
self._t_jump = t_now
self._u_old = u_old.copy()
def blend(self, t_now: float, u_new: np.ndarray) -> np.ndarray:
"""S 型混合:α × u_new + (1-α) × u_old"""
if self._t_jump is None or self._u_old is None:
return u_new
dt = t_now - self._t_jump
alpha = 1.0 / (1.0 + np.exp(-10 * (dt / self.tau - 0.5)))
if dt >= self.tau * 2:
self._t_jump = None # 切换完成,清除缓存
return alpha * u_new + (1.0 - alpha) * self._u_old
# ── Harness 主调度器 ──────────────────────────────────────────
class EICPSHarness:
"""
EICPS Harness:系统集成与执行引擎
协调 VLA → Φ → PINN/HNN → Body 的完整数据流
"""
def __init__(self, interface_a, interface_b, phi_interface,
spine_controller, evidence_validator):
self.iface_a = interface_a
self.iface_b = interface_b
self.phi = phi_interface
self.spine = spine_controller
self.validator = evidence_validator
self.watchdog = WatchdogMatrix()
self.bumpless = BumplessTransfer(tau=0.1)
self.mode = "NOMINAL"
self._jump_times = deque(maxlen=20)
self._metrics: dict = {}
def spine_cycle(self, t: float, obs) -> np.ndarray:
"""主循环:1kHz 脊髓级别调度,返回最终控制输出 u_safe"""
brain_cmd = self.iface_a.get_current_command()
self._update_metrics(t, obs, brain_cmd)
alerts = self.watchdog.check_all(self._metrics)
if alerts:
return self._handle_alerts(t, alerts, obs)
if brain_cmd is not None:
u_ai = self.spine.predict_next(obs.q, brain_cmd.waypoint)
else:
u_ai = np.zeros(self.spine.m)
A_cbf, b_cbf = self._build_cbf_constraints(obs)
u_safe = self.phi.project(u_ai, A_cbf, b_cbf)
return self.bumpless.blend(t, u_safe)
def _update_metrics(self, t: float, obs, brain_cmd) -> None:
self._metrics.update({
"t": t,
"motor_temp": getattr(obs, "motor_temp", 25.0),
"battery": getattr(obs, "battery", 100.0),
"brain_latency": t - getattr(brain_cmd, "timestamp", t),
"H_topo": self._metrics.get("H_topo", 0.0),
"C_score": self._metrics.get("C_score", 1.0),
"H_policy": self._metrics.get("H_policy", 1.0),
})
def inject_spectral_metrics(self, H_topo: float,
C_score: float,
H_policy: float) -> None:
"""ESP 层注入谱诊断指标(由 spectral_health_check 调用)"""
self._metrics.update({"H_topo": H_topo,
"C_score": C_score,
"H_policy": H_policy})
def _handle_alerts(self, t: float,
alerts: list[HealthReport],
obs) -> np.ndarray:
top = alerts[0]
if top.action == "EMERGENCY_STOP":
self.mode = "FAILSAFE"
return np.zeros(len(obs.q))
elif top.action == "OVERRIDE_AI":
A_cbf, b_cbf = self._build_cbf_constraints(obs)
return self.phi.project(np.zeros(len(obs.q)), A_cbf, b_cbf)
elif top.action == "SYMMETRY_BREAK":
return self._metrics.get("last_u", np.zeros(len(obs.q))) \
+ np.random.randn(len(obs.q)) * 0.05
else:
return self._metrics.get("last_u", np.zeros(len(obs.q)))
def _build_cbf_constraints(self, obs) -> tuple:
n = len(obs.q)
return np.eye(n), np.ones(n) * 0.5
与 AI Agent Harness 的关系
EICPS Harness 与当前 LLM Agent 领域的 Harness Engineering(LangChain、AutoGen、CrewAI 等框架的底层逻辑)共享同一个核心思路:编排层(Orchestration Layer)——解决”如何把多个 AI 组件粘合成一个可运行的系统”这个问题。两者都在处理多 Agent 协调调度、工具调用接口规范、异常时的降级路由,以及对 Agent 行为的监控与评估。
关键差异在约束等级:LLM Agent Harness 活在软件世界,延迟容忍度高,失败的代价是”回答错了”;EICPS Harness 活在物理世界,1ms 超时触发硬件停机,失败的代价是”机器人摔落高压线”。这使得 EICPS Harness 多出了三个 LLM Agent 框架基本不存在的机制:硬实时调度(jitter < 0.1ms)、分级看门狗矩阵(P0 直接断电)、以及形式化安全证据链(EvidencePack)。
两个领域的概念存在直接对应关系:
| LLM Agent Harness | EICPS Harness |
|---|---|
| Agent Router / Planner | Harness 调度器 + ZOH |
| Guardrails / Safety Filter | 看门狗矩阵 P0–P5 |
| Fallback / Error Handler | 安全反射弧 |
| Trace / Logging | EvidencePack |
| Eval Pipeline | CI 管线 Sim2Sim / Sim2Real |
可以这样理解:EICPS Harness 是 LLM Agent Harness 的具身化版本——把语言模型换成物理系统后,同样的编排问题变成了实时控制问题,同样的安全过滤变成了 CBF 硬约束,同样的日志记录变成了形式化可审计证据。两个领域正在从不同的起点向同一个方向收敛。
延伸阅读
- 系统部署架构 — Harness 所协调的三层硬件架构
- 接口协议 A/B — Harness 管理的通信接口规范
- 实时安全监控(CBF) — 看门狗矩阵的幻觉/犹豫诊断指标来源
- 双流 V 模型 — Harness CI 管线所驱动的验证循环
- EvidencePack 协议 — 安全反射弧触发后的证据留痕规范