Human-in-the-loop (HITL) workflows pause execution to wait for human approval, input, or decisions before continuing. This is essential for sensitive operations like financial transactions, data deletions, or any action requiring human oversight.

Basic suspend and resume

Use ctx.step.suspend() to pause workflow execution and wait for external input:
from polos import workflow, WorkflowContext, Agent
from pydantic import BaseModel, Field
from typing import List, Optional

class ActionPlan(BaseModel):
    description: str = Field(description="What will be done")
    steps: List[str] = Field(description="Steps to execute")
    estimated_cost: float = Field(description="Estimated cost in USD")
    risk_level: str = Field(description="low, medium, or high")

planning_agent = Agent(
    id="planning-agent",
    provider="openai",
    model="gpt-4o",
    system_prompt="Analyze requests and create detailed action plans with cost estimates.",
    output_schema=ActionPlan
)

class ApprovalWorkflowInput(BaseModel):
    request: str

class ApprovalWorkflowOutput(BaseModel):
    status: str
    result: Optional[str] = None
    rejection_reason: Optional[str] = None


@workflow
async def approval_workflow(ctx: WorkflowContext, input: ApprovalWorkflowInput):
    # Step 1: Agent creates action plan
    plan_response = await ctx.step.agent_invoke_and_wait(
        "create_plan",
        planning_agent.with_input(f"Create an action plan for: {input.request}")
    )

    plan = plan_response.result  # ActionPlan (structured output)

    # Step 2: Suspend and wait for approval
    resume_data = await ctx.step.suspend(
        "wait_for_approval",
        data={
            "description": plan.description,
            "steps": plan.steps,
            "estimated_cost": plan.estimated_cost,
            "risk_level": plan.risk_level,
            "requires_approval_from": "manager@company.com"
        },
        timeout=3600  # 1 hour timeout
    )

    # Step 3: Resumes here when approval event is received. Execute if approved.
    decision = resume_data.get("data", {})
    if decision.get("approved"):
        result = await ctx.step.run(
            "execute_plan",
            execute_action,
            plan
        )
        return ApprovalWorkflowOutput(status="completed", result=result)
    else:
        return ApprovalWorkflowOutput(status="rejected", rejection_reason=decision.get("reason"))
What happens:
  1. Agent creates a structured action plan
  2. Workflow suspends with plan details
  3. Worker suspends execution (no compute consumed)
  4. An event is emitted with suspend details
  5. Workflow waits indefinitely (or until timeout)
  6. When resume event arrives, workflow continues with the provided data
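To build intuition for the sequence above, here is a toy in-memory model of suspend and resume built on asyncio futures. It is only a sketch: ToySuspender and its methods are hypothetical names, and it does not reflect how Polos persists or schedules suspended workflows.

```python
import asyncio
from typing import Any, Dict, List, Optional, Tuple


class ToySuspender:
    """In-memory stand-in for suspend/resume. Illustrative only, not the Polos implementation."""

    def __init__(self) -> None:
        self._pending: Dict[Tuple[str, str], asyncio.Future] = {}
        self.emitted: List[Dict[str, Any]] = []  # Suspend events "emitted" so far

    async def suspend(self, execution_id: str, step_key: str,
                      data: Dict[str, Any], timeout: Optional[float] = None) -> Dict[str, Any]:
        future = asyncio.get_running_loop().create_future()
        self._pending[(execution_id, step_key)] = future
        self.emitted.append({"topic": f"{step_key}/{execution_id}", "data": data})
        try:
            # Wait until resume() is called; raises asyncio.TimeoutError once `timeout` elapses.
            return await asyncio.wait_for(future, timeout)
        finally:
            self._pending.pop((execution_id, step_key), None)

    def resume(self, execution_id: str, step_key: str, data: Dict[str, Any]) -> None:
        # Wake the waiting coroutine and hand it the decision payload.
        self._pending[(execution_id, step_key)].set_result({"data": data})


async def demo() -> str:
    suspender = ToySuspender()

    async def toy_workflow() -> str:
        resume_data = await suspender.suspend(
            "exec-1", "wait_for_approval", {"risk_level": "high"}
        )
        decision = resume_data.get("data", {})
        return "completed" if decision.get("approved") else "rejected"

    task = asyncio.create_task(toy_workflow())
    await asyncio.sleep(0)  # Let the workflow run until it suspends
    suspender.resume("exec-1", "wait_for_approval", {"approved": True})
    return await task
```

Calling asyncio.run(demo()) walks through the same steps: the workflow parks on the suspension, an event is recorded, and the resume call delivers the decision data.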

Suspend events

When a workflow suspends, Polos emits an event to a topic specific to that suspension.
Topic format: {step_key}/{execution_id}
Event:
{
  "event_type": "suspend",
  "topic": "wait_for_approval/abc-123-def-456",
  "data": {
    "description": "Delete 1000 customer records from archive",
    "steps": ["Backup data", "Run deletion query", "Verify deletion"],
    "estimated_cost": 50.00,
    "risk_level": "high",
    "requires_approval_from": "manager@company.com"
  }
}
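If your own tooling needs to build or split these topic strings, a small pair of helpers is enough. This is a sketch: the function names are hypothetical (not part of the Polos SDK), and it assumes the execution ID itself contains no slash.

```python
from typing import Tuple


def build_suspend_topic(step_key: str, execution_id: str) -> str:
    """Compose a topic in the {step_key}/{execution_id} format."""
    return f"{step_key}/{execution_id}"


def parse_suspend_topic(topic: str) -> Tuple[str, str]:
    """Split a topic into (step_key, execution_id) at the last slash."""
    step_key, sep, execution_id = topic.rpartition("/")
    if not sep or not step_key or not execution_id:
        raise ValueError(f"Not a suspend topic: {topic!r}")
    return step_key, execution_id
```

Splitting at the last slash keeps a step key intact even if it happens to contain a slash of its own.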

Listening for suspend events

Stream workflow events to detect when a workflow suspends:
import asyncio
from polos import PolosClient, events

async def main():
    client = PolosClient()

    # Start the workflow
    handle = await approval_workflow.invoke(client, ApprovalWorkflowInput(
        request="delete old customer records from archive"
    ))

    print(f"Workflow started: {handle.id}")
    print("Waiting for approval...")

    # Stream events from the workflow to detect suspension
    async for event in events.stream_workflow(client, handle.root_workflow_id, handle.id):
        if event.event_type.startswith("suspend_"):
            suspend_data = event.data
            print("Workflow suspended, awaiting approval:")
            print(f"  Description: {suspend_data['description']}")
            print(f"  Steps: {', '.join(suspend_data['steps'])}")
            print(f"  Cost: ${suspend_data['estimated_cost']}")
            print(f"  Risk: {suspend_data['risk_level']}")
            print(f"  Approver: {suspend_data['requires_approval_from']}")
            break

if __name__ == "__main__":
    asyncio.run(main())
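The listener above indexes the payload directly, which raises KeyError if a field is missing. A more defensive formatter might look like the sketch below; summarize_suspend_data is a hypothetical helper, and the field names are the ones this page passes to suspend().

```python
from typing import Any, Dict


def summarize_suspend_data(suspend_data: Dict[str, Any]) -> str:
    """Render a suspend payload as text, tolerating missing fields."""
    steps = suspend_data.get("steps") or []
    cost = suspend_data.get("estimated_cost")
    cost_text = f"${cost:,.2f}" if isinstance(cost, (int, float)) else "unknown"
    lines = [
        f"Description: {suspend_data.get('description', 'n/a')}",
        f"Steps: {', '.join(steps) if steps else 'n/a'}",
        f"Cost: {cost_text}",
        f"Risk: {suspend_data.get('risk_level', 'unknown')}",
        f"Approver: {suspend_data.get('requires_approval_from', 'unassigned')}",
    ]
    return "\n".join(lines)
```

Inside the event loop you could then call print(summarize_suspend_data(event.data)) instead of the individual print statements.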

Resuming workflows

To resume a suspended workflow, call client.resume() with the workflow ID, execution ID, and step key of the suspension. This emits a resume event to the same topic:
from polos import PolosClient
from datetime import datetime

async def approve_workflow():
    client = PolosClient()

    # Resume with approval
    await client.resume(
        suspend_workflow_id="wf-abc-123",          # Root workflow ID
        suspend_execution_id="abc-123-def-456",    # Workflow execution ID
        suspend_step_key="wait_for_approval",      # Step key from suspend()
        data={
            "approved": True,
            "approved_by": "alice@company.com",
            "approved_at": datetime.now().isoformat(),
            "notes": "Approved after review"
        }
    )
The workflow resumes immediately and continues with the provided data.
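The workflows on this page read approved and reason from the resume data, so it helps to build that payload in one place. The sketch below is an assumption about how you might structure it: build_decision is a hypothetical helper, and requiring a reason on rejection is a policy choice rather than a Polos requirement. It also uses a timezone-aware UTC timestamp so that approved_at is unambiguous.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def build_decision(approved: bool, approved_by: str,
                   reason: Optional[str] = None,
                   notes: Optional[str] = None) -> Dict[str, Any]:
    """Build the `data` payload for a resume call."""
    if not approved and not reason:
        # Policy choice for this sketch: rejections must explain why.
        raise ValueError("A rejection should include a reason")
    decision: Dict[str, Any] = {
        "approved": approved,
        "approved_by": approved_by,
        "approved_at": datetime.now(timezone.utc).isoformat(),
    }
    if reason:
        decision["reason"] = reason
    if notes:
        decision["notes"] = notes
    return decision
```

The returned dictionary can be passed as the data argument of client.resume().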

Complete example

Here’s a full approval workflow with suspend/resume:
import asyncio
from polos import workflow, WorkflowContext, events, PolosClient
from datetime import datetime
from pydantic import BaseModel
from typing import Optional

class ChargeRequest(BaseModel):
    customer_id: str
    amount: float
    description: str

class ChargeResponse(BaseModel):
    status: str
    reason: Optional[str] = None  # Used for rejections
    charge_id: Optional[str] = None
    amount: Optional[float] = 0.0

@workflow
async def charge_customer_workflow(ctx: WorkflowContext, input: ChargeRequest):
    # Validate the charge
    validation = await ctx.step.run(
        "validate_charge",
        validate_charge_request,
        input
    )

    if not validation.is_valid:
        return ChargeResponse(status="invalid", reason=validation.error)

    # For charges over $1000, require approval
    if input.amount > 1000:
        resume_data = await ctx.step.suspend(
            "high_value_approval",
            data={
                "customer_id": input.customer_id,
                "amount": input.amount,
                "description": input.description,
                "threshold_exceeded": True
            },
            timeout=7200  # 2 hours
        )

        decision = resume_data.get("data", {})
        if not decision.get("approved"):
            return ChargeResponse(
                status="rejected",
                reason=decision.get("reason", "Not approved")
            )

    # Execute the charge
    charge_result = await ctx.step.run(
        "execute_charge",
        charge_stripe,
        input
    )

    # Send confirmation
    await ctx.step.run(
        "send_confirmation",
        send_email,
        customer_id=input.customer_id,
        charge_id=charge_result.charge_id
    )

    return ChargeResponse(
        status="completed",
        charge_id=charge_result.charge_id,
        amount=input.amount
    )

# Start workflow
async def main():
    client = PolosClient()

    handle = await charge_customer_workflow.invoke(
        client,
        ChargeRequest(
            customer_id="cust_123",
            amount=5000.00,
            description="Enterprise annual subscription"
        ))

    print(f"Charge workflow started: {handle.id}")

    # Stream events from the workflow to detect suspension
    async for event in events.stream_workflow(client, handle.root_workflow_id, handle.id):
        if event.event_type.startswith("suspend_"):
            suspend_data = event.data
            print("\nHigh-value charge requires approval:")
            print(f"   Customer: {suspend_data['customer_id']}")
            print(f"   Amount: ${suspend_data['amount']}")
            print(f"   Description: {suspend_data['description']}")

            # Send Slack notification with approval URL
            approval_url = f"https://admin.example.com/approve/{handle.id}"
            await send_slack_message(
                channel="#financial-approvals",
                message=f"High-value charge requires approval: ${suspend_data['amount']}",
                blocks=[{
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f"*Charge Approval Required*\nAmount: ${suspend_data['amount']}\nCustomer: {suspend_data['customer_id']}\n<{approval_url}|Approve or Reject>"
                    }
                }]
            )
            break

    # Wait for workflow completion (the charge may have been rejected)
    result = await handle.result()
    print(f"\nCharge workflow {result.status}: {result.charge_id}")

if __name__ == "__main__":
    asyncio.run(main())
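The example calls validate_charge_request, charge_stripe, send_email, and send_slack_message without defining them. To try the flow locally, placeholder versions along the following lines are one option. Everything here is hypothetical: the names match the example, but the bodies make no real payment or network call, and whether your step functions should be sync or async depends on how you use ctx.step.run().

```python
import uuid
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class ValidationResult:
    is_valid: bool
    error: Optional[str] = None


@dataclass
class ChargeResult:
    charge_id: str


def validate_charge_request(request: Any) -> ValidationResult:
    """Reject obviously bad requests before any approval is requested."""
    if request.amount <= 0:
        return ValidationResult(False, "Amount must be positive")
    if not request.customer_id:
        return ValidationResult(False, "Missing customer ID")
    return ValidationResult(True)


def charge_stripe(request: Any) -> ChargeResult:
    """Placeholder: returns a fake charge ID instead of calling a payment API."""
    return ChargeResult(charge_id=f"ch_test_{uuid.uuid4().hex[:8]}")


def send_email(customer_id: str, charge_id: str) -> None:
    """Placeholder: print instead of sending an email."""
    print(f"Confirmation for {customer_id}: charge {charge_id}")


async def send_slack_message(channel: str, message: str,
                             blocks: Optional[List[Dict[str, Any]]] = None) -> None:
    """Placeholder: print instead of posting to Slack."""
    print(f"[{channel}] {message}")
```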

Building approval UIs

The Slack message links to an approval URL of the form https://admin.example.com/approve/{execution_id}. When a reviewer opens the URL and makes a decision, your approval endpoint calls the Polos API to resume the workflow:
const response = await fetch(
  `https://api.polos.ai/api/v1/executions/${executionId}/resume`,
  {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      step_key: "high_value_approval",
      data: {
        approved: approval.approved,
        approved_by: approval.approved_by,
        approved_at: new Date().toISOString(),
        reason: approval.reason
      }
    })
  }
)

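// Surface HTTP errors instead of parsing an error body as a result
if (!response.ok) {
  throw new Error(`Resume failed with status ${response.status}`)
}

const result = await response.json()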
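If your approval endpoint is written in Python, the same request can be assembled with the standard library. This is a sketch under assumptions: the URL and body shape are copied from the fetch example on this page, build_resume_request and send_resume_request are hypothetical helper names, and any authentication headers your deployment requires are not shown here.

```python
import json
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from typing import Any, Dict, Optional

# Base URL taken from the fetch example on this page
POLOS_API_BASE = "https://api.polos.ai/api/v1"


def build_resume_request(execution_id: str, step_key: str, approved: bool,
                         approved_by: str,
                         reason: Optional[str] = None) -> urllib.request.Request:
    """Assemble the POST request that resumes a suspended execution."""
    body: Dict[str, Any] = {
        "step_key": step_key,
        "data": {
            "approved": approved,
            "approved_by": approved_by,
            "approved_at": datetime.now(timezone.utc).isoformat(),
            "reason": reason,
        },
    }
    safe_id = urllib.parse.quote(execution_id, safe="")
    return urllib.request.Request(
        f"{POLOS_API_BASE}/executions/{safe_id}/resume",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_resume_request(request: urllib.request.Request,
                        timeout: float = 10.0) -> Dict[str, Any]:
    """Send the request; urlopen raises urllib.error.HTTPError on 4xx/5xx responses."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

Keeping request construction separate from sending makes the payload easy to unit test without touching the network.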

Timeouts

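Pass a timeout (in seconds) to suspend() so a workflow does not wait indefinitely. In this example, an expired timeout surfaces as a StepExecutionError, which the workflow catches to return a timeout status: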
# Inside a workflow function; import StepExecutionError from the Polos SDK
try:
    resume_data = await ctx.step.suspend(
        "approval",
        data={"request": "delete records"},
        timeout=3600  # 1 hour in seconds
    )
except StepExecutionError:  # No resume arrived before the timeout elapsed
    return ChargeResponse(
        status="timeout",
        reason="Approval not received in time"
    )
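Because the timeout is expressed in seconds, literals such as 3600 or 7200 are easy to mistype. One way to keep them readable is to derive the value from a timedelta. The helper name timeout_seconds is hypothetical and not part of the Polos SDK.

```python
from datetime import timedelta


def timeout_seconds(**duration: float) -> int:
    """Convert timedelta keyword arguments (hours=, minutes=, days=, ...) to whole seconds."""
    seconds = timedelta(**duration).total_seconds()
    if seconds <= 0:
        raise ValueError("Timeout must be positive")
    return int(seconds)
```

For example, timeout=timeout_seconds(hours=2) is equivalent to timeout=7200.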

Multiple approvals

Chain several suspend steps, each with its own step key, to handle multi-stage approvals:
@workflow
async def multi_approval_workflow(ctx: WorkflowContext, input: MultiApprovalInput):
    # Stage 1: Manager approval
    manager_data = await ctx.step.suspend(
        "manager_approval",
        data={"stage": "manager", "amount": input.amount}
    )

    manager_decision = manager_data.get("data", {})
    if not manager_decision.get("approved"):
        return MultiApprovalOutput(status="rejected_by_manager")

    # Stage 2: Finance approval (for large amounts)
    if input.amount > 10000:
        finance_data = await ctx.step.suspend(
            "finance_approval",
            data={"stage": "finance", "amount": input.amount}
        )

        finance_decision = finance_data.get("data", {})
        if not finance_decision.get("approved"):
            return MultiApprovalOutput(status="rejected_by_finance")

    # Execute action
    result = await ctx.step.run("execute", execute_action, input)
    return MultiApprovalOutput(status="completed", result=result)
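When the number of stages grows, the repeated suspend-and-check pattern can be factored into a loop. The sketch below is one possible shape, not a Polos feature: required_stages and run_approval_chain are hypothetical names, and the suspend callable is injected so the logic can be exercised without a running workflow.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional

# Any async callable taking (step_key, data) and returning the resume data
SuspendFn = Callable[[str, Dict[str, Any]], Awaitable[Dict[str, Any]]]


def required_stages(amount: float, finance_threshold: float = 10000) -> List[str]:
    """Manager approval always; finance approval only above the threshold."""
    stages = ["manager"]
    if amount > finance_threshold:
        stages.append("finance")
    return stages


async def run_approval_chain(suspend: SuspendFn, amount: float) -> Optional[str]:
    """Run each stage in order. Returns the stage that rejected, or None if all approved."""
    for stage in required_stages(amount):
        resume_data = await suspend(
            f"{stage}_approval",  # Distinct step key per stage
            {"stage": stage, "amount": amount},
        )
        decision = resume_data.get("data", {})
        if not decision.get("approved"):
            return stage
    return None
```

Inside a workflow, the injected callable could be a thin wrapper such as `lambda key, data: ctx.step.suspend(key, data=data)`, and a non-None return value maps to a rejected_by_{stage} status.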

Key takeaways

  • ctx.step.suspend() pauses workflow execution and waits for external input
  • No compute is consumed while suspended, so workflows can wait hours or days
  • Detect suspension by streaming workflow events with events.stream_workflow()
  • Resume with client.resume(), providing the workflow ID, execution ID, step key, and decision data
  • Use timeouts to prevent indefinite waiting
  • Build approval UIs by listening for suspend events and calling the resume API
  • Multi-stage approvals are supported through multiple suspend steps