Steps are the fundamental unit of durability in Polos. Each step’s execution is persisted, enabling workflows to resume from the last completed step after failures.
What is a step?
A step is a durable operation within a workflow. When a step completes, its output is saved to the database. If the workflow crashes, completed steps return their cached results on replay: they are not re-executed, so their side effects do not happen twice.
```python
from polos import workflow, WorkflowContext

# ExampleInput, fetch_from_api, transform_data, and save_to_db are
# application code, assumed to be defined elsewhere.

@workflow
async def example_workflow(ctx: WorkflowContext, input: ExampleInput):
    # Step 1: Fetch data (cached on replay)
    data = await ctx.step.run("fetch", fetch_from_api, input.url)
    # Step 2: Process data (cached on replay)
    processed = await ctx.step.run("process", transform_data, data)
    # Step 3: Save result (executes only once)
    await ctx.step.run("save", save_to_db, processed)
    return {"status": "completed"}
```
If the workflow fails after step 2 completes, replay proceeds as follows:

```text
Replay:
  Step 1 ("fetch")   → Returns cached result (no API call)
  Step 2 ("process") → Returns cached result (no re-processing)
  Step 3 ("save")    → Executes for the first time
```
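To make the replay rule concrete, here is a minimal in-memory model of step memoization. It is a hypothetical sketch, not the Polos implementation: `ReplayLog` stands in for the persisted step outputs (a real engine writes each result to the database before continuing), and the three helper functions are illustrative placeholders that record when they actually run.

```python
from typing import Any, Callable


class ReplayLog:
    """Hypothetical stand-in for persisted step outputs, keyed by step key."""

    def __init__(self) -> None:
        self.results: dict[str, Any] = {}

    def run(self, step_key: str, fn: Callable[..., Any], *args: Any) -> Any:
        # Completed step: return the recorded output without calling fn again.
        if step_key in self.results:
            return self.results[step_key]
        # New step: execute it and record the output before moving on.
        result = fn(*args)
        self.results[step_key] = result
        return result


calls: list[str] = []  # records every real execution of a step body


def fetch_from_api(url: str) -> str:
    calls.append("fetch")
    return f"raw data from {url}"


def transform_data(data: str) -> str:
    calls.append("process")
    return data.upper()


def save_to_db(processed: str) -> None:
    calls.append("save")


def example_workflow(log: ReplayLog, url: str, crash: bool = False) -> dict:
    data = log.run("fetch", fetch_from_api, url)
    processed = log.run("process", transform_data, data)
    if crash:
        raise RuntimeError("worker crashed after step 2")
    log.run("save", save_to_db, processed)
    return {"status": "completed"}


log = ReplayLog()
try:
    example_workflow(log, "https://example.com", crash=True)  # first attempt
except RuntimeError:
    pass
result = example_workflow(log, "https://example.com")  # replay
print(calls)  # ['fetch', 'process', 'save']
```

Even though the workflow function ran twice, each step body executed exactly once: the replay served "fetch" and "process" from the log and only ran "save".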
Step execution model
Unlike workflows, which are scheduled by the orchestrator and assigned to workers, steps execute directly on the same worker running the workflow. This means:
- ✅ Lower overhead - No scheduling delay
- ✅ Same execution context - Access to workflow variables
- ✅ Durability guaranteed - Output is persisted before continuing
Steps are not queued or orchestrated; they run inline with the workflow code.
Core step methods
step.run()
Run any Python function as a durable step:
```python
@workflow
async def process_order(ctx: WorkflowContext, input: OrderInput):
    # Run async function
    order = await ctx.step.run(
        "validate_order",
        validate_order,
        input.order_id,
    )
    # Run sync function (automatically wrapped)
    receipt = await ctx.step.run(
        "generate_receipt",
        generate_pdf,  # Sync function
        order,
    )
    return receipt
```
With retry configuration:
```python
result = await ctx.step.run(
    "api_call",
    call_external_api,
    url,
    max_retries=5,
    base_delay=2.0,
    max_delay=30.0,
)
```
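This section does not spell out how `base_delay` and `max_delay` combine. A common scheme, and only an assumption here, is capped exponential backoff: the delay before retry n is `base_delay * 2 ** (n - 1)`, clipped to `max_delay`. The sketch below computes that schedule for the values in the example; real implementations often add random jitter on top.

```python
def backoff_schedule(max_retries: int, base_delay: float, max_delay: float) -> list[float]:
    """Delays (seconds) before each retry under capped exponential backoff.

    Assumed formula, not confirmed Polos behavior:
    delay_n = min(base_delay * 2 ** (n - 1), max_delay) for n = 1..max_retries.
    """
    return [min(base_delay * 2 ** (n - 1), max_delay) for n in range(1, max_retries + 1)]


print(backoff_schedule(max_retries=5, base_delay=2.0, max_delay=30.0))
# prints [2.0, 4.0, 8.0, 16.0, 30.0]
```

Under this assumption, the cap only matters for the fifth retry, where 32 seconds is clipped to 30.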
What should be a step:
- ✅ External API calls
- ✅ Database operations
- ✅ File I/O
- ✅ Non-deterministic operations (random(), datetime.now())
- ✅ Any operation that might fail
What should NOT be a step:
- ❌ Pure logic (if statements, loops)
- ❌ Variable assignments
- ❌ String manipulation
step.invoke()
Start a child workflow without waiting for it to complete:
```python
@workflow
async def parent_workflow(ctx: WorkflowContext, input: ParentInput):
    # Start child workflow
    handle = await ctx.step.invoke(
        "start_child",
        child_workflow,
        {"data": input.data},
    )
    # Continue immediately (don't wait for child to complete)
    print(f"Started workflow: {handle.id}")
    return {"child_id": handle.id}
```
step.invoke_and_wait()
Start a child workflow and suspend until it completes:
```python
@workflow
async def parent_workflow(ctx: WorkflowContext, input: ParentInput):
    # Call child and wait for result
    result = await ctx.step.invoke_and_wait(
        "call_child",
        child_workflow,
        {"data": input.data},
    )
    # Parent resumes here with child's result
    return {"child_result": result}
```
Worker behavior: The worker suspends the parent workflow (no compute consumed) until the child completes.
step.batch_invoke()
Start multiple workflows in parallel without waiting:
```python
from polos import workflow, WorkflowContext, BatchWorkflowInput

@workflow
async def parallel_tasks(ctx: WorkflowContext, input: ParallelInput):
    # Start multiple workflows
    handles = await ctx.step.batch_invoke(
        "start_batch",
        [
            BatchWorkflowInput(id="task_workflow", payload={"task": "A"}),
            BatchWorkflowInput(id="task_workflow", payload={"task": "B"}),
            BatchWorkflowInput(id="log_workflow", payload={"input": input}),
        ],
    )
    return {"started": len(handles)}
```
step.batch_invoke_and_wait()
Start multiple workflows in parallel and wait for all to complete:
```python
@workflow
async def parallel_and_wait(ctx: WorkflowContext, input: ParallelInput):
    # Start batch and wait for all
    results = await ctx.step.batch_invoke_and_wait(
        "batch_process",
        [
            BatchWorkflowInput(id="processor", payload={"item": "A"}),
            BatchWorkflowInput(id="processor", payload={"item": "B"}),
            BatchWorkflowInput(id="log_workflow", payload={"input": input}),
        ],
    )
    # Worker suspends until all complete
    for result in results:
        print(f"Result: {result.result}")
    return results
```
step.agent_invoke()
Start an agent execution without waiting for it to complete:
```python
@workflow
async def parent_workflow(ctx: WorkflowContext, input: ParentInput):
    # Start agent execution
    handle = await ctx.step.agent_invoke(
        "research_agent",
        research_agent.with_input(input.query),
    )
    # Continue immediately (don't wait)
    print(f"Started agent execution: {handle.id}")
    return {"agent_execution_id": handle.id}
```
step.agent_invoke_and_wait()
Start an agent execution and suspend until it completes:
```python
@workflow
async def review_workflow(ctx: WorkflowContext, input: ReviewInput):
    # Call agent and wait for result
    response = await ctx.step.agent_invoke_and_wait(
        "grammar_review_agent_invoke",
        grammar_review_agent.with_input(input.text),
    )
    # Parent resumes here with agent's response
    return {"review": response.result}
```
Worker behavior: The worker suspends the parent workflow (no compute consumed) until the agent completes.
step.batch_agent_invoke()
Start multiple agent executions in parallel without waiting:
```python
@workflow
async def parallel_agents(ctx: WorkflowContext, input: ParallelInput):
    # Start multiple agents
    handles = await ctx.step.batch_agent_invoke(
        "start_batch_agents",
        [
            grammar_review_agent.with_input(input.text),
            tone_review_agent.with_input(input.text),
            accuracy_review_agent.with_input(input.text),
        ],
    )
    return {"started": len(handles)}
```
step.batch_agent_invoke_and_wait()
Start multiple agent executions in parallel and wait for all to complete:
```python
@workflow
async def parallel_reviews(ctx: WorkflowContext, input: ReviewInput):
    # Run three review agents in parallel
    results = await ctx.step.batch_agent_invoke_and_wait(
        "batch_invoke_grammar_tone_accuracy_reviews",
        [
            grammar_review_agent.with_input(input.text),
            tone_consistency_review_agent.with_input(input.text),
            accuracy_review_agent.with_input(input.text),
        ],
    )
    # Worker suspends until all agents complete
    for result in results:
        print(f"Review result: {result.result}")
    return results
```
step.wait_for()
Pause execution for a duration:
```python
@workflow
async def delayed_workflow(ctx: WorkflowContext, input: dict):
    # Wait 1 hour (worker suspends, no compute cost)
    await ctx.step.wait_for("wait_1_hour", hours=1)
    # Resume after 1 hour
    result = await ctx.step.run("process", process_data, input)
    return result
```
Available units: seconds, minutes, hours, days, weeks
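The unit names match the keyword arguments of Python's `datetime.timedelta`, which makes it easy to check how long a `wait_for` pause lasts. This sketch only does the arithmetic; `resume_time` is a hypothetical helper, and summing several units in one call is an assumption that mirrors how `timedelta` itself behaves.

```python
from datetime import datetime, timedelta, timezone


def resume_time(start: datetime, **duration: float) -> datetime:
    """Earliest time a wait_for-style pause started at `start` would resume.

    `duration` uses the unit names listed above (seconds, minutes, hours,
    days, weeks). Combining several units by summing them is an assumption.
    """
    return start + timedelta(**duration)


start = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
print(resume_time(start, hours=1))                 # 2025-01-01 13:00:00+00:00
print(timedelta(hours=1).total_seconds())          # 3600.0
print(timedelta(weeks=1, days=2).total_seconds())  # 777600.0
```

An unsupported unit name, such as `months`, makes `timedelta` raise `TypeError`.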
step.wait_until()
Wait until a specific datetime:
```python
from datetime import datetime, timezone

@workflow
async def scheduled_action(ctx: WorkflowContext, input: dict):
    # Schedule for a specific time (timezone-aware, UTC)
    target_time = datetime(2025, 12, 31, 23, 59, 0, tzinfo=timezone.utc)
    await ctx.step.wait_until("wait_until_new_year", target_time)
    # Resumes at 23:59 UTC on Dec 31, 2025
    await ctx.step.run("celebrate", send_celebration)
```
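The example passes a timezone-aware UTC datetime, which avoids ambiguity about which clock "23:59" refers to. The hypothetical `seconds_until` helper below computes how long such a pause lasts from a given moment. Returning 0 for a target that has already passed is this sketch's choice, not documented Polos behavior.

```python
from datetime import datetime, timezone


def seconds_until(target: datetime, now: datetime) -> float:
    """Seconds from `now` until `target`, or 0.0 if the target has passed.

    Both values must be timezone-aware; Python refuses to subtract a naive
    datetime from an aware one, so this check fails fast with a clearer error.
    """
    if target.tzinfo is None or now.tzinfo is None:
        raise ValueError("use timezone-aware datetimes")
    return max((target - now).total_seconds(), 0.0)


target_time = datetime(2025, 12, 31, 23, 59, 0, tzinfo=timezone.utc)
now = datetime(2025, 12, 31, 23, 0, 0, tzinfo=timezone.utc)
print(seconds_until(target_time, now))  # 3540.0
```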
step.wait_for_event()
Wait for an external event:
```python
@workflow
async def approval_workflow(ctx: WorkflowContext, input: dict):
    # Submit for user confirmation
    await ctx.step.run("submit", submit_for_user_confirmation, input)
    # Wait for user.confirmation event (hours or days)
    approval = await ctx.step.wait_for_event(
        "user_confirmation",
        topic="user.confirmation",
        timeout=86400,  # 24 hour timeout
    )
    # Resume when event arrives
    if approval.data["approved"]:
        await ctx.step.run("execute", execute_action, input)
```
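Conceptually, `wait_for_event` parks the workflow until an event is published on the topic or the timeout elapses. The sketch below models only that contract in plain asyncio, using a hypothetical in-process `EventBus`. In Polos the wait is durable and the worker is suspended instead of holding a coroutine open, and what Polos returns or raises on timeout is not covered here; the sketch simply lets `asyncio.TimeoutError` propagate.

```python
import asyncio
from typing import Any


class EventBus:
    """Hypothetical in-process stand-in for topic-based events."""

    def __init__(self) -> None:
        self._waiters: dict[str, list[asyncio.Future]] = {}

    async def wait_for_event(self, topic: str, timeout: float) -> Any:
        future = asyncio.get_running_loop().create_future()
        self._waiters.setdefault(topic, []).append(future)
        # Resolves with the event data, or raises asyncio.TimeoutError.
        return await asyncio.wait_for(future, timeout)

    def publish(self, topic: str, data: Any) -> None:
        # Wake every coroutine currently waiting on this topic.
        for future in self._waiters.pop(topic, []):
            if not future.done():
                future.set_result(data)


async def main() -> Any:
    bus = EventBus()
    waiter = asyncio.create_task(bus.wait_for_event("user.confirmation", timeout=1.0))
    await asyncio.sleep(0)  # let the waiter register before publishing
    bus.publish("user.confirmation", {"approved": True})
    return await waiter


approval = asyncio.run(main())
print(approval)  # {'approved': True}
```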
step.suspend()
Suspend execution until it is resumed manually. This is syntactic sugar around wait_for_event, tailored to human-in-the-loop agent flows.
```python
@workflow
async def manual_approval(ctx: WorkflowContext, input: dict):
    # Prepare action
    action = await ctx.step.run("prepare", prepare_action, input)
    # Suspend with context data
    resume_data = await ctx.step.suspend(
        "approval",
        data={
            "action": action.description,
            "cost": action.estimated_cost,
        },
        timeout=3600,  # 1 hour timeout
    )
    # Resume with approval decision
    decision = resume_data.get("data", {})
    if decision.get("approved"):
        await ctx.step.run("execute", execute_action, action)
```
See Human-in-the-Loop for details.
step.publish_event()
Publish events durably:
```python
@workflow
async def event_publisher(ctx: WorkflowContext, input: dict):
    # Process data
    result = await ctx.step.run("process", process_data, input)
    # Publish event (guaranteed once)
    await ctx.step.publish_event(
        "notify_complete",
        topic="processing.completed",
        data={"result": result},
        event_type="completion",
    )
```
step.uuid()
Generate a UUID that persists across replays:
```python
@workflow
async def create_entity(ctx: WorkflowContext, input: dict):
    # Generate stable UUID (same on replay)
    entity_id = await ctx.step.uuid("entity_id")
    # Use the generated UUID in API calls
    await ctx.step.run("create", create_in_api, entity_id, input)
    return {"id": entity_id}
```
Why this matters:
```python
import uuid

# ❌ BAD: New UUID on each replay
entity_id = str(uuid.uuid4())  # Different every time
await ctx.step.run("create", create_entity, entity_id)

# ✅ GOOD: Same UUID on replay
entity_id = await ctx.step.uuid("entity_id")  # Cached
await ctx.step.run("create", create_entity, entity_id)
```
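The difference shows up as soon as a workflow replays. In this hypothetical sketch, a plain dict stands in for persisted step results and `recorded_uuid` plays the role of `ctx.step.uuid`; neither is the Polos implementation. Calling `attempt()` twice simulates an original run followed by a replay.

```python
import uuid

persisted: dict[str, str] = {}  # stand-in for durable step storage


def recorded_uuid(step_key: str) -> str:
    # Generate once, then return the stored value on every replay.
    if step_key not in persisted:
        persisted[step_key] = str(uuid.uuid4())
    return persisted[step_key]


def attempt() -> tuple[str, str]:
    fresh = str(uuid.uuid4())            # BAD: new value on every attempt
    stable = recorded_uuid("entity_id")  # GOOD: same value on every attempt
    return fresh, stable


first_fresh, first_stable = attempt()    # original run
second_fresh, second_stable = attempt()  # replay after a crash
print(first_fresh == second_fresh)    # False
print(first_stable == second_stable)  # True
```

With the fresh value, a replayed "create" call would receive a different ID than the first attempt, so the external system could end up with two entities.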
step.now()
Get the current timestamp, persisted across replays:
```python
@workflow
async def timestamped_workflow(ctx: WorkflowContext, input: dict):
    # Get stable timestamp (same on replay)
    created_at = await ctx.step.now("created_at")
    # Use in conditional logic
    if created_at % 2 == 0:
        await ctx.step.run("even_path", process_even)
    else:
        await ctx.step.run("odd_path", process_odd)
```
step.trace()
Add custom spans for observability:
```python
@workflow
async def monitored_workflow(ctx: WorkflowContext, input: dict):
    # Custom span for database operations
    with ctx.step.trace("database_query", {"table": "users", "limit": 100}):
        users = await db.query("SELECT * FROM users LIMIT 100")
    # Custom span for external API
    with ctx.step.trace("external_api", {"endpoint": "/data"}):
        data = await api.fetch("/data")
    return {"users": len(users), "data": data}
```
See Tracing for details.
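A custom span is essentially a named, timed region with attributes attached. As a rough mental model only, and not how Polos records spans, a context manager that measures duration, stores the attributes, and notes any exception looks like this. The `Span` type, `spans` list, and `trace` function are hypothetical.

```python
import time
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any, Iterator, Optional


@dataclass
class Span:
    name: str
    attributes: dict[str, Any]
    duration_s: float = 0.0
    error: Optional[str] = None


spans: list[Span] = []  # collected spans, in completion order


@contextmanager
def trace(name: str, attributes: Optional[dict[str, Any]] = None) -> Iterator[Span]:
    span = Span(name, dict(attributes or {}))
    start = time.perf_counter()
    try:
        yield span
    except Exception as exc:
        span.error = repr(exc)  # record the failure, then re-raise it
        raise
    finally:
        span.duration_s = time.perf_counter() - start
        spans.append(span)


with trace("database_query", {"table": "users", "limit": 100}):
    users = [{"id": n} for n in range(3)]  # placeholder for a real query

print(spans[0].name, spans[0].attributes)  # database_query {'table': 'users', 'limit': 100}
```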
Step keys must be unique
Each step needs a unique step_key per execution:
```python
@workflow
async def process_items(ctx: WorkflowContext, input: dict):
    # ❌ BAD: Same key in loop
    for item in input["items"]:
        await ctx.step.run("process", process_item, item)  # Collision!

    # ✅ GOOD: Unique key per iteration
    for i, item in enumerate(input["items"]):
        await ctx.step.run(f"process_{i}", process_item, item)
```
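When items carry a stable identifier, deriving the step key from it makes steps easier to match up in traces than a bare loop index. The hypothetical helper below builds keys that way and fails fast if two items would produce the same key, before any step runs.

```python
from collections import Counter
from typing import Iterable


def step_keys(prefix: str, item_ids: Iterable[str]) -> list[str]:
    """Build one step key per item and reject duplicates up front."""
    keys = [f"{prefix}_{item_id}" for item_id in item_ids]
    duplicates = [key for key, count in Counter(keys).items() if count > 1]
    if duplicates:
        raise ValueError(f"duplicate step keys: {duplicates}")
    return keys


print(step_keys("process", ["a1", "b2", "c3"]))
# prints ['process_a1', 'process_b2', 'process_c3']
```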
Agents and steps
In Polos, agents are special workflows. Everything an agent does is broken into steps:
Agent actions as steps:
- ✅ LLM calls - Each call is a step
- ✅ Guardrail evaluation - Each guardrail is a step
- ✅ Stop condition checks - Each check is a step
- ✅ Tool calls - Tools are subworkflows (not steps)
This means:
- Agent failures resume from the last completed step (not the beginning)
- Tool calls are durable subworkflows (not lost on failure)
- Every agent action is observable via step events