Flow and Workflows
The Flow and workflow engines in AgenticX.
Flow and Workflow Engines
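AgenticX provides two complementary approaches to multi-step orchestration: a decorator-driven Flow system (an in-process method graph) and a configuration-driven `WorkflowEngine` (graph-based execution).
Overview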
| Layer | Module | Best for |
|---|---|---|
| Flow | `agenticx.flow` | Python classes with `@start` / `@listen` / `@router` |
| Execution plan | `agenticx.flow.execution_plan` | Staged goals, pause/resume, persistence |
| Graph engine | `agenticx.core.graph` | Async nodes returning the next node or `End` |
| Workflow engine | `agenticx.core.workflow_engine` | `Workflow` / `WorkflowGraph`, concurrent branches |
Flow System
`Flow` base class
- Generic state type `T` (a dict or a Pydantic model).
- Entry points: `kickoff()` (synchronous) and `kickoff_async()`.
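To illustrate the two points above, here is a minimal sketch of a base class that is generic over its state type and exposes a synchronous and an asynchronous entry point. `MiniFlow` and `CounterFlow` are hypothetical names for this illustration, and wrapping the async entry point with `asyncio.run` is an assumption about how such a pair could be wired, not a description of how AgenticX's `Flow` is implemented.

```python
import asyncio
from typing import Generic, TypeVar

T = TypeVar("T")


class MiniFlow(Generic[T]):
    """Toy stand-in for a flow base class; not AgenticX's Flow."""

    def __init__(self, initial_state: T) -> None:
        self.state: T = initial_state

    async def kickoff_async(self) -> T:
        # A real flow would walk its method graph here.
        await asyncio.sleep(0)
        return self.state

    def kickoff(self) -> T:
        # Synchronous entry point: drive the async one to completion.
        return asyncio.run(self.kickoff_async())


class CounterFlow(MiniFlow[dict]):
    """Flow whose state type T is a plain dict."""

    async def kickoff_async(self) -> dict:
        self.state["count"] = self.state.get("count", 0) + 1
        return self.state


final_state = CounterFlow({"count": 0}).kickoff()
```

With this shape, callers in ordinary scripts use `kickoff()`, while callers already inside an event loop would await `kickoff_async()` directly.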
Decorators
| Decorator | Role |
|---|---|
| `@start()` | Unconditional entry point |
| `@listen(...)` | Runs when trigger condition is satisfied |
| `@router(...)` | Returns a string label for routing |
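The roles in the table can be made concrete with a toy dispatcher. The sketch below reimplements `start`, `listen`, and `router` locally as simple metadata markers and drives them with a small event queue; it is an illustration of the trigger model only, written under the assumption that a router's returned label acts as the event that downstream listeners wait on. It does not import or mirror AgenticX internals, and `MiniFlow` and its `trace` attribute are hypothetical.

```python
import inspect
from collections import deque


def _mark(role, trigger=None):
    # Attach role metadata to the function; the dispatcher reads it later.
    def decorator(fn):
        fn._role, fn._trigger = role, trigger
        return fn
    return decorator


def start():
    return _mark("start")


def listen(trigger):
    return _mark("listen", trigger)


def router(trigger):
    return _mark("router", trigger)


class MiniFlow:
    def __init__(self):
        self.state = {}
        self.trace = []  # names of methods in the order they ran

    def _marked(self):
        for name in dir(type(self)):
            if hasattr(getattr(type(self), name, None), "_role"):
                yield getattr(self, name)

    def kickoff(self):
        methods = list(self._marked())
        queue = deque((m, None) for m in methods if m._role == "start")
        while queue:
            method, payload = queue.popleft()
            takes_arg = len(inspect.signature(method).parameters) > 0
            result = method(payload) if takes_arg else method()
            self.trace.append(method.__name__)
            # A router emits its returned label; other methods emit their own name.
            if method._role == "router":
                event, out = result, None
            else:
                event, out = method.__name__, result
            queue.extend(
                (m, out) for m in methods
                if m._role != "start" and m._trigger == event
            )
        return self.state


class Pipeline(MiniFlow):
    @start()
    def fetch(self):
        return [1, 2, 3]

    @listen("fetch")
    def double(self, values):
        return [v * 2 for v in values]

    @router("double")
    def branch(self, values):
        return "NONEMPTY" if values else "EMPTY"

    @listen("NONEMPTY")
    def on_success(self):
        self.state["status"] = "ok"

    @listen("EMPTY")
    def on_empty(self):
        self.state["status"] = "empty"


flow = Pipeline()
flow.kickoff()
```

After the run, `flow.trace` records which methods fired, which makes it easy to see that only the branch matching the router's label executed.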
Example: a simple data pipeline
```python
from agenticx.flow import Flow, start, listen, router

class DataPipeline(Flow[dict]):
    @start()
    def fetch_data(self):
        return {"data": [1, 2, 3]}

    @listen("fetch_data")
    def process_data(self, result):
        return {"processed": [x * 2 for x in result["data"]]}

    @router("process_data")
    def branch(self, result):
        values = result.get("processed", [])
        return "NONEMPTY" if values else "EMPTY"

    @listen("NONEMPTY")
    def on_success(self):
        self.state["status"] = "ok"

    @listen("EMPTY")
    def on_empty(self):
        self.state["status"] = "empty"

flow = DataPipeline()
flow.kickoff()
```
Execution Plan
`ExecutionPlan`
| Field | Description |
|---|---|
| `stages` | List of ExecutionStage |
| `current_stage_index` | Index of the active stage |
| `intervention_state` | InterventionState for external control |
`ExecutionStage` and `Subtask`
- `ExecutionStage`: `name`, `subtasks`, `status`
- `Subtask`: `id`, `name`, `query`, `status`, `result` / `error`
Core capabilities
- `ExecutionPlan.to_mermaid()`: returns a Mermaid diagram string
- `pause()` / `resume()` / `reset_node(subtask_id)`
- `overall_progress`: the fraction of subtasks that have completed
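The capabilities listed above can be sketched with plain dataclasses. The field names follow the tables and lists in this section; everything else is an assumption made for illustration: the class names `MiniSubtask`, `MiniStage`, and `MiniPlan`, the status strings `"pending"` and `"completed"`, the `paused` flag, and the exact Mermaid layout. The real `ExecutionPlan` may differ on all of these.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class MiniSubtask:
    id: str
    name: str
    query: str
    status: str = "pending"  # assumed status vocabulary
    result: Optional[str] = None
    error: Optional[str] = None


@dataclass
class MiniStage:
    name: str
    subtasks: List[MiniSubtask] = field(default_factory=list)
    status: str = "pending"


@dataclass
class MiniPlan:
    stages: List[MiniStage] = field(default_factory=list)
    current_stage_index: int = 0
    paused: bool = False  # hypothetical stand-in for intervention state

    def _all_subtasks(self) -> List[MiniSubtask]:
        return [t for s in self.stages for t in s.subtasks]

    @property
    def overall_progress(self) -> float:
        # Fraction of subtasks whose status is "completed".
        tasks = self._all_subtasks()
        if not tasks:
            return 0.0
        return sum(t.status == "completed" for t in tasks) / len(tasks)

    def pause(self) -> None:
        self.paused = True

    def resume(self) -> None:
        self.paused = False

    def reset_node(self, subtask_id: str) -> None:
        # Put one subtask back into its initial state so it can be rerun.
        for task in self._all_subtasks():
            if task.id == subtask_id:
                task.status, task.result, task.error = "pending", None, None

    def to_mermaid(self) -> str:
        lines = ["graph TD"]
        for i, stage in enumerate(self.stages):
            lines.append(f'    stage{i}["{stage.name}"]')
            for task in stage.subtasks:
                lines.append(f'    stage{i} --> {task.id}["{task.name}: {task.status}"]')
        return "\n".join(lines)


plan = MiniPlan(stages=[
    MiniStage("research", [
        MiniSubtask("t1", "search", "find sources", status="completed"),
        MiniSubtask("t2", "read", "summarize sources"),
    ]),
    MiniStage("write", [MiniSubtask("t3", "draft", "write report")]),
])
progress_before = plan.overall_progress  # one of three subtasks completed
plan.reset_node("t1")
progress_after = plan.overall_progress
diagram = plan.to_mermaid()
```

Computing progress from subtask statuses on demand, rather than storing it, keeps the value consistent after a `reset_node` call.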
Graph Engine
Module: agenticx.core.graph
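```python
from __future__ import annotations  # lets StartNode's annotation name ProcessNode before it is defined

import asyncio

from agenticx.core.graph import Graph, BaseNode, End, GraphRunContext


class StartNode(BaseNode):
    async def run(self, ctx: GraphRunContext) -> ProcessNode | End[str]:
        # Do work, then hand off to the next node
        return ProcessNode()


class ProcessNode(BaseNode):
    async def run(self, ctx: GraphRunContext) -> End[str]:
        # Returning End terminates the graph run
        return End(result="done")


async def main():
    graph = Graph(nodes=[StartNode, ProcessNode])
    return await graph.run(StartNode())


result = asyncio.run(main())
```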
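The "node returns the next node or `End`" pattern can be sketched without the library. The runner below is a standalone illustration with hypothetical names (`MiniNode`, `MiniEnd`, `MiniContext`, `run_graph`); the `max_steps` guard is an added assumption and not something the source attributes to `agenticx.core.graph`.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, List, Tuple


@dataclass
class MiniEnd:
    """Terminal marker carrying the final result."""
    result: Any


@dataclass
class MiniContext:
    visited: List[str] = field(default_factory=list)


class MiniNode:
    async def run(self, ctx: MiniContext) -> "MiniNode | MiniEnd":
        raise NotImplementedError


class Fetch(MiniNode):
    async def run(self, ctx: MiniContext) -> "MiniNode | MiniEnd":
        # Hand data to the next node by constructing it with fields.
        return Transform(values=[1, 2, 3])


@dataclass
class Transform(MiniNode):
    values: List[int]

    async def run(self, ctx: MiniContext) -> "MiniNode | MiniEnd":
        return MiniEnd(result=sum(self.values))


async def run_graph(start: MiniNode, max_steps: int = 100) -> Tuple[Any, List[str]]:
    ctx = MiniContext()
    node = start
    for _ in range(max_steps):
        ctx.visited.append(type(node).__name__)
        node = await node.run(ctx)
        if isinstance(node, MiniEnd):
            return node.result, ctx.visited
    raise RuntimeError("graph did not reach an end node within max_steps")


result, visited = asyncio.run(run_graph(Fetch()))
```

Because each node decides its successor at run time, the control flow lives in the return values rather than in a separate edge table.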
WorkflowEngine
Module: agenticx.core.workflow_engine
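- `WorkflowEngine` runs a `Workflow` model or a `WorkflowGraph`
- Concurrency: entry nodes and downstream fan-out are executed via `asyncio.gather`
- Conditional routing: edges can encode a JSON `condition_config`
- Observability: `ExecutionContext` holds an `event_log`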
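The three behaviors above (concurrent fan-out, JSON-encoded edge conditions, and an event log) can be sketched in a few lines of standalone Python. Everything here is hypothetical and for illustration only: the `run_workflow` and `edge_allows` helpers, the `{"key": ..., "equals": ...}` condition schema, and the `completed:<node>` log format are assumptions, not the actual `WorkflowEngine` API or its `condition_config` format. The sketch also assumes an acyclic graph.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict, List, Tuple

NodeFn = Callable[[Dict[str, Any]], Awaitable[Any]]
Edge = Tuple[str, str, str]  # (source, target, condition_config as JSON or "")


def edge_allows(condition_config: str, data: Dict[str, Any]) -> bool:
    """Evaluate a hypothetical JSON condition of the form {"key": ..., "equals": ...}."""
    if not condition_config:
        return True  # unconditional edge
    cond = json.loads(condition_config)
    return data.get(cond["key"]) == cond["equals"]


async def run_workflow(
    nodes: Dict[str, NodeFn], edges: List[Edge], entry: List[str]
) -> Tuple[Dict[str, Any], List[str]]:
    data: Dict[str, Any] = {}
    event_log: List[str] = []
    frontier = list(entry)
    while frontier:
        # Run every node in the current frontier concurrently.
        results = await asyncio.gather(*(nodes[name](data) for name in frontier))
        for name, result in zip(frontier, results):
            data[name] = result
            event_log.append(f"completed:{name}")
        # Follow only the edges whose condition holds for the data so far.
        next_frontier: List[str] = []
        for src, dst, cond in edges:
            if src in frontier and edge_allows(cond, data) and dst not in next_frontier:
                next_frontier.append(dst)
        frontier = next_frontier
    return data, event_log


async def load(data):
    return 10


async def audit(data):
    return "audited"


async def double(data):
    return data["load"] * 2


async def alert(data):
    return "alert"


nodes = {"load": load, "audit": audit, "double": double, "alert": alert}
edges = [
    ("load", "double", ""),
    ("load", "alert", json.dumps({"key": "load", "equals": 0})),
]
data, event_log = asyncio.run(run_workflow(nodes, edges, entry=["load", "audit"]))
```

Here `load` and `audit` run together as entry nodes, `double` runs next, and `alert` is skipped because its edge condition does not match.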
Quick reference: imports
```python
# Flow
from agenticx.flow import (
    Flow,
    FlowState,
    start,
    listen,
    router,
    or_,
    and_,
)

# Execution plan
from agenticx.flow import (
    ExecutionPlan,
    ExecutionStage,
    Subtask,
    ExecutionPlanManager,
)

# Graph
from agenticx.core.graph import Graph, BaseNode, End, GraphRunContext

# Workflow
from agenticx.core.workflow_engine import WorkflowEngine, WorkflowGraph
```