Skip to main content
Event-triggered workflows run automatically whenever a matching event is published.

Trigger on event

from polos import workflow, WorkflowContext, EventPayload

@workflow(id="on_order_created", trigger_on_event="orders/created")
async def on_order_created(ctx: WorkflowContext, payload: EventPayload):
    """Process a newly created order delivered by an "orders/created" event.

    The event's data is passed, in order, through the validate, reserve and
    notify steps.

    Args:
        ctx: Workflow context; its ``step.run`` executes each named step.
        payload: The triggering event; ``payload.data`` is the order.

    Returns:
        A dict with the order's ``order_id`` and a ``"processed"`` status.
    """
    order_data = payload.data

    # Steps run strictly one after another; each receives the same order data.
    await ctx.step.run("validate", validate_order, order_data)
    await ctx.step.run("reserve", reserve_inventory, order_data)
    await ctx.step.run("notify", send_confirmation, order_data)

    summary = {"order_id": order_data["order_id"], "status": "processed"}
    return summary

Batch events

from polos import BatchEventPayload

@workflow(
    id="batch_processor",
    trigger_on_event="data/updates",
    batch_size=10,
    batch_timeout_seconds=30,
)
async def batch_processor(ctx: WorkflowContext, payload: BatchEventPayload):
    """Process a batch of "data/updates" events, one step per event.

    Args:
        ctx: Workflow context; its ``step.run`` executes each named step.
        payload: The batch; ``payload.events`` holds the individual events.

    Returns:
        A dict whose ``"processed"`` value is the number of events in the batch.
    """
    # Receives up to 10 events at once, or triggers after 30s
    batch = payload.events
    for item in batch:
        # The step name embeds the event's sequence_id.
        step_name = f"process_{item.sequence_id}"
        await ctx.step.run(step_name, process, item.data)

    processed_count = len(batch)
    return {"processed": processed_count}

Publish events

@workflow(id="order_pipeline")
async def order_pipeline(ctx: WorkflowContext, payload):
    """Publish an "orders/created" event so that other workflows can react to it.

    Args:
        ctx: Workflow context; its ``step.publish_event`` publishes the event.
        payload: Workflow input (not used here).

    Returns:
        ``{"published": True}`` once the publish step has completed.
    """
    # Example event body; the items list is a placeholder.
    event_data = {"order_id": "123", "items": [...]}

    # Publish event to trigger other workflows
    await ctx.step.publish_event("publish_order", topic="orders/created", data=event_data)

    return {"published": True}

Wait for events

@workflow(id="request_response")
async def request_response(ctx: WorkflowContext, payload):
    """Publish a request event, then wait for the matching response event.

    Request and response topics share a generated request id, so the workflow
    waits only for the reply to its own request.

    Args:
        ctx: Workflow context providing the ``uuid``, ``publish_event`` and
            ``wait_for_event`` steps.
        payload: Workflow input (not used here).

    Returns:
        A dict whose ``"response"`` value is the received event's ``data``.
    """
    request_id = await ctx.step.uuid("request_id")

    # Send the request on a topic unique to this request id.
    request_topic = f"requests/{request_id}"
    await ctx.step.publish_event("request", topic=request_topic, data={...})

    # Wait for response event
    # NOTE(review): timeout is presumably in seconds, and behaviour when the
    # timeout expires is not shown here; confirm against the SDK reference.
    response_topic = f"responses/{request_id}"
    reply = await ctx.step.wait_for_event(
        "wait_response",
        topic=response_topic,
        timeout=300,
    )

    return {"response": reply.data}

Run it

# Fetch the repository and enter the event-triggered example directory
git clone https://github.com/polos-dev/polos.git
cd polos/python-examples/15-event-triggered
# Create a local .env from the provided template
cp .env.example .env
# NOTE(review): presumably installs the example's dependencies; confirm against the uv docs
uv sync
# Run each of the following commands in its own terminal
python worker.py      # Terminal 1
python main.py        # Terminal 2
Open http://localhost:5173 to view your agents and workflows, run them from the UI, and see execution traces. Python example on GitHub | TypeScript example on GitHub