Trigger on event
Copy
from polos import workflow, WorkflowContext, EventPayload
@workflow(id="on_order_created", trigger_on_event="orders/created")
async def on_order_created(ctx: WorkflowContext, payload: EventPayload):
    """Handle a single event published on the ``orders/created`` topic.

    Runs three named steps in order against the event's data
    (``validate``, ``reserve``, ``notify``) and then reports the order
    as processed.

    Args:
        ctx: Workflow context; ``ctx.step.run`` is used to execute each step.
        payload: The triggering event. ``payload.data`` is treated as the
            order and is indexed with ``"order_id"``, so it is presumably a
            dict-like object -- confirm against the publisher.

    Returns:
        A dict holding the order's ``order_id`` and ``"status": "processed"``.
    """
    order = payload.data

    # Each step is awaited before the next one starts.
    # validate_order / reserve_inventory / send_confirmation are expected to
    # be defined elsewhere; they are not part of this snippet.
    await ctx.step.run("validate", validate_order, order)
    await ctx.step.run("reserve", reserve_inventory, order)
    await ctx.step.run("notify", send_confirmation, order)

    return {"order_id": order["order_id"], "status": "processed"}
Batch events
Copy
from polos import BatchEventPayload
@workflow(
    id="batch_processor",
    trigger_on_event="data/updates",
    batch_size=10,
    batch_timeout_seconds=30,
)
async def batch_processor(ctx: WorkflowContext, payload: BatchEventPayload):
    """Process a batch of events from the ``data/updates`` topic.

    Args:
        ctx: Workflow context; ``ctx.step.run`` is used once per event.
        payload: The batch. ``payload.events`` is iterated, and each event's
            ``sequence_id`` and ``data`` attributes are read.

    Returns:
        A dict with ``"processed"`` set to the number of events in the batch.
    """
    # Receives up to 10 events at once, or triggers after 30s
    for event in payload.events:
        # The step name embeds the event's sequence_id, giving each event its
        # own step name (assumes sequence_id is unique within a batch --
        # TODO confirm). `process` is expected to be defined elsewhere.
        await ctx.step.run(f"process_{event.sequence_id}", process, event.data)
    return {"processed": len(payload.events)}
Publish events
Copy
@workflow(id="order_pipeline")
async def order_pipeline(ctx: WorkflowContext, payload):
    """Publish an event on the ``orders/created`` topic.

    Args:
        ctx: Workflow context; ``ctx.step.publish_event`` performs the publish.
        payload: Workflow input; not read by this example.

    Returns:
        ``{"published": True}`` once the publish step has been awaited.
    """
    # Publish event to trigger other workflows
    await ctx.step.publish_event(
        "publish_order",
        topic="orders/created",
        # `[...]` is a documentation placeholder for the real item list.
        data={"order_id": "123", "items": [...]},
    )
    return {"published": True}
Wait for events
Copy
@workflow(id="request_response")
async def request_response(ctx: WorkflowContext, payload):
    """Publish a request event, then wait for the matching response event.

    A per-run ``request_id`` is obtained from ``ctx.step.uuid`` and embedded
    in both topic names, so the workflow waits on ``responses/<request_id>``
    after publishing to ``requests/<request_id>``.

    Args:
        ctx: Workflow context providing the ``uuid``, ``publish_event`` and
            ``wait_for_event`` steps.
        payload: Workflow input; not read by this example.

    Returns:
        A dict with ``"response"`` set to the received event's ``data``.
    """
    request_id = await ctx.step.uuid("request_id")

    # `{...}` is a documentation placeholder for the real request body.
    await ctx.step.publish_event("request", topic=f"requests/{request_id}", data={...})

    # Wait for response event
    response = await ctx.step.wait_for_event(
        "wait_response",
        topic=f"responses/{request_id}",
        timeout=300,  # presumably seconds -- confirm against the Polos docs
    )
    return {"response": response.data}
Run it
Copy
# Fetch the Polos repository and move into the event-triggered example
git clone https://github.com/polos-dev/polos.git
cd polos/python-examples/15-event-triggered
# Create a local .env from the provided template
cp .env.example .env
# Sync the project's environment with uv
uv sync
# Run each script in its own terminal
python worker.py # Terminal 1
python main.py # Terminal 2