接口协议 A/B

三层架构的解耦靠的不是约定,而是严格标准化的接口协议。接口 A 和接口 B 划定了三个计算域的边界,定义了什么信息能跨层流动、以什么格式、多高频率。

接口 A 是语义与物理之间的翻译层;接口 B 是数字控制与物理执行之间的同步通道。

接口协议 A/B:语义-动力学界面与数字-物理界面

接口 A:语义-动力学界面(Brain ↔ Spine)

连接对象:VLA/LLM Brain 层 ↔ ESP Spine 层

属性规范
频率~1 Hz,异步
通信方式事件驱动推送(Brain → Spine)+ 事件驱动上报(Spine → Brain)
数据格式JSON / Protocol Buffers
延迟容忍500ms–2s(大模型推理延迟)
失联处理ZOH:Spine 保持最后一次指令继续执行,不等待

核心约束:Brain 层只输出”去哪里”和”满足什么约束”,不接触电机。VLA 严禁直接输出关节力矩或速度指令——它只定义目标和规约,Spine 计算怎么执行。

下行(Brain → Spine):STL 规约包 + ETL 任务图

每次下行载荷包含两部分,Spine 收到后立即生效,旧载荷覆盖:

{
  "seq_id":    42,
  "timestamp": "2025-06-01T09:00:00.500Z",
  "stl_rules": [
    {
      "id":       "STL-001",
      "spec":     "G[0,T](d_wire >= 5.0)",
      "priority": "CRITICAL",
      "comment":  "机身与带电导线距离始终 ≥ 5.0m(500kV)"
    },
    {
      "id":       "STL-002",
      "spec":     "G[0,T](|tau_joint| <= tau_max)",
      "priority": "HIGH",
      "comment":  "关节力矩不超过电机峰值扭矩"
    }
  ],
  "etl_task": {
    "task_id":   "REPLACE_DAMPER_01",
    "mode":      "GRASP",
    "operators": ["approach_attachment_point", "grip_damper", "remove_old_damper"],
    "params":    {
      "target_pose":  [1.2, 0.0, 8.5, 0.0, 0.0, 0.0, 1.0],
      "grip_force_N": 150,
      "approach_speed": "SLOW"
    }
  }
}

stl_rules 随任务段动态变化——不同作业阶段(接近、抓取、拆卸、安装)的约束集合不同,Brain 在每次模态切换时重新下发。

上行(Spine → Brain):状态摘要,事件驱动

Spine 不推送原始 1kHz 传感器流,仅在以下条件之一满足时上报:STL 鲁棒度 ρ\rho 跌破预警阈值、发生模态切换、或距上次上报超过 500ms(心跳):

{
  "seq_id":    42,
  "timestamp": "2025-06-01T09:00:00.502Z",
  "trigger":   "rho_warn",
  "mode":      "GRASP",
  "rho": {
    "STL-001": 0.43,
    "STL-002": 1.87
  },
  "pose_summary": {
    "end_effector": [1.18, 0.02, 8.48],
    "orientation":  [0.99, 0.01, 0.02, 0.00]
  },
  "safety_margin_m": 0.43
}

trigger: "rho_warn" 表示 STL-001 的鲁棒度已跌至 0.43m(接近 ρwarn=0.5\rho_{warn} = 0.5m),Brain 据此决策是否调整规划。Brain 无响应时,Spine 依靠 ZOH 零阶保持维持最后一次指令继续运行,不停机、不等待。


Φ 投影:AI 意图 → 物理安全指令

接口 A 下行的语义指令(目标位姿、作业模式)需要在 Spine 层转换为接口 B 下行的参考力矩 τref\tau_{ref}。这一转换由 Φ 投影完成:在保持 AI 意图的前提下,以最小修正量使控制输出满足物理约束和安全约束。

usafe=argminuuuref2s.t.h˙(x,u)α(h(x))u_{safe} = \arg\min_{u} \|u - u_{ref}\|^2 \quad \text{s.t.} \quad \dot{h}(x, u) \geq -\alpha(h(x))

工程上实现为 CBF-QP(控制屏障函数二次规划),在每个 1ms 控制周期内求解,保证安全集不变性 h(x)0h(x) \geq 0safety_flag = 0xFF 时,电机驱动单元立即切断力矩输出,不等待上层确认——这是物理层的最后防线。

