Security advisory — Malicious litellm versions 1.82.7 and 1.82.8 were removed from PyPI (potential API key exfiltration). Uninstall them, rotate exposed credentials, and upgrade to a safe release (e.g. 1.82.9+ per upstream). Run pip show litellm to verify. PyPI · README

Flow & Workflow

Flow and workflow engine in AgenticX.

Flow & Workflow Engine

AgenticX exposes two complementary ways to orchestrate multi-step logic: a decorator-driven Flow system for in-process method graphs, and a configuration-driven `WorkflowEngine` for graph-based execution.


Overview

LayerModuleBest for
Flowagenticx.flowPython classes with @start / @listen / @router
Execution planagenticx.flow.execution_planStaged goals, pause/resume, persistence
Graph engineagenticx.core.graphAsync nodes returning next node or End
Workflow engineagenticx.core.workflow_engineWorkflow / WorkflowGraph, concurrent branches

Flow system

`Flow` base class

  • Generic over state type `T` (dict or Pydantic model).
  • Entry points: `kickoff()` (sync) and `kickoff_async()`.

Decorators

DecoratorRole
`@start()`Unconditional entry point
`@listen(...)`Runs when trigger condition is satisfied
`@router(...)`Returns a string label for routing

Example: simple data pipeline

python
1from agenticx.flow import Flow, start, listen, router
2
3class DataPipeline(Flow[dict]):
4 @start()
5 def fetch_data(self):
6 return {"data": [1, 2, 3]}
7
8 @listen("fetch_data")
9 def process_data(self, result):
10 return {"processed": [x * 2 for x in result["data"]]}
11
12 @router("process_data")
13 def branch(self, result):
14 values = result.get("processed", [])
15 return "NONEMPTY" if values else "EMPTY"
16
17 @listen("NONEMPTY")
18 def on_success(self):
19 self.state["status"] = "ok"
20
21 @listen("EMPTY")
22 def on_empty(self):
23 self.state["status"] = "empty"
24
25flow = DataPipeline()
26flow.kickoff()

Execution plan

`ExecutionPlan`

FieldDescription
`stages`List of ExecutionStage
`current_stage_index`Index of the active stage
`intervention_state`InterventionState for external control

`ExecutionStage` and `Subtask`

  • `ExecutionStage`: name, subtasks, status
  • `Subtask`: id, name, query, status, result/error

Core capabilities

  • `ExecutionPlan.to_mermaid()`: returns a Mermaid diagram string
  • `pause()` / `resume()` / `reset_node(subtask_id)`
  • `overall_progress`: fraction of subtasks completed

Graph engine

Module: agenticx.core.graph

python
1from agenticx.core.graph import Graph, BaseNode, End, GraphRunContext
2
3class StartNode(BaseNode):
4 async def run(self, ctx: GraphRunContext) -> ProcessNode | End[str]:
5 # Do work
6 return ProcessNode()
7
8class ProcessNode(BaseNode):
9 async def run(self, ctx: GraphRunContext) -> End[str]:
10 return End(result="done")
11
12graph = Graph(nodes=[StartNode, ProcessNode])
13result = await graph.run(StartNode())

WorkflowEngine

Module: agenticx.core.workflow_engine

  • `WorkflowEngine` runs a `Workflow` model or `WorkflowGraph`
  • Concurrency: entry nodes and downstream fan-out run via asyncio.gather
  • Condition routing: edges may encode JSON condition_config
  • Observability: ExecutionContext holds event_log

Quick reference imports

python
1# Flow
2from agenticx.flow import (
3 Flow,
4 FlowState,
5 start,
6 listen,
7 router,
8 or_,
9 and_,
10)
11
12# Execution plan
13from agenticx.flow import (
14 ExecutionPlan,
15 ExecutionStage,
16 Subtask,
17 ExecutionPlanManager,
18)
19
20# Graph
21from agenticx.core.graph import Graph, BaseNode, End, GraphRunContext
22
23# Workflow
24from agenticx.core.workflow_engine import WorkflowEngine, WorkflowGraph