ETL:具身任务语言
ETL(Embodied Task Language,具身任务语言) 是 EICPS 系统中任务层的领域特定语言(DSL)。它连接两侧:上游是 LLM 对自然语言工单的语义理解,下游是 HTN 规划器生成的原子操作序列,以及经 MCP 协议下发给机器人的执行包。
ETL 之于 HTN ≈ SQL 之于关系代数——ETL 是面向人与系统的声明式任务描述,HTN 是负责高效求解的计算引擎。
定位:PDDL 的具身继承者
传统机器人规划语言(PDDL/HDDL)设计于平坦的符号世界,在工业现场的具身场景中暴露三个结构性缺陷。ETL 在 PDDL 的形式化骨架上增加三层扩展:
| 维度 | PDDL/HDDL | ETL |
|---|---|---|
| 空间表示 | 离散位置谓词(at robot loc3) | SE(3) 位姿原生嵌入 |
| 时序安全 | 无内建时序约束,依赖外挂验证器 | STL 约束挂载于每个阶段, 值实时评估 |
| 可读性 | Lisp 括号语法,非工程人员难维护 | YAML 结构,字段语义透明 |
| 安全保证 | 规划后外挂验证 | 规划中 GeoPruner 三谱剪枝,输出先验合法 |
| 可执行性 | 需要中间件转换 | MCP 协议直接下发机器人执行 |
ETL 是首个面向电力检修场景的具身任务 DSL,在电压等级、安全净距、带电作业约束上有预编码的领域知识。
ETL 任务包结构
一个 ETL 任务包(Task Package)由四个顶层字段组成:
ETL Task Package
├── scene 场景参数(任务类型、电压等级、环境条件)
├── phases 执行阶段 DAG(含子任务树)
├── manifest 装备清单(机器人、工具、人员)
└── stl_active 全程激活的 STL 约束 ID 列表
scene:锁定任务的物理边界条件,所有阶段共享。
scene:
task_type: damper_replace
voltage_class: 500kV
safe_dist: 5.0 # 安全净距(m),由电压等级查 L1 表
is_live: false
wind_speed: 3.5
damper_count: 2
location: "华北500kV甲乙线12档"
phases:执行阶段 DAG,P1→P2→P3→P4 依赖链。每个阶段包含子任务列表,每个子任务携带 SE(3) 位姿、前提条件、效果和局部 STL 激活:
phases:
- id: DR-P1
name: 作业准备
safety_level: critical
depends_on: []
stl_constraints: [STL-001, STL-002]
children:
- id: DR-S1.1
name: 验电与接地
action_type: inspect
duration_s: 60
pose:
position: {x: 0.0, y: 0.0, z: 15.2}
orientation: {roll: 0, pitch: 0, yaw: 0}
preconditions: [at_site]
effects: [power_verified, ground_wire_installed]
- id: DR-P3
name: 更换作业
safety_level: critical
depends_on: [DR-P2]
stl_constraints: [STL-001, STL-002, STL-101, STL-102]
children:
- id: DR-S3.1
name: 拆除旧防振锤
action_type: manipulate
duration_s: 300
tools: [wrench_M10]
preconditions: [at_damper_position, wrench_grasped]
effects: [old_damper_removed]
- id: DR-S3.2
name: 安装新防振锤
action_type: manipulate
duration_s: 400
stl_active_local: [STL-101] # 仅紧固时激活力矩约束
preconditions: [old_damper_removed, new_damper_grasped]
effects: [new_damper_installed, bolt1_tight, bolt2_tight]
ST/MA 双层任务表示
每个子任务在 ETL 内部以双层方式表示:
ST 层(语义任务,Semantic Task):与平台无关,描述”做什么”——task_id、type、target。Prj167 的”接近防振锤位置”是一个 ST 层描述,不绑定任何具体机器人型号。
MA 层(物理动作,Motion Action):绑定具体执行平台,描述”怎么做”——SE(3) 位姿(pose)、几何约束(geo_req)、STL 约束引用(stl_con)。MA 层将 ST 层的语义意图锚定到物理流形 上的具体坐标。
这种双层设计使得同一个 ETL 语义任务可以适配不同的机器人平台(四足+机械臂、无人机、履带车),只需替换 MA 层的位姿和约束参数,ST 层不变。
编译链:从工单到执行
ETL 任务包在规划流水线中经历两次生成与一次执行,左侧编译链是图的核心:
① 自然语言工单:操作员输入非结构化的现场描述,如”华北500kV甲乙线12档,更换2个防振锤,带电作业”。
② LLM 前处理(非权威):输入工单文本 + KB 领域知识库(L1–L4 层),输出 ETL 草稿——参考任务图,供人工审核。特性:可理解语义、可补全缺省值,但不保证安全,是非权威输出。
③ 人工审核节点:6 个人机协同节点之一。人审核 ETL 草稿,确认任务图合理性,修正缺省值后提交。LLM 草稿经由虚线箭头(草稿)流向此节点,强调其非权威性。
④ HTN DFS 规划器 + GeoPruner(权威):以确定性深度优先搜索展开 HTN 任务网络,≤0.2ms 完成,输出 PlanResult.to_etl_graph()。规划前执行三谱联合剪枝:
过滤不满足几何可达性()、时间可行性()、安全距离()的算子分支,保证输出先验满足所有 STL 前提。HTN 的输出经粗实线”权威输出”箭头流向下一步。
⑤ ETL 正式包:人可读 + 机器可执行的任务图。包含 scene、phases(DAG)、manifest、stl_active 四个字段。不允许前端直接编辑——修改路径是重走规划流程,绕过 STL 约束会引入安全风险。
⑥ MCP 协议下发 → 执行监控:ETL 包经 MCP 协议下发到机器人执行。执行期由 STL-RHC 滚动时域监控实时评估约束:每 5s 计算激活 STL 约束的鲁棒度 ,三级响应:
ρ > ρ_warn → 继续执行,定时上报
0 < ρ ≤ ρ_warn → 预警:上报 Brain,降速
ρ ≤ 0 → 紧急停机
形式语义
ETL 任务包的形式化定义:
| 符号 | 含义 | 类型 |
|---|---|---|
| 场景参数集合 | task_type、voltage_class、safe_dist… | |
| 阶段有向无环图 | , | |
| 资源清单 | {robots, tools, personnel} | |
| STL 约束引用集 | ,对应 stl_specs.yaml 中的规格 |
安全语义:ETL 包合法当且仅当存在满足所有约束的执行轨迹:
是 STL 公式 在轨迹 上的鲁棒度, 保证约束满足。HTN 规划器的 GeoPruner 在展开时实施该语义,保证输出包先验合法。
三条核心原则
LLM 草稿供人审核:LLM 前处理的输出是非权威草稿,可能缺省、不保证安全。人审核节点是语义理解与安全规划之间的防火墙。
HTN 输出是唯一权威:GeoPruner 三谱剪枝()确保 HTN 输出先验合法,是系统对任务安全性的形式化承诺。
ETL 正式包禁止前端编辑:任何修改都必须重走规划流程。绕过 HTN 直接编辑 ETL 正式包等价于绕过 STL 约束,是不可接受的安全风险。
算法实现
from __future__ import annotations
from dataclasses import dataclass, field, asdict
from typing import Optional
import json, yaml, re
# ── ETL 数据结构 ──────────────────────────────────────────────
@dataclass
class SE3Pose:
"""SE(3) 位姿:平移 + 旋转(欧拉角,单位:m / deg)"""
x: float = 0.0; y: float = 0.0; z: float = 0.0
roll: float = 0.0; pitch: float = 0.0; yaw: float = 0.0
@dataclass
class ETLSubtask:
id: str; name: str; action_type: str; duration_s: int
pose: Optional[SE3Pose] = None
tools: list[str] = field(default_factory=list)
preconditions: list[str] = field(default_factory=list)
effects: list[str] = field(default_factory=list)
stl_active_local: list[str] = field(default_factory=list)
@dataclass
class ETLPhase:
id: str; name: str; safety_level: str
depends_on: list[str]; estimated_duration_s: int
stl_constraints: list[str]
children: list[ETLSubtask] = field(default_factory=list)
@dataclass
class ETLPackage:
"""ETL 任务包:规划流水线的输出格式,MCP 下发的载体"""
task_id: str; scene: dict
phases: list[ETLPhase]; manifest: dict; stl_active: list[str]
def to_json(self) -> str:
return json.dumps(asdict(self), ensure_ascii=False, indent=2)
def to_yaml(self) -> str:
return yaml.dump(asdict(self), allow_unicode=True, sort_keys=False)
@property
def total_duration_s(self) -> int:
return sum(p.estimated_duration_s for p in self.phases)
# ── ETL 生成器(HTN 输出 → ETL 包)────────────────────────────
class ETLBuilder:
"""将 HTN DFS 规划结果转换为 ETL 任务包"""
def __init__(self, scene: dict, stl_specs: list[dict]):
self.scene = scene
self.stl_specs = {s["id"]: s for s in stl_specs}
def build(self, htn_operators: list[dict]) -> ETLPackage:
phases_map: dict[str, list] = {}
for op in htn_operators:
phases_map.setdefault(op.get("phase", "DR-P1"), []).append(op)
phases, prev = [], []
for phase_id in ["DR-P1", "DR-P2", "DR-P3", "DR-P4"]:
if phase_id not in phases_map:
continue
subtasks = [
ETLSubtask(
id=op["id"], name=op["name"],
action_type=op.get("action_type", "manipulate"),
duration_s=op.get("duration_s", 60),
pose=SE3Pose(**op["pose"]) if "pose" in op else None,
tools=op.get("tools", []),
preconditions=op.get("preconditions", []),
effects=op.get("effects", []),
) for op in phases_map[phase_id]
]
stl_map = {"DR-P1": ["STL-001","STL-002"],
"DR-P2": ["STL-001","STL-002"],
"DR-P3": ["STL-001","STL-002","STL-101","STL-102"],
"DR-P4": ["STL-003"]}
name_map = {"DR-P1": "作业准备", "DR-P2": "接近定位",
"DR-P3": "更换作业", "DR-P4": "收工记录"}
phases.append(ETLPhase(
id=phase_id, name=name_map.get(phase_id, phase_id),
safety_level="critical" if phase_id in ("DR-P1","DR-P3") else "high",
depends_on=list(prev),
estimated_duration_s=sum(s.duration_s for s in subtasks),
stl_constraints=stl_map.get(phase_id, []),
children=subtasks,
))
prev = [phase_id]
return ETLPackage(
task_id=f"{self.scene['task_type']}_{self.scene['voltage_class']}_001",
scene=self.scene, phases=phases,
manifest={"robots": [{"id":"R-001","type":"quadruped_with_arm","role":"主操作"}],
"tools": [{"id":"T-001","name":"力矩扳手M10","quantity":1}],
"personnel": [{"role":"作业负责人","count":1},{"role":"安全监护人","count":1}]},
stl_active=["STL-001","STL-002","STL-003","STL-004"],
)
# ── STL-RHC 执行监控 ───────────────────────────────────────────
class STLRHCMonitor:
"""ETL 任务执行期滚动时域监控:每 interval_s 秒评估 ρ 值"""
def __init__(self, stl_specs: list[dict], interval_s: int = 5):
self.specs = {s["id"]: s for s in stl_specs}
self.interval_s = interval_s
def evaluate(self, active_ids: list[str],
signals: dict[str, float]) -> dict:
results = {}
for sid in active_ids:
spec = self.specs.get(sid)
if not spec:
continue
rho = self._compute_rho(spec, signals)
warn = spec.get("rho_warn", 0.1)
results[sid] = {"rho": rho,
"status": "ok" if rho > warn
else "warn" if rho > 0
else "violated"}
statuses = [r["status"] for r in results.values()]
action = ("stop" if "violated" in statuses
else "warn" if "warn" in statuses
else "continue")
return {"constraints": results, "rhc_action": action}
def _compute_rho(self, spec: dict, signals: dict) -> float:
formula = spec.get("formula", "")
for sig_name, val in signals.items():
if sig_name in formula:
m = re.search(r"[><=]+\s*([\d.]+)", formula)
if m:
thr = float(m.group(1))
return val - thr if ">" in formula else thr - val
return 1.0
延伸阅读
- 实时安全监控(CBF) — GeoPruner 三谱剪枝()的安全谱理论
- 跨频调度 — ETL 阶段切换的 Flow-Jump 混合系统形式化
- 具身空间几何 — ETL 任务节点在 上的语义定位
- 接口协议 A/B — ETL 包经 MCP 下发的通信规范
- EvidencePack 协议 — 执行结果的 STL 验证记录与证据留存