接口协议 A/B
三层架构的解耦靠的不是约定,而是严格标准化的接口协议。接口 A 和接口 B 划定了三个计算域的边界,定义了什么信息能跨层流动、以什么格式、多高频率。
接口 A 是语义与物理之间的翻译层;接口 B 是数字控制与物理执行之间的同步通道。
接口 A:语义-动力学界面(Brain ↔ Spine)
连接对象:VLA/LLM Brain 层 ↔ ESP Spine 层
| 属性 | 规范 |
|---|---|
| 频率 | ~1 Hz,异步 |
| 通信方式 | 事件驱动推送(Brain → Spine)+ 事件驱动上报(Spine → Brain) |
| 数据格式 | JSON / Protocol Buffers |
| 延迟容忍 | 500ms–2s(大模型推理延迟) |
| 失联处理 | ZOH:Spine 保持最后一次指令继续执行,不等待 |
核心约束:Brain 层只输出”去哪里”和”满足什么约束”,不接触电机。VLA 严禁直接输出关节力矩或速度指令——它只定义目标和规约,Spine 计算怎么执行。
下行(Brain → Spine):STL 规约包 + ETL 任务图
每次下行载荷包含两部分,Spine 收到后立即生效,旧载荷覆盖:
{
"seq_id": 42,
"timestamp": "2025-06-01T09:00:00.500Z",
"stl_rules": [
{
"id": "STL-001",
"spec": "G[0,T](d_wire >= 5.0)",
"priority": "CRITICAL",
"comment": "机身与带电导线距离始终 ≥ 5.0m(500kV)"
},
{
"id": "STL-002",
"spec": "G[0,T](|tau_joint| <= tau_max)",
"priority": "HIGH",
"comment": "关节力矩不超过电机峰值扭矩"
}
],
"etl_task": {
"task_id": "REPLACE_DAMPER_01",
"mode": "GRASP",
"operators": ["approach_attachment_point", "grip_damper", "remove_old_damper"],
"params": {
"target_pose": [1.2, 0.0, 8.5, 0.0, 0.0, 0.0, 1.0],
"grip_force_N": 150,
"approach_speed": "SLOW"
}
}
}
stl_rules 随任务段动态变化——不同作业阶段(接近、抓取、拆卸、安装)的约束集合不同,Brain 在每次模态切换时重新下发。
上行(Spine → Brain):状态摘要,事件驱动
Spine 不推送原始 1kHz 传感器流,仅在以下条件之一满足时上报:STL 鲁棒度 跌破预警阈值、发生模态切换、或距上次上报超过 500ms(心跳):
{
"seq_id": 42,
"timestamp": "2025-06-01T09:00:00.502Z",
"trigger": "rho_warn",
"mode": "GRASP",
"rho": {
"STL-001": 0.43,
"STL-002": 1.87
},
"pose_summary": {
"end_effector": [1.18, 0.02, 8.48],
"orientation": [0.99, 0.01, 0.02, 0.00]
},
"safety_margin_m": 0.43
}
trigger: "rho_warn" 表示 STL-001 的鲁棒度已跌至 0.43m(接近 m),Brain 据此决策是否调整规划。Brain 无响应时,Spine 依靠 ZOH 零阶保持维持最后一次指令继续运行,不停机、不等待。
Φ 投影:AI 意图 → 物理安全指令
接口 A 下行的语义指令(目标位姿、作业模式)需要在 Spine 层转换为接口 B 下行的参考力矩 。这一转换由 Φ 投影完成:在保持 AI 意图的前提下,以最小修正量使控制输出满足物理约束和安全约束。
工程上实现为 CBF-QP(控制屏障函数二次规划),在每个 1ms 控制周期内求解,保证安全集不变性 。safety_flag = 0xFF 时,电机驱动单元立即切断力矩输出,不等待上层确认——这是物理层的最后防线。
Bumpless Transfer:平滑模态切换
当 Brain 下发新指令触发模态切换(Jump)时,若直接替换控制律,电机输出会发生阶跃冲击。Spine 执行 Bumpless Transfer 避免控制量突变:
通常取 3–5ms(3–5 个控制周期),足以消除电机电流冲击,对任务执行几乎无感。
接口 B:数字-物理界面(Spine ↔ Body)
连接对象:ESP Spine 层 ↔ 电机驱动单元 + 传感器
| 属性 | 规范 |
|---|---|
| 频率 | ~1 kHz,同步 |
| 总线协议 | EtherCAT(工业以太网实时总线) |
| 时延要求 | ≤1ms(硬实时,不可违反) |
| 数据格式 | 二进制定长帧 |
核心约束:迟到的正确答案等于错误答案。接口 B 的时延抖动(jitter)须小于 0.1ms;任何软件层超时直接触发硬件安全停机。
下行(Spine → Body):参考力矩帧
ControlFrame {
timestamp_ns: uint64 // 硬件时间戳,纳秒精度
joint_index: uint8[n] // 关节索引
tau_ref: float32[n]// 参考力矩(Nm)
safety_flag: uint8 // 0x00=正常 / 0xFF=紧急停机
}
上行(Body → Spine):传感器观测帧
ObservationFrame {
timestamp_ns: uint64
q: float32[n] // 关节角度(rad)
dq: float32[n] // 关节角速度(rad/s)
tau_actual: float32[n] // 实际力矩(Nm)
imu_accel: float32[3] // 线加速度(m/s²)
imu_gyro: float32[3] // 角速度(rad/s)
force_torque: float32[6] // 末端六轴力/力矩
}
观测帧不经任何软件过滤直接回流 Spine,确保 EKF 状态估计和 CBF 安全监控看到的是真实物理状态。
三条核心原则
Brain 不碰电机:VLA 只定义目标和规约,Spine 计算执行。AI 幻觉无法越过接口 A 直接传播到物理层——Brain 发出的任何语义指令,必须先经过 Spine 的 Φ 投影过滤,再经接口 B 才能到达电机。
Spine 是唯一控制出口:所有物理指令必须经 Φ 投影(CBF-QP)过滤。无论 Brain 发出的语义指令在语义上多么合理,Spine 都以安全约束二次修正,保证 在每个控制周期内成立。
Body 观测直通无过滤:ObservationFrame 不经任何软件处理直接回流 Spine,EKF 和 CBF 看到的是真实物理状态。软件过滤会引入延迟和失真,在高速接触动力学场景下直接导致控制发散。
算法实现
import json, threading, numpy as np, cvxpy as cp
from dataclasses import dataclass
from collections import deque
# ── 接口 A 报文结构 ───────────────────────────────────────────
@dataclass
class STLRule:
id: str
spec: str
priority: str # "CRITICAL" | "HIGH" | "MEDIUM"
@dataclass
class BrainCommand:
"""Brain → Spine 下行载荷(接口 A)"""
seq_id: int
timestamp: float
stl_rules: list[STLRule]
mode: str
target: np.ndarray # 目标位姿 [x, y, z, qw, qx, qy, qz]
@classmethod
def from_json(cls, raw: str) -> "BrainCommand":
d = json.loads(raw)
rules = [STLRule(**r) for r in d.get("stl_rules", [])]
target = np.array(d["etl_task"]["params"]["target_pose"])
return cls(seq_id=d["seq_id"], timestamp=d["timestamp"],
stl_rules=rules, mode=d["etl_task"]["mode"],
target=target)
@dataclass
class SpineReport:
"""Spine → Brain 上行状态摘要(接口 A)"""
seq_id: int
timestamp: float
trigger: str # "heartbeat" | "rho_warn" | "mode_change"
mode: str
rho: dict # {rule_id: rho_value}
safety_margin: float
def to_json(self) -> str:
return json.dumps({
"seq_id": self.seq_id, "timestamp": self.timestamp,
"trigger": self.trigger, "mode": self.mode,
"rho": self.rho, "safety_margin_m": self.safety_margin,
})
# ── 接口 A 通信管理器(ZOH + 事件驱动上报)────────────────────
class InterfaceA:
def __init__(self, rho_warn: float = 0.5, heartbeat_s: float = 0.5):
self._lock = threading.Lock()
self._last_cmd: BrainCommand | None = None
self.rho_warn = rho_warn
self.heartbeat = heartbeat_s
self._last_report_t = -float("inf")
def receive(self, raw_json: str) -> None:
"""Brain 下发新指令(异步调用)"""
with self._lock:
self._last_cmd = BrainCommand.from_json(raw_json)
def get_command(self) -> BrainCommand | None:
"""Spine 每毫秒读取(ZOH:返回最近一次指令)"""
with self._lock:
return self._last_cmd
def should_report(self, t: float, rho_min: float) -> bool:
if rho_min <= self.rho_warn:
return True
if t - self._last_report_t >= self.heartbeat:
self._last_report_t = t
return True
return False
# ── 接口 B 帧结构 ─────────────────────────────────────────────
@dataclass
class ControlFrame:
"""Spine → Body,1kHz"""
timestamp_ns: int
tau_ref: np.ndarray # 关节参考力矩
safety_flag: int = 0x00 # 0xFF = 紧急停机
@dataclass
class ObservationFrame:
"""Body → Spine,1kHz"""
timestamp_ns: int
q: np.ndarray # 关节角度
dq: np.ndarray # 关节角速度
tau_actual: np.ndarray # 实际力矩
imu_accel: np.ndarray # 线加速度
imu_gyro: np.ndarray # 角速度
force_torque: np.ndarray # 末端六轴力/力矩
# ── Φ 投影:CBF-QP ────────────────────────────────────────────
class PhiProjection:
"""将 AI 意图投影到物理安全可行集(CBF 二次规划)"""
def __init__(self, n_joints: int):
self.n = n_joints
def project(self,
u_ref: np.ndarray, # Brain 参考指令
A_cbf: np.ndarray, # CBF 约束矩阵
b_cbf: np.ndarray, # CBF 约束向量
tau_max: float = 50.0) -> np.ndarray:
"""
min ‖u - u_ref‖²
s.t. A_cbf @ u + b_cbf ≥ 0 (安全集不变性)
‖u‖_∞ ≤ tau_max (关节力矩上限)
"""
u = cp.Variable(self.n)
prob = cp.Problem(
cp.Minimize(cp.sum_squares(u - u_ref)),
[A_cbf @ u + b_cbf >= 0, cp.norm_inf(u) <= tau_max]
)
prob.solve(solver=cp.OSQP, warm_start=True, verbose=False)
return u.value if prob.status in ("optimal", "optimal_inaccurate") \
else np.zeros(self.n)
# ── Bumpless Transfer ─────────────────────────────────────────
class BumplessTransfer:
"""模态切换时的平滑过渡(消除控制量突变)"""
def __init__(self, T_trans_ms: float = 5.0, dt_ms: float = 1.0):
self.steps = int(T_trans_ms / dt_ms)
self._step = 0
self._u_old = None
self._u_new = None
def trigger(self, u_old: np.ndarray, u_new: np.ndarray) -> None:
self._u_old, self._u_new, self._step = u_old, u_new, 0
def get(self, u_nominal: np.ndarray) -> np.ndarray:
if self._u_old is None or self._step >= self.steps:
return u_nominal
alpha = self._step / self.steps
self._step += 1
return (1 - alpha) * self._u_old + alpha * self._u_new
→ 实时安全监控(CBF)( 投影中 CBF-QP 的安全约束构造)
→ 安全约束验证(STL)(stl_rules 字段的语义与鲁棒度计算)
→ 跨频调度(ZOH 零阶保持与 Bumpless Transfer 的调度逻辑)
→ EvidencePack 协议(接口 A 下行流的形式化验证与证据留存)