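Security advisory: malicious litellm versions 1.82.7 and 1.82.8 have been removed from PyPI (risk of API key exfiltration). Uninstall them, rotate any exposed credentials, and upgrade to a safe version (such as 1.82.9+). Run `pip show litellm` to check which version is installed.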

Flows and Workflows

The Flow and workflow engines in AgenticX.


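AgenticX provides two complementary ways to orchestrate multi-step work: a decorator-driven Flow system (an in-process graph of methods) and a configuration-driven `WorkflowEngine` (graph-based execution).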


Overview

| Layer | Module | Best for |
|---|---|---|
| Flow | `agenticx.flow` | Python classes with `@start` / `@listen` / `@router` |
| Execution plan | `agenticx.flow.execution_plan` | Staged goals, pause/resume, persistence |
| Graph engine | `agenticx.core.graph` | Async nodes that return the next node or `End` |
| Workflow engine | `agenticx.core.workflow_engine` | `Workflow` / `WorkflowGraph`, concurrent branches |

Flow system

`Flow` base class

  • Generic state type `T` (a dict or a Pydantic model).
  • Entry points: `kickoff()` (synchronous) and `kickoff_async()`.
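
One common way to offer a synchronous and an asynchronous entry point side by side is to have the synchronous one drive the coroutine to completion. The sketch below shows that pattern with a hypothetical `SketchFlow` class. It is an assumption about how such a pair could be wired, not a description of how AgenticX implements `kickoff()`.

```python
import asyncio


class SketchFlow:
    """Hypothetical stand-in; this is not the AgenticX Flow class."""

    def __init__(self):
        self.state = {}

    async def kickoff_async(self):
        # Placeholder for awaiting the flow's asynchronous steps.
        await asyncio.sleep(0)
        self.state["status"] = "done"
        return self.state

    def kickoff(self):
        # asyncio.run() starts a fresh event loop, so this wrapper only works
        # when no event loop is already running in the current thread.
        return asyncio.run(self.kickoff_async())


# Synchronous callers use kickoff() directly.
sync_state = SketchFlow().kickoff()


# Callers that already live inside a coroutine await kickoff_async() instead.
async def main():
    return await SketchFlow().kickoff_async()


async_state = asyncio.run(main())
```

Under this pattern, code that already runs inside an event loop would call the `_async` variant, because a nested `asyncio.run()` raises `RuntimeError`.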

Decorators

| Decorator | Role |
|---|---|
| `@start()` | Unconditional entry point |
| `@listen(...)` | Runs when its trigger condition is satisfied |
| `@router(...)` | Returns a string label used for routing |

Example: a simple data pipeline

```python
from agenticx.flow import Flow, start, listen, router

class DataPipeline(Flow[dict]):
    @start()
    def fetch_data(self):
        # Entry point: runs when the flow is kicked off.
        return {"data": [1, 2, 3]}

    @listen("fetch_data")
    def process_data(self, result):
        # Triggered by fetch_data; doubles every value.
        return {"processed": [x * 2 for x in result["data"]]}

    @router("process_data")
    def branch(self, result):
        # The returned label selects which listener runs next.
        values = result.get("processed", [])
        return "NONEMPTY" if values else "EMPTY"

    @listen("NONEMPTY")
    def on_success(self):
        self.state["status"] = "ok"

    @listen("EMPTY")
    def on_empty(self):
        self.state["status"] = "empty"

flow = DataPipeline()
flow.kickoff()
```
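
To make the dispatch order of the three decorators concrete, here is a toy, self-contained model of decorator-driven dispatch. Everything in it (`MiniFlow`, the `_kind` and `_trigger` attributes, the event queue) is hypothetical and written only for illustration. It does not reflect AgenticX internals, and unlike the example above every listener in this toy takes a payload argument.

```python
# Toy model of decorator-driven dispatch. NOT the AgenticX implementation;
# MiniFlow, _kind, and _trigger are hypothetical names.

def _mark(kind, trigger):
    def decorate(fn):
        fn._kind, fn._trigger = kind, trigger
        return fn
    return decorate

def start():
    return _mark("start", None)

def listen(trigger):
    return _mark("listen", trigger)

def router(trigger):
    return _mark("router", trigger)


class MiniFlow:
    def __init__(self):
        self.state = {}
        self.trace = []  # method names in the order they ran

    def _methods(self):
        # Decorated methods of the concrete subclass, in definition order.
        return [(n, f) for n, f in vars(type(self)).items() if hasattr(f, "_kind")]

    def kickoff(self):
        queue = []  # pending (event name, payload) pairs
        for name, fn in self._methods():
            if fn._kind == "start":
                self.trace.append(name)
                queue.append((name, fn(self)))
        while queue:
            event, payload = queue.pop(0)
            for name, fn in self._methods():
                if fn._trigger != event:
                    continue
                self.trace.append(name)
                result = fn(self, payload)
                if fn._kind == "router":
                    queue.append((result, None))  # the label becomes the event
                else:
                    queue.append((name, result))  # the method name is the event
        return self.state


class Pipeline(MiniFlow):
    @start()
    def fetch(self):
        return [1, 2, 3]

    @listen("fetch")
    def double(self, data):
        return [x * 2 for x in data]

    @router("double")
    def branch(self, data):
        self.state["values"] = data
        return "NONEMPTY" if data else "EMPTY"

    @listen("NONEMPTY")
    def on_success(self, _):
        self.state["status"] = "ok"

    @listen("EMPTY")
    def on_empty(self, _):
        self.state["status"] = "empty"


flow = Pipeline()
final_state = flow.kickoff()
```

In this toy, a listener fires on the name of the method that just finished, while a router's return value is treated as a new event name, which is why `on_success` runs and `on_empty` does not.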

Execution plans

`ExecutionPlan`

| Field | Description |
|---|---|
| `stages` | List of `ExecutionStage` |
| `current_stage_index` | Index of the active stage |
| `intervention_state` | `InterventionState` for external control |

`ExecutionStage` and `Subtask`

  • `ExecutionStage`: `name`, `subtasks`, `status`
  • `Subtask`: `id`, `name`, `query`, `status`, `result`/`error`

Core capabilities

  • `ExecutionPlan.to_mermaid()`: returns the plan as a Mermaid diagram string
  • `pause()` / `resume()` / `reset_node(subtask_id)`
  • `overall_progress`: the fraction of subtasks that have completed
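
As a rough illustration of what a progress ratio and a Mermaid export involve, the sketch below uses stand-in dataclasses (`SketchPlan`, `SketchStage`, `SketchSubtask`). These are hypothetical and deliberately simpler than the real `ExecutionPlan` models. The status values and the exact Mermaid layout are assumptions as well.

```python
from dataclasses import dataclass, field

# Stand-in models for illustration only; NOT the AgenticX classes.

@dataclass
class SketchSubtask:
    id: str
    name: str
    status: str = "pending"  # assumed values: "pending" or "completed"


@dataclass
class SketchStage:
    name: str
    subtasks: list = field(default_factory=list)


@dataclass
class SketchPlan:
    stages: list = field(default_factory=list)

    @property
    def overall_progress(self) -> float:
        # Completed subtasks divided by all subtasks; 0.0 for an empty plan.
        tasks = [t for s in self.stages for t in s.subtasks]
        if not tasks:
            return 0.0
        return sum(t.status == "completed" for t in tasks) / len(tasks)

    def to_mermaid(self) -> str:
        # One subgraph per stage, then arrows between consecutive subtasks.
        lines = ["graph TD"]
        for stage in self.stages:
            lines.append(f"    subgraph {stage.name}")
            for task in stage.subtasks:
                lines.append(f'        {task.id}["{task.name} ({task.status})"]')
            lines.append("    end")
        ordered = [t.id for s in self.stages for t in s.subtasks]
        for src, dst in zip(ordered, ordered[1:]):
            lines.append(f"    {src} --> {dst}")
        return "\n".join(lines)


plan = SketchPlan(stages=[
    SketchStage("collect", [
        SketchSubtask("t1", "search", "completed"),
        SketchSubtask("t2", "download", "completed"),
    ]),
    SketchStage("report", [
        SketchSubtask("t3", "summarize"),
        SketchSubtask("t4", "publish"),
    ]),
])
progress = plan.overall_progress
mermaid = plan.to_mermaid()
```

The resulting string can be pasted into any Mermaid renderer to inspect the plan visually.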

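Graph engine

Module: `agenticx.core.graph`

```python
import asyncio

from agenticx.core.graph import Graph, BaseNode, End, GraphRunContext

# ProcessNode is defined first so StartNode's return annotation can refer to it.
class ProcessNode(BaseNode):
    async def run(self, ctx: GraphRunContext) -> End[str]:
        return End(result="done")

class StartNode(BaseNode):
    async def run(self, ctx: GraphRunContext) -> ProcessNode | End[str]:
        # Do work, then hand off to the next node.
        return ProcessNode()

async def main():
    graph = Graph(nodes=[StartNode, ProcessNode])
    # `await` is only valid inside a coroutine, so the run lives in main().
    return await graph.run(StartNode())

result = asyncio.run(main())
```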
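
The "each node returns the next node or an end marker" pattern can be sketched in a few lines without the library. The loop below is a hypothetical illustration (`SketchNode`, `SketchEnd`, and `run_graph` are invented names, and the context is a plain dict). It is not how `Graph.run` is actually implemented.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class SketchEnd:
    """Hypothetical end marker carrying the final result."""
    result: object


class SketchNode:
    """Hypothetical node base class; not agenticx.core.graph.BaseNode."""

    async def run(self, ctx: dict):
        raise NotImplementedError


class Fetch(SketchNode):
    async def run(self, ctx):
        ctx["items"] = [1, 2, 3]
        return Total()  # hand off to the next node


class Total(SketchNode):
    async def run(self, ctx):
        return SketchEnd(result=sum(ctx["items"]))  # terminate the run


async def run_graph(start, ctx, max_steps=100):
    # Keep running nodes until one of them returns an end marker.
    node = start
    for _ in range(max_steps):
        outcome = await node.run(ctx)
        ctx.setdefault("visited", []).append(type(node).__name__)
        if isinstance(outcome, SketchEnd):
            return outcome.result
        node = outcome
    raise RuntimeError("graph did not reach an end within max_steps")


ctx = {}
total = asyncio.run(run_graph(Fetch(), ctx))
```

The `max_steps` guard is a design choice of this sketch: because nodes pick their successor at runtime, a cycle would otherwise loop forever.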

WorkflowEngine

Module: agenticx.core.workflow_engine

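  • `WorkflowEngine` runs a `Workflow` model or a `WorkflowGraph`
  • Concurrency: entry nodes and downstream fan-out are executed with `asyncio.gather`
  • Conditional routing: edges can encode a JSON `condition_config`
  • Observability: `ExecutionContext` holds an `event_log`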
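
The combination of JSON-encoded edge conditions and concurrent fan-out can be illustrated with plain `asyncio`. In the sketch below, the condition schema (`field` / `op` / `value`), the node functions, and the `EDGES` list are all hypothetical. The real `condition_config` format and the `WorkflowEngine` internals may differ.

```python
import asyncio
import json

# Hypothetical condition schema: {"field": ..., "op": ..., "value": ...}.
OPS = {
    "eq": lambda a, b: a == b,
    "gt": lambda a, b: a > b,
    "lt": lambda a, b: a < b,
}


def edge_matches(condition_json, payload):
    # An edge without a condition is always followed.
    if condition_json is None:
        return True
    cfg = json.loads(condition_json)
    return OPS[cfg["op"]](payload[cfg["field"]], cfg["value"])


async def score(payload, event_log):
    event_log.append("score")
    return {"score": 0.9}


async def notify(payload, event_log):
    event_log.append("notify")
    return "notified"


async def archive(payload, event_log):
    event_log.append("archive")
    return "archived"


async def escalate(payload, event_log):
    event_log.append("escalate")
    return "escalated"


# Outgoing edges of the score node: (target node, JSON condition or None).
EDGES = [
    (notify, None),
    (archive, json.dumps({"field": "score", "op": "gt", "value": 0.5})),
    (escalate, json.dumps({"field": "score", "op": "lt", "value": 0.5})),
]


async def run_workflow():
    event_log = []
    payload = await score({}, event_log)
    selected = [node for node, cond in EDGES if edge_matches(cond, payload)]
    # Fan out: all selected downstream nodes run concurrently.
    results = await asyncio.gather(*(n(payload, event_log) for n in selected))
    return results, event_log


results, event_log = asyncio.run(run_workflow())
```

`asyncio.gather` returns results in the order its awaitables were passed, regardless of which branch finishes first, so `results` lines up with the selected edges even though the log order of concurrent branches is not guaranteed in general.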

Quick-reference imports

```python
# Flow
from agenticx.flow import (
    Flow,
    FlowState,
    start,
    listen,
    router,
    or_,
    and_,
)

# Execution plan
from agenticx.flow import (
    ExecutionPlan,
    ExecutionStage,
    Subtask,
    ExecutionPlanManager,
)

# Graph
from agenticx.core.graph import Graph, BaseNode, End, GraphRunContext

# Workflow
from agenticx.core.workflow_engine import WorkflowEngine, WorkflowGraph
```