Skip to main content
Run multiple workflows concurrently and aggregate results.

Parallel execution with batch_invoke_and_wait

from polos import workflow, WorkflowContext
from polos.types.types import BatchWorkflowInput

@workflow(id="parallel_review")
async def parallel_review(ctx: WorkflowContext, payload):
    reviewers = ["alice", "bob", "charlie"]

    # Create batch of review requests
    requests = [
        BatchWorkflowInput(
            id="single_review",
            payload={"reviewer": r, "document_id": payload["doc_id"]},
        )
        for r in reviewers
    ]

    # Run all in parallel
    results = await ctx.step.batch_invoke_and_wait("reviews", requests)

    # Aggregate
    approved = sum(1 for r in results if r.result.get("approved"))
    return {"approved": approved, "total": len(results)}

Fire-and-forget with batch_invoke

@workflow(id="launch_background_tasks")
async def launch_tasks(ctx: WorkflowContext, payload):
    requests = [
        BatchWorkflowInput(id="background_task", payload={"task_id": i})
        for i in range(10)
    ]

    # Launch without waiting
    handles = await ctx.step.batch_invoke("launch", requests)

    return {"launched": len(handles), "ids": [h.id for h in handles]}

Fan-out/fan-in pattern

@workflow(id="process_chunks")
async def process_chunks(ctx: WorkflowContext, payload):
    data = payload["data"]
    chunk_size = 10

    # Split into chunks
    chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]

    # Process in parallel
    requests = [
        BatchWorkflowInput(id="process_chunk", payload={"chunk": c, "index": i})
        for i, c in enumerate(chunks)
    ]
    results = await ctx.step.batch_invoke_and_wait("chunks", requests)

    # Combine results
    all_processed = []
    for r in results:
        all_processed.extend(r.result.get("processed", []))
    return {"total": len(all_processed)}

Run it

git clone https://github.com/polos-dev/polos.git
cd polos/python-examples/13-parallel-review
cp .env.example .env
uv sync
python worker.py      # Terminal 1
python main.py        # Terminal 2
Open http://localhost:5173 to view your agents and workflows, run them from the UI, and see execution traces. Python example on GitHub | TypeScript example on GitHub