Bumpless Transfer:平滑模态切换

当 Brain 下发新指令触发模态切换(Jump)时,若直接替换控制律,电机输出会发生阶跃冲击。Spine 执行 Bumpless Transfer 避免控制量突变:

utrans(t)=uold+ttjumpTtrans(unewuold),t[tjump,  tjump+Ttrans]u_{trans}(t) = u_{old} + \frac{t - t_{jump}}{T_{trans}} \cdot (u_{new} - u_{old}), \quad t \in [t_{jump},\; t_{jump} + T_{trans}]

TtransT_{trans} 通常取 3–5ms(3–5 个控制周期),足以消除电机电流冲击,对任务执行几乎无感。


接口 B:数字-物理界面(Spine ↔ Body)

连接对象:ESP Spine 层 ↔ 电机驱动单元 + 传感器

属性规范
频率~1 kHz,同步
总线协议EtherCAT(工业以太网实时总线)
时延要求≤1ms(硬实时,不可违反)
数据格式二进制定长帧

核心约束:迟到的正确答案等于错误答案。接口 B 的时延抖动(jitter)须小于 0.1ms;任何软件层超时直接触发硬件安全停机。

下行(Spine → Body):参考力矩帧

ControlFrame {
  timestamp_ns:  uint64    // 硬件时间戳,纳秒精度
  joint_index:   uint8[n]  // 关节索引
  tau_ref:       float32[n]// 参考力矩(Nm)
  safety_flag:   uint8     // 0x00=正常 / 0xFF=紧急停机
}

上行(Body → Spine):传感器观测帧

ObservationFrame {
  timestamp_ns: uint64
  q:            float32[n]   // 关节角度(rad)
  dq:           float32[n]   // 关节角速度(rad/s)
  tau_actual:   float32[n]   // 实际力矩(Nm)
  imu_accel:    float32[3]   // 线加速度(m/s²)
  imu_gyro:     float32[3]   // 角速度(rad/s)
  force_torque: float32[6]   // 末端六轴力/力矩
}

观测帧不经任何软件过滤直接回流 Spine,确保 EKF 状态估计和 CBF 安全监控看到的是真实物理状态。


三条核心原则

Brain 不碰电机:VLA 只定义目标和规约,Spine 计算执行。AI 幻觉无法越过接口 A 直接传播到物理层——Brain 发出的任何语义指令,必须先经过 Spine 的 Φ 投影过滤,再经接口 B 才能到达电机。

Spine 是唯一控制出口:所有物理指令必须经 Φ 投影(CBF-QP)过滤。无论 Brain 发出的语义指令在语义上多么合理,Spine 都以安全约束二次修正,保证 h(x)0h(x) \geq 0 在每个控制周期内成立。

Body 观测直通无过滤:ObservationFrame 不经任何软件处理直接回流 Spine,EKF 和 CBF 看到的是真实物理状态。软件过滤会引入延迟和失真,在高速接触动力学场景下直接导致控制发散。


算法实现

import json, threading, numpy as np, cvxpy as cp
from dataclasses import dataclass
from collections import deque

# ── 接口 A 报文结构 ───────────────────────────────────────────

@dataclass
class STLRule:
    id:       str
    spec:     str
    priority: str   # "CRITICAL" | "HIGH" | "MEDIUM"

@dataclass
class BrainCommand:
    """Brain → Spine 下行载荷(接口 A)"""
    seq_id:    int
    timestamp: float
    stl_rules: list[STLRule]
    mode:      str
    target:    np.ndarray   # 目标位姿 [x, y, z, qw, qx, qy, qz]

    @classmethod
    def from_json(cls, raw: str) -> "BrainCommand":
        d = json.loads(raw)
        rules = [STLRule(**r) for r in d.get("stl_rules", [])]
        target = np.array(d["etl_task"]["params"]["target_pose"])
        return cls(seq_id=d["seq_id"], timestamp=d["timestamp"],
                   stl_rules=rules, mode=d["etl_task"]["mode"],
                   target=target)

@dataclass
class SpineReport:
    """Spine → Brain 上行状态摘要(接口 A)"""
    seq_id:        int
    timestamp:     float
    trigger:       str        # "heartbeat" | "rho_warn" | "mode_change"
    mode:          str
    rho:           dict       # {rule_id: rho_value}
    safety_margin: float

    def to_json(self) -> str:
        return json.dumps({
            "seq_id": self.seq_id, "timestamp": self.timestamp,
            "trigger": self.trigger, "mode": self.mode,
            "rho": self.rho, "safety_margin_m": self.safety_margin,
        })


