ETL:具身任务语言

ETL(Embodied Task Language,具身任务语言) 是 EICPS 系统中任务层的领域特定语言(DSL)。它连接两侧:上游是 LLM 对自然语言工单的语义理解,下游是 HTN 规划器生成的原子操作序列,以及经 MCP 协议下发给机器人的执行包。

ETL 之于 HTN ≈ SQL 之于关系代数——ETL 是面向人与系统的声明式任务描述,HTN 是负责高效求解的计算引擎。

ETL 具身任务语言:结构、组件与执行流程

定位:PDDL 的具身继承者

传统机器人规划语言(PDDL/HDDL)设计于平坦的符号世界,在工业现场的具身场景中暴露三个结构性缺陷。ETL 在 PDDL 的形式化骨架上增加三层扩展:

ETL=PDDL核心+SE(3)位姿语义具身层+STL时序约束安全层+YAML语法工程层\text{ETL} = \text{PDDL}_{\text{核心}} + \underbrace{SE(3)_{\text{位姿语义}}}_{\text{具身层}} + \underbrace{\text{STL}_{\text{时序约束}}}_{\text{安全层}} + \underbrace{\text{YAML}_{\text{语法}}}_{\text{工程层}}

维度PDDL/HDDLETL
空间表示离散位置谓词(at robot loc3SE(3) 位姿原生嵌入
时序安全无内建时序约束,依赖外挂验证器STL 约束挂载于每个阶段,ρ\rho 值实时评估
可读性Lisp 括号语法,非工程人员难维护YAML 结构,字段语义透明
安全保证规划后外挂验证规划中 GeoPruner 三谱剪枝,输出先验合法
可执行性需要中间件转换MCP 协议直接下发机器人执行

ETL 是首个面向电力检修场景的具身任务 DSL,在电压等级、安全净距、带电作业约束上有预编码的领域知识。


ETL 任务包结构

一个 ETL 任务包(Task Package)由四个顶层字段组成:

ETL Task Package
├── scene          场景参数(任务类型、电压等级、环境条件)
├── phases         执行阶段 DAG(含子任务树)
├── manifest       装备清单(机器人、工具、人员)
└── stl_active     全程激活的 STL 约束 ID 列表

scene:锁定任务的物理边界条件,所有阶段共享。

scene:
  task_type:     damper_replace
  voltage_class: 500kV
  safe_dist:     5.0          # 安全净距(m),由电压等级查 L1 表
  is_live:       false
  wind_speed:    3.5
  damper_count:  2
  location:      "华北500kV甲乙线12档"

phases:执行阶段 DAG,P1→P2→P3→P4 依赖链。每个阶段包含子任务列表,每个子任务携带 SE(3) 位姿、前提条件、效果和局部 STL 激活:

phases:
  - id: DR-P1
    name: 作业准备
    safety_level: critical
    depends_on: []
    stl_constraints: [STL-001, STL-002]
    children:
      - id: DR-S1.1
        name: 验电与接地
        action_type: inspect
        duration_s: 60
        pose:
          position: {x: 0.0, y: 0.0, z: 15.2}
          orientation: {roll: 0, pitch: 0, yaw: 0}
        preconditions: [at_site]
        effects: [power_verified, ground_wire_installed]

  - id: DR-P3
    name: 更换作业
    safety_level: critical
    depends_on: [DR-P2]
    stl_constraints: [STL-001, STL-002, STL-101, STL-102]
    children:
      - id: DR-S3.1
        name: 拆除旧防振锤
        action_type: manipulate
        duration_s: 300
        tools: [wrench_M10]
        preconditions: [at_damper_position, wrench_grasped]
        effects: [old_damper_removed]
      - id: DR-S3.2
        name: 安装新防振锤
        action_type: manipulate
        duration_s: 400
        stl_active_local: [STL-101]   # 仅紧固时激活力矩约束
        preconditions: [old_damper_removed, new_damper_grasped]
        effects: [new_damper_installed, bolt1_tight, bolt2_tight]

ST/MA 双层任务表示

每个子任务在 ETL 内部以双层方式表示:

ST 层(语义任务,Semantic Task):与平台无关,描述”做什么”——task_idtypetarget。Prj167 的”接近防振锤位置”是一个 ST 层描述,不绑定任何具体机器人型号。

MA 层(物理动作,Motion Action):绑定具体执行平台,描述”怎么做”——SE(3) 位姿(pose)、几何约束(geo_req)、STL 约束引用(stl_con)。MA 层将 ST 层的语义意图锚定到物理流形 Mphy\mathcal{M}_{phy} 上的具体坐标。

这种双层设计使得同一个 ETL 语义任务可以适配不同的机器人平台(四足+机械臂、无人机、履带车),只需替换 MA 层的位姿和约束参数,ST 层不变。


编译链:从工单到执行

ETL 任务包在规划流水线中经历两次生成与一次执行,左侧编译链是图的核心:

① 自然语言工单:操作员输入非结构化的现场描述,如”华北500kV甲乙线12档,更换2个防振锤,带电作业”。

② LLM 前处理(非权威):输入工单文本 + KB 领域知识库(L1–L4 层),输出 ETL 草稿——参考任务图,供人工审核。特性:可理解语义、可补全缺省值,但不保证安全,是非权威输出。

③ 人工审核节点:6 个人机协同节点之一。人审核 ETL 草稿,确认任务图合理性,修正缺省值后提交。LLM 草稿经由虚线箭头(草稿)流向此节点,强调其非权威性。

④ HTN DFS 规划器 + GeoPruner(权威):以确定性深度优先搜索展开 HTN 任务网络,≤0.2ms 完成,输出 PlanResult.to_etl_graph()。规划前执行三谱联合剪枝

算子合法    ΣG    ΣT    ΣS\text{算子合法} \iff \Sigma_G \;\wedge\; \Sigma_T \;\wedge\; \Sigma_S

过滤不满足几何可达性(ΣG\Sigma_G)、时间可行性(ΣT\Sigma_T)、安全距离(ΣS\Sigma_S)的算子分支,保证输出先验满足所有 STL 前提。HTN 的输出经粗实线”权威输出”箭头流向下一步。

⑤ ETL 正式包:人可读 + 机器可执行的任务图。包含 scenephases(DAG)、manifeststl_active 四个字段。不允许前端直接编辑——修改路径是重走规划流程,绕过 STL 约束会引入安全风险。

⑥ MCP 协议下发 → 执行监控:ETL 包经 MCP 协议下发到机器人执行。执行期由 STL-RHC 滚动时域监控实时评估约束:每 5s 计算激活 STL 约束的鲁棒度 ρ\rho,三级响应:

ρ > ρ_warn  → 继续执行,定时上报
0 < ρ ≤ ρ_warn → 预警:上报 Brain,降速
ρ ≤ 0       → 紧急停机

形式语义

ETL 任务包的形式化定义:

ETL_Package=(S,  Φ,  R,  C)\text{ETL\_Package} = (\mathcal{S}, \; \Phi, \; \mathcal{R}, \; \mathcal{C})

符号含义类型
S\mathcal{S}场景参数集合task_typevoltage_classsafe_dist
Φ\Phi阶段有向无环图Φ={Pi}\Phi = \{P_i\}Pi=(id,subtasks,stl_refs)P_i = (\text{id}, \text{subtasks}, \text{stl\_refs})
R\mathcal{R}资源清单{robots, tools, personnel}
C\mathcal{C}STL 约束引用集{ck}\{c_k\},对应 stl_specs.yaml 中的规格

安全语义:ETL 包合法当且仅当存在满足所有约束的执行轨迹:

valid(pkg)    σExec(Φ):ckC,  ρ(ck,σ)>0\text{valid}(\text{pkg}) \iff \exists \sigma \in \text{Exec}(\Phi) : \forall c_k \in \mathcal{C},\; \rho(c_k, \sigma) > 0

ρ(ck,σ)\rho(c_k, \sigma) 是 STL 公式 ckc_k 在轨迹 σ\sigma 上的鲁棒度ρ>0\rho > 0 保证约束满足。HTN 规划器的 GeoPruner 在展开时实施该语义,保证输出包先验合法。


三条核心原则

LLM 草稿供人审核:LLM 前处理的输出是非权威草稿,可能缺省、不保证安全。人审核节点是语义理解与安全规划之间的防火墙。

HTN 输出是唯一权威:GeoPruner 三谱剪枝(ΣGΣTΣS\Sigma_G \wedge \Sigma_T \wedge \Sigma_S)确保 HTN 输出先验合法,是系统对任务安全性的形式化承诺。

ETL 正式包禁止前端编辑:任何修改都必须重走规划流程。绕过 HTN 直接编辑 ETL 正式包等价于绕过 STL 约束,是不可接受的安全风险。


算法实现

from __future__ import annotations
from dataclasses import dataclass, field, asdict
from typing import Optional
import json, yaml, re

# ── ETL 数据结构 ──────────────────────────────────────────────

@dataclass
class SE3Pose:
    """SE(3) 位姿:平移 + 旋转(欧拉角,单位:m / deg)"""
    x: float = 0.0; y: float = 0.0; z: float = 0.0
    roll: float = 0.0; pitch: float = 0.0; yaw: float = 0.0

@dataclass
class ETLSubtask:
    id: str; name: str; action_type: str; duration_s: int
    pose:             Optional[SE3Pose] = None
    tools:            list[str] = field(default_factory=list)
    preconditions:    list[str] = field(default_factory=list)
    effects:          list[str] = field(default_factory=list)
    stl_active_local: list[str] = field(default_factory=list)

@dataclass
class ETLPhase:
    id: str; name: str; safety_level: str
    depends_on: list[str]; estimated_duration_s: int
    stl_constraints: list[str]
    children: list[ETLSubtask] = field(default_factory=list)

@dataclass
class ETLPackage:
    """ETL 任务包:规划流水线的输出格式,MCP 下发的载体"""
    task_id: str; scene: dict
    phases: list[ETLPhase]; manifest: dict; stl_active: list[str]

    def to_json(self) -> str:
        return json.dumps(asdict(self), ensure_ascii=False, indent=2)

    def to_yaml(self) -> str:
        return yaml.dump(asdict(self), allow_unicode=True, sort_keys=False)

    @property
    def total_duration_s(self) -> int:
        return sum(p.estimated_duration_s for p in self.phases)


# ── ETL 生成器(HTN 输出 → ETL 包)────────────────────────────

class ETLBuilder:
    """将 HTN DFS 规划结果转换为 ETL 任务包"""

    def __init__(self, scene: dict, stl_specs: list[dict]):
        self.scene     = scene
        self.stl_specs = {s["id"]: s for s in stl_specs}

    def build(self, htn_operators: list[dict]) -> ETLPackage:
        phases_map: dict[str, list] = {}
        for op in htn_operators:
            phases_map.setdefault(op.get("phase", "DR-P1"), []).append(op)

        phases, prev = [], []
        for phase_id in ["DR-P1", "DR-P2", "DR-P3", "DR-P4"]:
            if phase_id not in phases_map:
                continue
            subtasks = [
                ETLSubtask(
                    id=op["id"], name=op["name"],
                    action_type=op.get("action_type", "manipulate"),
                    duration_s=op.get("duration_s", 60),
                    pose=SE3Pose(**op["pose"]) if "pose" in op else None,
                    tools=op.get("tools", []),
                    preconditions=op.get("preconditions", []),
                    effects=op.get("effects", []),
                ) for op in phases_map[phase_id]
            ]
            stl_map = {"DR-P1": ["STL-001","STL-002"],
                       "DR-P2": ["STL-001","STL-002"],
                       "DR-P3": ["STL-001","STL-002","STL-101","STL-102"],
                       "DR-P4": ["STL-003"]}
            name_map = {"DR-P1": "作业准备", "DR-P2": "接近定位",
                        "DR-P3": "更换作业", "DR-P4": "收工记录"}
            phases.append(ETLPhase(
                id=phase_id, name=name_map.get(phase_id, phase_id),
                safety_level="critical" if phase_id in ("DR-P1","DR-P3") else "high",
                depends_on=list(prev),
                estimated_duration_s=sum(s.duration_s for s in subtasks),
                stl_constraints=stl_map.get(phase_id, []),
                children=subtasks,
            ))
            prev = [phase_id]

        return ETLPackage(
            task_id=f"{self.scene['task_type']}_{self.scene['voltage_class']}_001",
            scene=self.scene, phases=phases,
            manifest={"robots": [{"id":"R-001","type":"quadruped_with_arm","role":"主操作"}],
                      "tools": [{"id":"T-001","name":"力矩扳手M10","quantity":1}],
                      "personnel": [{"role":"作业负责人","count":1},{"role":"安全监护人","count":1}]},
            stl_active=["STL-001","STL-002","STL-003","STL-004"],
        )


# ── STL-RHC 执行监控 ───────────────────────────────────────────

class STLRHCMonitor:
    """ETL 任务执行期滚动时域监控:每 interval_s 秒评估 ρ 值"""

    def __init__(self, stl_specs: list[dict], interval_s: int = 5):
        self.specs      = {s["id"]: s for s in stl_specs}
        self.interval_s = interval_s

    def evaluate(self, active_ids: list[str],
                 signals: dict[str, float]) -> dict:
        results = {}
        for sid in active_ids:
            spec = self.specs.get(sid)
            if not spec:
                continue
            rho  = self._compute_rho(spec, signals)
            warn = spec.get("rho_warn", 0.1)
            results[sid] = {"rho": rho,
                            "status": "ok" if rho > warn
                                      else "warn" if rho > 0
                                      else "violated"}
        statuses = [r["status"] for r in results.values()]
        action = ("stop" if "violated" in statuses
                  else "warn" if "warn" in statuses
                  else "continue")
        return {"constraints": results, "rhc_action": action}

    def _compute_rho(self, spec: dict, signals: dict) -> float:
        formula = spec.get("formula", "")
        for sig_name, val in signals.items():
            if sig_name in formula:
                m = re.search(r"[><=]+\s*([\d.]+)", formula)
                if m:
                    thr = float(m.group(1))
                    return val - thr if ">" in formula else thr - val
        return 1.0

延伸阅读