Flow & Workflow
Flow and workflow engine in AgenticX.
Flow & Workflow Engine
AgenticX exposes two complementary ways to orchestrate multi-step logic: a decorator-driven Flow system for in-process method graphs, and a configuration-driven `WorkflowEngine` for graph-based execution.
Overview
| Layer | Module | Best for |
|---|---|---|
| Flow | agenticx.flow | Python classes with @start / @listen / @router |
| Execution plan | agenticx.flow.execution_plan | Staged goals, pause/resume, persistence |
| Graph engine | agenticx.core.graph | Async nodes returning next node or End |
| Workflow engine | agenticx.core.workflow_engine | Workflow / WorkflowGraph, concurrent branches |
Flow system
`Flow` base class
- Generic over state type `T` (dict or Pydantic model).
- Entry points: `kickoff()` (sync) and `kickoff_async()`.
Decorators
| Decorator | Role |
|---|---|
| `@start()` | Unconditional entry point |
| `@listen(...)` | Runs when trigger condition is satisfied |
| `@router(...)` | Returns a string label for routing |
Example: simple data pipeline
python
1from agenticx.flow import Flow, start, listen, router23class DataPipeline(Flow[dict]):4 @start()5 def fetch_data(self):6 return {"data": [1, 2, 3]}78 @listen("fetch_data")9 def process_data(self, result):10 return {"processed": [x * 2 for x in result["data"]]}1112 @router("process_data")13 def branch(self, result):14 values = result.get("processed", [])15 return "NONEMPTY" if values else "EMPTY"1617 @listen("NONEMPTY")18 def on_success(self):19 self.state["status"] = "ok"2021 @listen("EMPTY")22 def on_empty(self):23 self.state["status"] = "empty"2425flow = DataPipeline()26flow.kickoff()
Execution plan
`ExecutionPlan`
| Field | Description |
|---|---|
| `stages` | List of ExecutionStage |
| `current_stage_index` | Index of the active stage |
| `intervention_state` | InterventionState for external control |
`ExecutionStage` and `Subtask`
- `ExecutionStage`: name,
subtasks,status - `Subtask`:
id,name,query,status, result/error
Core capabilities
- `ExecutionPlan.to_mermaid()`: returns a Mermaid diagram string
- `pause()` / `resume()` / `reset_node(subtask_id)`
- `overall_progress`: fraction of subtasks completed
Graph engine
Module: agenticx.core.graph
python
1from agenticx.core.graph import Graph, BaseNode, End, GraphRunContext23class StartNode(BaseNode):4 async def run(self, ctx: GraphRunContext) -> ProcessNode | End[str]:5 # Do work6 return ProcessNode()78class ProcessNode(BaseNode):9 async def run(self, ctx: GraphRunContext) -> End[str]:10 return End(result="done")1112graph = Graph(nodes=[StartNode, ProcessNode])13result = await graph.run(StartNode())
WorkflowEngine
Module: agenticx.core.workflow_engine
- `WorkflowEngine` runs a `Workflow` model or `WorkflowGraph`
- Concurrency: entry nodes and downstream fan-out run via
asyncio.gather - Condition routing: edges may encode JSON
condition_config - Observability:
ExecutionContextholdsevent_log
Quick reference imports
python
1# Flow2from agenticx.flow import (3 Flow,4 FlowState,5 start,6 listen,7 router,8 or_,9 and_,10)1112# Execution plan13from agenticx.flow import (14 ExecutionPlan,15 ExecutionStage,16 Subtask,17 ExecutionPlanManager,18)1920# Graph21from agenticx.core.graph import Graph, BaseNode, End, GraphRunContext2223# Workflow24from agenticx.core.workflow_engine import WorkflowEngine, WorkflowGraph