# ── 接口 A 通信管理器(ZOH + 事件驱动上报)────────────────────

class InterfaceA:
    def __init__(self, rho_warn: float = 0.5, heartbeat_s: float = 0.5):
        self._lock       = threading.Lock()
        self._last_cmd:  BrainCommand | None = None
        self.rho_warn    = rho_warn
        self.heartbeat   = heartbeat_s
        self._last_report_t = -float("inf")

    def receive(self, raw_json: str) -> None:
        """Brain 下发新指令(异步调用)"""
        with self._lock:
            self._last_cmd = BrainCommand.from_json(raw_json)

    def get_command(self) -> BrainCommand | None:
        """Spine 每毫秒读取(ZOH:返回最近一次指令)"""
        with self._lock:
            return self._last_cmd

    def should_report(self, t: float, rho_min: float) -> bool:
        if rho_min <= self.rho_warn:
            return True
        if t - self._last_report_t >= self.heartbeat:
            self._last_report_t = t
            return True
        return False


# ── 接口 B 帧结构 ─────────────────────────────────────────────

@dataclass
class ControlFrame:
    """Spine → Body,1kHz"""
    timestamp_ns: int
    tau_ref:      np.ndarray   # 关节参考力矩
    safety_flag:  int = 0x00   # 0xFF = 紧急停机

@dataclass
class ObservationFrame:
    """Body → Spine,1kHz"""
    timestamp_ns: int
    q:            np.ndarray   # 关节角度
    dq:           np.ndarray   # 关节角速度
    tau_actual:   np.ndarray   # 实际力矩
    imu_accel:    np.ndarray   # 线加速度
    imu_gyro:     np.ndarray   # 角速度
    force_torque: np.ndarray   # 末端六轴力/力矩


# ── Φ 投影:CBF-QP ────────────────────────────────────────────

class PhiProjection:
    """将 AI 意图投影到物理安全可行集(CBF 二次规划)"""

    def __init__(self, n_joints: int):
        self.n = n_joints

    def project(self,
                u_ref:  np.ndarray,   # Brain 参考指令
                A_cbf:  np.ndarray,   # CBF 约束矩阵
                b_cbf:  np.ndarray,   # CBF 约束向量
                tau_max: float = 50.0) -> np.ndarray:
        """
        min  ‖u - u_ref‖²
        s.t. A_cbf @ u + b_cbf ≥ 0    (安全集不变性)
             ‖u‖_∞ ≤ tau_max          (关节力矩上限)
        """
        u = cp.Variable(self.n)
        prob = cp.Problem(
            cp.Minimize(cp.sum_squares(u - u_ref)),
            [A_cbf @ u + b_cbf >= 0, cp.norm_inf(u) <= tau_max]
        )
        prob.solve(solver=cp.OSQP, warm_start=True, verbose=False)
        return u.value if prob.status in ("optimal", "optimal_inaccurate") \
               else np.zeros(self.n)


# ── Bumpless Transfer ─────────────────────────────────────────

class BumplessTransfer:
    """模态切换时的平滑过渡(消除控制量突变)"""

    def __init__(self, T_trans_ms: float = 5.0, dt_ms: float = 1.0):
        self.steps  = int(T_trans_ms / dt_ms)
        self._step  = 0
        self._u_old = None
        self._u_new = None

    def trigger(self, u_old: np.ndarray, u_new: np.ndarray) -> None:
        self._u_old, self._u_new, self._step = u_old, u_new, 0

    def get(self, u_nominal: np.ndarray) -> np.ndarray:
        if self._u_old is None or self._step >= self.steps:
            return u_nominal
        alpha = self._step / self.steps
        self._step += 1
        return (1 - alpha) * self._u_old + alpha * self._u_new

实时安全监控(CBF)Φ\Phi 投影中 CBF-QP 的安全约束构造)

安全约束验证(STL)stl_rules 字段的语义与鲁棒度计算)

跨频调度(ZOH 零阶保持与 Bumpless Transfer 的调度逻辑)

EvidencePack 协议(接口 A 下行流的形式化验证与证据留存)