This project was built entirely with Claude: the only human input is the specification documents in spec/, and Claude wrote those too; I just offered suggestions. Having been retired for a while, when I heard about Claude I wanted to see how much of my former career had been eliminated, or at least compressed into a small fraction of the time it used to take. The result is a substantial, fully functional platform written exclusively through AI-assisted development, though it should be taken only as an example of what a beginner can do with AI coding assistance. It is still a work in progress and changes direction whenever I decide "I want to try this or that." Most of my development time was spent watching YouTube channels; if I had understood Claude when I started, the whole project would have taken a few days at most.
AgentFlow is a platform for defining and executing distributed workflows. You describe what should happen in a simple language called AFL (Agent Flow Language), and AgentFlow handles the execution, retries, monitoring, and scaling.
You don't need to be a developer to use AgentFlow — if you can fill in a form, you can run workflows from the dashboard.
| I want to... | Start here |
|---|---|
| Run workflows from the web UI | Beginner's Guide |
| Set up a local server quickly | Quick Start (below) |
| Write my own workflows in AFL | AFL Tutorial |
| Build handlers in Python | Agent SDK |
| Build agents in other languages | Agent Libraries |
| Deploy to a cluster | Deployment Guide |
| Understand the architecture | Architecture |
| Contribute to AgentFlow | Full Technical Reference |
```shell
git clone https://github.com/rlemke/agentflow.git
cd agentflow

# Start everything: MongoDB + Dashboard + Runner + Sample Agent
docker compose up

# In another terminal, seed example workflows
docker compose run seed
```

Open http://localhost:8080 — that's the dashboard. Click Workflows to see what's available, then click New to run one.
| Service | URL | Description |
|---|---|---|
| Dashboard | http://localhost:8080 | Web UI for running and monitoring workflows |
| MongoDB | localhost:27017 | Database (managed by Docker) |
| Runner | (internal) | Processes workflow tasks automatically |
```shell
docker compose down     # stop services
docker compose down -v  # stop and remove data
```

To run without Docker:

```shell
python3 -m venv .venv && source .venv/bin/activate
pip install -e ".[dev,test,dashboard,mcp,mongodb]"
cp .env.example .env    # edit MongoDB connection string
scripts/seed-examples
python -m afl.dashboard --log-format text
```

Open http://localhost:8080.
The dashboard is where you run workflows, monitor progress, and troubleshoot issues.
Running a workflow:
- Click Workflows in the sidebar
- Click New
- Select a workflow, fill in the parameters, click Run
- Watch it execute on the detail page — steps, logs, and progress update automatically
Finding things:
- Use Cmd+K (or click the search bar) to find any workflow, handler, or server by name
- Running / Completed / Failed tabs filter the workflow list
- Click any step to see its parameters, return values, logs, and execution duration
Key pages: Workflows, Handlers (registered event facet code), Servers (runner health), Fleet (bird's-eye view), Steps (individual step detail with logs).
AFL is a simple language for describing workflows. Here's a taste:
```
namespace myapp {
    /** Fetches weather data for a city. */
    event facet GetWeather(city: String) => (temperature: Long, conditions: String)

    /** Gets weather for two cities and picks the warmer one. */
    workflow CompareWeather(city_a: String, city_b: String) => (warmer: String) andThen {
        weather_a = GetWeather(city = $.city_a)
        weather_b = GetWeather(city = $.city_b)
    } andThen when {
        case weather_a.temperature > weather_b.temperature => { yield CompareWeather(warmer = $.city_a) }
        case _ => { yield CompareWeather(warmer = $.city_b) }
    }
}
```
- `event facet` — a step that needs a handler (your code) to do the actual work
- `workflow` — the entry point that chains steps together
- `$` — the workflow's input parameters
- `step.field` — output from a previous step
You write the workflow logic in AFL. A Python handler does the real work (API calls, data processing, etc.). AgentFlow connects them.
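To make that concrete, here is a sketch of what a handler for the GetWeather facet above could look like. This is hypothetical code, not part of the repository: handlers in this project receive a step's parameters as a dict and return a dict of its outputs (the AgentPoller example later in this README follows the same shape), and the fake city data here is purely illustrative.

```python
# Hypothetical handler for the GetWeather event facet above.
# A handler takes the facet's parameters as a dict and returns
# a dict matching its declared outputs.
def get_weather_handler(payload: dict) -> dict:
    city = payload["city"]
    # A real handler would call a weather API here; this one fakes it.
    fake_forecasts = {"Oslo": (3, "snow"), "Lisbon": (19, "sunny")}
    temperature, conditions = fake_forecasts.get(city, (15, "unknown"))
    return {"temperature": temperature, "conditions": conditions}
```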
To learn more: AFL Tutorial | Language Reference | Examples
AFL workflows are designed to be shared and composed — just like importing a library in a regular programming language. Teams publish their facets, schemas, and workflows as namespaces that other teams can use in their own workflows.
```
namespace analytics.reports {
    use data.warehouse   // import another team's data facets
    use ml.predictions   // import the ML team's prediction facets

    workflow MonthlyReport(month: String) => (report_path: String) andThen {
        // Use the data team's extraction facet — you didn't write it, just call it
        raw = ExtractSalesData(period = $.month)

        // Use the ML team's forecasting facet
        forecast = PredictNextMonth(history = raw.data)

        // Your team's rendering step
        report = RenderReport(sales = raw.data, forecast = forecast.prediction)

        yield MonthlyReport(report_path = report.output_path)
    }
}
```
How sharing works:
- Teams publish their AFL namespaces to MongoDB via `scripts/publish mylib.afl`
- Other teams import published namespaces with `use team.namespace`
- The compiler resolves and validates all cross-team references at compile time
- Handlers are registered independently — teams deploy and update their own handlers without affecting other teams' workflows
This means a domain expert can build a workflow by composing facets from across the organization — data engineering, ML, visualization, notification — without needing to know how any of them are implemented. It's the same idea as pip install or npm install, but for workflow steps.
AgentFlow doesn't run workflows on a single machine and hope for the best. It runs on a cluster of runner servers backed by MongoDB, designed for workloads that take minutes, hours, or days.
How it works:
- When a workflow reaches a step that needs work (an event facet), the runtime creates a task in MongoDB
- Any available runner server in the cluster picks up the task, executes the handler, and writes the result back
- The workflow automatically advances to the next step — no single machine needs to stay alive the whole time
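The task lifecycle above can be sketched in a few lines. This is an illustrative in-memory model, not AgentFlow's actual schema: real runners perform the same pending-to-running transition atomically against MongoDB, and the field names here are assumptions.

```python
# Illustrative in-memory model of the task queue (field names are
# assumptions; the real store is MongoDB and the claim is atomic).
tasks = [{"facet": "demo.AddOne", "status": "pending", "payload": {"input": 41}}]

def claim_next_task(tasks, runner_id):
    """Claim the first pending task by marking it running for this runner."""
    for task in tasks:
        if task["status"] == "pending":
            task["status"] = "running"
            task["runner"] = runner_id
            return task
    return None  # nothing to do

def complete_task(task, result):
    """Write the handler's result back so the workflow can advance."""
    task["status"] = "completed"
    task["result"] = result

task = claim_next_task(tasks, "runner-1")
complete_task(task, {"output": task["payload"]["input"] + 1})
```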
Why this matters:
| Capability | How AgentFlow handles it |
|---|---|
| Long-running jobs | A step can take hours (e.g., importing geographic data, training a model). If a runner crashes or times out, the task is automatically reset to pending and another runner picks it up. |
| Scalability | Add more runner servers to handle more tasks in parallel. Each runner independently polls MongoDB for work — no central coordinator needed. |
| Rolling updates | Update handler code on runners one at a time with scripts/rolling-deploy. Running tasks finish on the old code; new tasks pick up the new code. No downtime. |
| Fault tolerance | If a server goes down, its orphaned tasks are automatically detected and reassigned. Workflows resume from exactly where they left off. |
| Monitoring | The dashboard shows every runner's health, active tasks, step logs, and execution duration in real time. |
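The fault-tolerance row above amounts to a heartbeat sweep, which a hypothetical sketch can illustrate. The field names and threshold are assumptions, not the project's actual implementation: any running task whose runner has stopped heartbeating is flipped back to pending so another runner can claim it.

```python
# Hypothetical orphan-task sweep (assumed field names): any "running"
# task whose runner heartbeat is stale gets reset to "pending".
STALE_AFTER = 60  # seconds without a heartbeat before a task is orphaned

def reset_orphaned_tasks(tasks, now):
    reset_count = 0
    for task in tasks:
        if task["status"] == "running" and now - task["heartbeat"] > STALE_AFTER:
            task["status"] = "pending"   # another runner will pick it up
            task.pop("runner", None)
            reset_count += 1
    return reset_count
```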
A local Docker setup is great for development, but production workflows run on a cluster. See the Deployment Guide for setting up multiple runners across machines.
Everything below is for developers who want to build handlers, extend AgentFlow, or understand the internals.
```shell
# Create virtual environment
python3 -m venv .venv
source .venv/bin/activate

# Install the package (includes lark dependency)
pip install -e .

# For development (adds pytest, ruff, mypy, pre-commit)
pip install -e ".[dev]"

# For running tests with mongomock
pip install -e ".[test]"

# For full stack (dashboard + MCP + MongoDB)
pip install -e ".[dev,test,dashboard,mcp,mongodb]"
```

Dependency groups (defined in pyproject.toml):

| Group | Includes |
|---|---|
| (base) | lark |
| dev | pytest, pytest-cov, ruff, mypy, pre-commit |
| test | pytest, pytest-cov, mongomock |
| mongodb | pymongo |
| dashboard | fastapi, uvicorn, jinja2 |
| mcp | mcp |
```shell
pytest tests/ -v                                       # all tests
pytest tests/ --cov=afl --cov-report=term-missing      # with coverage
pytest tests/test_parser.py::TestWorkflows -v          # specific test
pytest tests/runtime/test_mongo_store.py --mongodb -v  # real MongoDB
pytest tests/dashboard/ -v                             # dashboard tests
```

```python
from afl import parse, AFLParser, ParseError

source = """
facet User(name: String, email: String)

workflow SendEmail(to: String, body: String) => (status: String) andThen {
    user = User(name = $.to, email = $.to)
    result = EmailService(recipient = user.email, content = $.body)
    yield SendEmail(status = result.status)
}
"""

ast = parse(source)
for workflow in ast.workflows:
    print(f"Workflow: {workflow.sig.name}")
    for param in workflow.sig.params:
        print(f"  Param: {param.name}: {param.type.name}")
```

```python
from afl import parse, emit_json, emit_dict

ast = parse("facet User(name: String)")
json_str = emit_json(ast)
data = emit_dict(ast)

# Compact output without locations
json_str = emit_json(ast, include_locations=False, indent=None)
```

```shell
afl input.afl                           # parse and emit JSON
afl input.afl -o output.json            # output to file
afl input.afl --check                   # syntax check only
afl input.afl --compact --no-locations  # compact JSON
echo 'facet Test()' | afl               # parse from stdin
```

```python
from afl import parse, emit_dict
from afl.runtime import Evaluator, MemoryStore, Telemetry, ExecutionStatus
from afl.runtime.agent_poller import AgentPoller, AgentPollerConfig

# Compile AFL
source = """
namespace demo {
    event facet AddOne(input: Long) => (output: Long)
}

workflow Increment(x: Long) => (result: Long) andThen {
    step = demo.AddOne(input = $.x)
    yield Increment(result = step.output)
}
"""
ast = parse(source)
compiled = emit_dict(ast)
workflow_ast = compiled["workflows"][0]
program_ast = compiled

# Execute — pauses at the event facet
store = MemoryStore()
evaluator = Evaluator(persistence=store, telemetry=Telemetry(enabled=False))
result = evaluator.execute(workflow_ast, inputs={"x": 41}, program_ast=program_ast)
# result.status == PAUSED (blocked at AddOne)

# Agent processes the event
def addone_handler(payload: dict) -> dict:
    return {"output": payload["input"] + 1}

poller = AgentPoller(
    persistence=store, evaluator=evaluator,
    config=AgentPollerConfig(service_name="demo-agent"),
)
poller.register("demo.AddOne", addone_handler)
poller.cache_workflow_ast(result.workflow_id, workflow_ast)
poller.poll_once()

# Resume to completion
final = evaluator.resume(result.workflow_id, workflow_ast, program_ast)
assert final.outputs["result"] == 42  # 41 + 1
```

```shell
python -m afl.dashboard                       # port 8080
python -m afl.dashboard --port 9000 --reload  # dev mode
```

```shell
python -m afl.runtime.runner                                      # default
python -m afl.runtime.runner --topics TopicA --max-concurrent 10  # custom
```

The MCP server exposes the AFL compiler and runtime as tools for LLM agents:

```shell
python -m afl.mcp  # stdio transport
```

Tools: afl_compile, afl_validate, afl_execute_workflow, afl_continue_step, afl_resume_workflow, afl_manage_runner
Resources: afl://runners, afl://runners/{id}, afl://steps/{id}, afl://flows, afl://servers, afl://tasks
AgentFlow agents can be built in any language. The agents/ directory has libraries for:
| Language | Directory | Build |
|---|---|---|
| Python | Built into afl.runtime | pip install -e . |
| Scala | agents/scala/afl-agent/ | sbt compile |
| Go | agents/go/afl-agent/ | go build ./... |
| TypeScript | agents/typescript/afl-agent/ | npm install && npm run build |
| Java | agents/java/afl-agent/ | mvn compile |
Any language with a MongoDB driver can implement an agent. See agents/protocol/constants.json for the complete protocol specification.
Starting a new agent in a separate repo:
```shell
cp agents/templates/CLAUDE.md /path/to/my-agent/CLAUDE.md
cp agents/protocol/constants.json /path/to/my-agent/constants.json
```

```shell
scripts/compile input.afl -o output.json     # compile AFL
scripts/publish input.afl                    # compile + publish to MongoDB
scripts/run-workflow                         # interactive workflow execution
scripts/start-runner --example osm-geocoder  # start runner
scripts/stop-runners                         # stop all runners
scripts/drain-runners                        # stop + reset tasks to pending
scripts/list-runners                         # show runner fleet
scripts/db-stats                             # database statistics
scripts/postgis-vacuum                       # PostGIS maintenance
scripts/postgis-vacuum-status                # check vacuum progress
```

All scripts support --help.
See examples/README.md for a complete overview of all 15+ examples with feature matrices.
| Example | Highlights |
|---|---|
| examples/osm-geocoder/ | Full-scale: 42 AFL files, 16 handler categories, PostGIS, pgRouting |
| examples/hiv-drug-resistance/ | Bioinformatics: QC branching, error recovery, batch processing |
| examples/noaa-weather/ | Real data: AWS S3, climate analysis, linear regression |
| examples/devops-deploy/ | Conditional branching, prompt/script blocks, mixins |
| examples/research-agent/ | LLM integration: 8 prompt-block facets, Claude API |
| examples/aws-lambda/ | Real boto3: LocalStack, Step Functions, blue-green deploy |
| examples/jenkins/ | CI/CD: mixin composition, 4 pipeline workflows |
| examples/genomics/ | Bioinformatics: foreach fan-out, joint genotyping |
The spec/ directory is the authoritative reference:
| Document | What It Covers |
|---|---|
| spec/10_language.md | AFL syntax — EBNF grammar, all language constructs |
| spec/30_runtime.md | Execution semantics — iteration model, determinism |
| spec/40_database.md | MongoDB schema — collections, indexes, atomic commits |
| spec/50_event_system.md | Event/agent protocol — lifecycle, dispatch, task queue |
| spec/60_agent_sdk.md | Building agents — processing event facets |
Supporting specs: overview, AST semantics, validation, compiler, state system, LLM integration, examples, tests
`String`, `Int`, `Long`, `Boolean`, `Json`, `[String]` (arrays), `[[Int]]` (nested arrays), schema types

```
facet Name(param: Type)                                              # data structure
event facet Name(param: Type) => (ret: Type)                         # triggers handler
workflow Name(param: Type) => (ret: Type) andThen { ... }            # entry point
schema Name { field: Type }                                          # typed structure
namespace ns.name { ... }                                            # grouping
implicit name = Call(...)                                            # defaults
facet Job(x: String) with Retry(max = 3) with Timeout(seconds = 60)  # mixins
andThen foreach item in $.items { ... }                              # iteration
andThen when { case cond => { ... } case _ => { ... } }              # branching
catch { ... }                                                        # error recovery
prompt { system "..." template "..." model "..." }                   # LLM
script python "..."                                                  # inline code
```

`$.fieldName` (input parameters), `stepName.outputField` (step outputs), `step.result.nested` (nested access)
- Python 3.11+
- lark >= 1.1.0