Orchestration
The AgenticX orchestration engine.
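Overview
AgenticX provides two complementary approaches to orchestration:
- Graph-based Workflows: an explicit DAG of nodes and edges that gives you full control
- Flow Decorators: lightweight pipeline definitions built on Python decorators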
Graph-Based Workflows
```python
from agenticx.flow import Workflow, Node, Edge

workflow = Workflow(id="research-pipeline")

# Define nodes (each node is an agent task).
# The agents and tasks are assumed to be defined elsewhere.
fetch = Node(id="fetch", agent=fetch_agent, task=fetch_task)
analyze = Node(id="analyze", agent=analyze_agent, task=analyze_task)
report = Node(id="report", agent=report_agent, task=report_task)

# Register the nodes so the edges can refer to them by id.
for node in (fetch, analyze, report):
    workflow.add_node(node)

# Define edges (data flow)
workflow.add_edge(Edge(source="fetch", target="analyze"))
workflow.add_edge(Edge(source="analyze", target="report"))

result = workflow.run()
```
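To make the execution order concrete, here is a minimal, library-independent sketch of how a DAG like this one could be run: nodes are sorted topologically and each node receives the outputs of its predecessors. It does not use AgenticX. `run_dag` and the lambda tasks are hypothetical stand-ins for agent tasks, and the real engine may schedule nodes differently.

```python
from graphlib import TopologicalSorter


def run_dag(tasks, edges, initial_input):
    """Run callables in dependency order.

    tasks: dict mapping node id -> callable that takes a list of inputs.
    edges: list of (source, target) pairs.
    """
    # Map each node to the set of nodes it depends on.
    predecessors = {node_id: set() for node_id in tasks}
    for source, target in edges:
        predecessors[target].add(source)

    outputs = {}
    for node_id in TopologicalSorter(predecessors).static_order():
        # Root nodes get the initial input; others get upstream outputs.
        upstream = sorted(predecessors[node_id])
        inputs = [outputs[p] for p in upstream] if upstream else [initial_input]
        outputs[node_id] = tasks[node_id](inputs)
    return outputs


# Hypothetical stand-ins for the fetch, analyze, and report agents.
tasks = {
    "fetch": lambda inputs: f"data({inputs[0]})",
    "analyze": lambda inputs: f"analysis({inputs[0]})",
    "report": lambda inputs: f"report({inputs[0]})",
}
edges = [("fetch", "analyze"), ("analyze", "report")]

outputs = run_dag(tasks, edges, "query")
print(outputs["report"])  # report(analysis(data(query)))
```

Because the order comes from the edges rather than from the order in which nodes were declared, adding a new edge is enough to change when a node runs.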
Conditional Routing
```python
from agenticx.flow import ConditionalEdge

def route_based_on_result(output):
    # Return the key of the target node to run next.
    if output.confidence > 0.8:
        return "publish"
    else:
        return "review"

# publish_node and review_node are assumed to be defined elsewhere.
workflow.add_edge(
    ConditionalEdge(
        source="analyze",
        condition=route_based_on_result,
        targets={"publish": publish_node, "review": review_node}
    )
)
```
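The routing rule itself can be exercised without the workflow engine. The sketch below assumes that a conditional edge calls the condition with the source node's output and looks up the returned label in `targets`. `follow_conditional_edge` is a hypothetical helper written for this illustration, not an AgenticX function, and plain strings stand in for the target nodes.

```python
from types import SimpleNamespace


def route_based_on_result(output):
    # Same rule as in the example: only confidence above 0.8 is published.
    return "publish" if output.confidence > 0.8 else "review"


def follow_conditional_edge(output, condition, targets):
    """Pick the next node by looking up the condition's label in targets."""
    label = condition(output)
    if label not in targets:
        raise KeyError(f"condition returned unknown label {label!r}")
    return targets[label]


# Strings stand in for the publish and review nodes.
targets = {"publish": "publish_node", "review": "review_node"}

confident = SimpleNamespace(confidence=0.93)
borderline = SimpleNamespace(confidence=0.8)  # not strictly greater than 0.8

print(follow_conditional_edge(confident, route_based_on_result, targets))   # publish_node
print(follow_conditional_edge(borderline, route_based_on_result, targets))  # review_node
```

Note that a confidence of exactly 0.8 is routed to review, because the comparison is strict.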
Parallel Execution
```python
from agenticx.flow import ParallelNode

# Run multiple agents concurrently
parallel = ParallelNode(
    id="parallel-research",
    nodes=[fetch_news, fetch_papers, fetch_code],
    merge_strategy="concat"
)
workflow.add_node(parallel)
```
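As a rough illustration of the fan-out and merge idea, the following sketch runs three functions concurrently with the standard library and concatenates their results. It assumes that a "concat" merge means joining each branch's list output in declaration order. That reading, the `run_parallel` helper, and the three fetch functions are assumptions made for this example and are not taken from AgenticX.

```python
from concurrent.futures import ThreadPoolExecutor


def run_parallel(tasks, query):
    """Run every task concurrently and concatenate their list results."""
    with ThreadPoolExecutor(max_workers=len(tasks)) as pool:
        # map() yields results in task order, not completion order.
        results = list(pool.map(lambda task: task(query), tasks))
    merged = []
    for result in results:
        merged.extend(result)
    return merged


# Hypothetical stand-ins for the three research branches.
def fetch_news(query):
    return [f"news:{query}"]


def fetch_papers(query):
    return [f"paper:{query}"]


def fetch_code(query):
    return [f"code:{query}"]


merged = run_parallel([fetch_news, fetch_papers, fetch_code], "agents")
print(merged)  # ['news:agents', 'paper:agents', 'code:agents']
```

Threads suit I/O-bound branches such as network calls. The merged order stays deterministic even when branches finish at different times.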
Flow Decorators
For simpler pipelines, use the @flow decorator family:
```python
from agenticx.flow import flow, step

@flow
class ResearchPipeline:
    # fetch_agent, analyze_agent, and report_agent are assumed to be
    # available on the instance.

    @step
    def fetch_data(self, query: str) -> str:
        return self.fetch_agent.run(query)

    @step
    def analyze(self, data: str) -> dict:
        return self.analyze_agent.run(data)

    @step
    def generate_report(self, analysis: dict) -> str:
        return self.report_agent.run(analysis)

pipeline = ResearchPipeline()
result = pipeline.run(query="Latest AI research")
```
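One way such a decorator pair could work is sketched below: `step` tags a method, and `flow` adds a `run` method that calls the tagged methods in definition order, passing each return value to the next step. This is a toy re-implementation for illustration only. The real `@flow` and `@step` in AgenticX may behave differently, and `ToyPipeline` is a hypothetical class with no agents involved.

```python
def step(func):
    """Mark a method as a pipeline step."""
    func._is_step = True
    return func


def flow(cls):
    """Class decorator that adds run(), chaining steps in definition order."""
    # A class namespace preserves the order in which methods were defined.
    steps = [attr for attr in vars(cls).values() if getattr(attr, "_is_step", False)]

    def run(self, **kwargs):
        first, *rest = steps
        # The first step receives the keyword arguments passed to run().
        value = first(self, **kwargs)
        # Every later step receives the previous step's return value.
        for func in rest:
            value = func(self, value)
        return value

    cls.run = run
    return cls


@flow
class ToyPipeline:
    @step
    def fetch_data(self, query: str) -> str:
        return f"raw results for {query}"

    @step
    def analyze(self, data: str) -> dict:
        return {"summary": data.upper()}

    @step
    def generate_report(self, analysis: dict) -> str:
        return f"Report: {analysis['summary']}"


result = ToyPipeline().run(query="ai")
print(result)  # Report: RAW RESULTS FOR AI
```

In this sketch the type hints document the contract between adjacent steps: each step's return type matches the next step's parameter type.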
Execution Plans
For complex multi-step tasks, use an execution plan:
```python
from agenticx.planner import ExecutionPlan, PlanStep

plan = ExecutionPlan(
    steps=[
        PlanStep(id="1", description="Gather requirements", agent=analyst),
        PlanStep(id="2", description="Design architecture", agent=architect, depends_on=["1"]),
        PlanStep(id="3", description="Implement features", agent=developer, depends_on=["2"]),
        PlanStep(id="4", description="Write tests", agent=tester, depends_on=["3"]),
    ]
)

results = plan.execute()
```
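The `depends_on` lists define a dependency graph, so a planner has to work out which steps are ready at each point. The sketch below shows one way to do that with the standard library's `graphlib`. It is independent of AgenticX, `execution_batches` is a hypothetical helper, and how `ExecutionPlan.execute()` actually orders or parallelizes steps is not specified here.

```python
from graphlib import TopologicalSorter


def execution_batches(depends_on):
    """Group step ids into batches; each batch depends only on earlier ones."""
    sorter = TopologicalSorter(depends_on)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


# Step id -> ids it depends on, mirroring the four-step plan.
plan_deps = {"1": [], "2": ["1"], "3": ["2"], "4": ["3"]}
print(execution_batches(plan_deps))  # [['1'], ['2'], ['3'], ['4']]

# Hypothetical variant: if the tests depended only on the design (step 2),
# steps 3 and 4 would become ready at the same time.
variant_deps = {"1": [], "2": ["1"], "3": ["2"], "4": ["2"]}
print(execution_batches(variant_deps))  # [['1'], ['2'], ['3', '4']]
```

With the linear dependencies in the plan, every batch holds a single step, so the steps run strictly one after another.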