Queues control how many workflow instances can run simultaneously. Use queues to rate-limit execution, manage resource contention, and prevent overload.
Why use queues?
Queues solve common concurrency challenges:
Rate limiting:
- Prevent overwhelming external APIs (OpenAI, Stripe, etc.)
- Control database connection usage
- Manage resource consumption
Ordered execution:
- Process per-user tasks sequentially
- Maintain ordering for critical operations
- Prevent race conditions
Resource protection:
- Limit concurrent access to shared resources
- Prevent thundering herd problems
- Control system load
Default queue behavior
Every workflow automatically gets its own queue:
```python
from polos import workflow, WorkflowContext

@workflow
async def process_data(ctx: WorkflowContext, input: dict):
    # Automatically assigned to queue named "process-data"
    # Default concurrency limit from environment
    result = await ctx.step.run("process", process, input)
    return result
```
Default queue properties:
- Queue name: same as the workflow ID
- Concurrency limit: from the POLOS_DEFAULT_CONCURRENCY_LIMIT environment variable
- Behavior: any number of executions can wait in the queue, but only N run simultaneously, where N is the concurrency limit
Custom queue limits
Set a specific concurrency limit for a workflow:
```python
@workflow(queue={"concurrency_limit": 3})
async def rate_limited_workflow(ctx: WorkflowContext, input: dict):
    # Only 3 instances run at once
    # Additional invocations queue until a slot opens
    await ctx.step.run("call_api", call_external_api, input)
    return {"status": "completed"}
```
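To get a feel for what concurrency_limit=3 means, the following in-process analogy puts a counting semaphore (asyncio.Semaphore) in front of a stand-in workflow body and records how many bodies run at once. It uses no Polos APIs: invoke_many and run_workflow are hypothetical names, the sleep stands in for the step, and this is only an analogy, not how Polos implements queuing.

```python
import asyncio


async def invoke_many(n_invocations: int, concurrency_limit: int) -> int:
    """Start n stand-in workflow bodies behind one limit; return the peak concurrency."""
    slots = asyncio.Semaphore(concurrency_limit)  # models the queue's free slots
    running = 0
    peak = 0

    async def run_workflow() -> None:
        nonlocal running, peak
        async with slots:               # waits here until a slot opens
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)   # stand-in for ctx.step.run("call_api", ...)
            running -= 1

    await asyncio.gather(*(run_workflow() for _ in range(n_invocations)))
    return peak


print(asyncio.run(invoke_many(10, concurrency_limit=3)))  # 3
```

With 10 invocations and a limit of 3, the peak is 3: the other 7 wait at async with slots until a running body finishes and releases its slot.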
Shared queues
Multiple workflows can share the same queue:
```python
from polos import queue, workflow, WorkflowContext

# Create a shared queue
openai_queue = queue("openai-api", concurrency_limit=5)

# SummarizerInput and AnalyzerInput are your own input types
@workflow(queue=openai_queue)
async def gpt_summarizer(ctx: WorkflowContext, input: SummarizerInput):
    summary = await ctx.step.run("summarize", call_openai_gpt, input.text)
    return summary

@workflow(queue=openai_queue)
async def gpt_analyzer(ctx: WorkflowContext, input: AnalyzerInput):
    analysis = await ctx.step.run("analyze", call_openai_gpt, input.data)
    return analysis

# Both workflows share the "openai-api" queue
# Combined max 5 concurrent executions across both workflows
```
Use cases for shared queues:
- Rate-limit API calls across multiple workflows
- Control database connection pooling
- Manage resource contention
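Sharing a queue amounts to several workflows drawing from one pool of slots. The sketch below is again a plain asyncio analogy rather than Polos code: it gives the two workflows from the example either one shared asyncio.Semaphore or one each, and reports the combined peak. peak_concurrency is a hypothetical helper and the sleep stands in for the OpenAI call.

```python
import asyncio


async def peak_concurrency(shared: bool, limit: int = 5, per_workflow: int = 4) -> int:
    """Run per_workflow stand-in executions of each workflow; return the combined peak."""
    names = ("gpt-summarizer", "gpt-analyzer")
    if shared:
        pool = asyncio.Semaphore(limit)  # one "openai-api" queue for both workflows
        queues = {name: pool for name in names}
    else:
        queues = {name: asyncio.Semaphore(limit) for name in names}  # one queue each
    running = 0
    peak = 0

    async def run(name: str) -> None:
        nonlocal running, peak
        async with queues[name]:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stand-in for call_openai_gpt
            running -= 1

    await asyncio.gather(*(run(name) for name in names for _ in range(per_workflow)))
    return peak


print(asyncio.run(peak_concurrency(shared=True)))   # 5
print(asyncio.run(peak_concurrency(shared=False)))  # 8
```

With four invocations of each workflow and a limit of 5, the shared pool caps the combined peak at 5, while two separate queues of 5 would let all 8 run at once.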
Queue configuration options
1. Inline configuration
```python
@workflow(queue={"concurrency_limit": 10})
async def inline_config(ctx: WorkflowContext, input: dict):
    # Queue name = "inline-config" (workflow ID)
    # Concurrency limit = 10
    pass
```
2. Queue name only
```python
@workflow(queue="processing-queue")
async def named_queue(ctx: WorkflowContext, input: dict):
    # Queue name = "processing-queue"
    # Concurrency limit = default from environment
    pass
```
3. Queue object
```python
from polos import queue

priority_queue = queue("priority", concurrency_limit=3)

@workflow(queue=priority_queue)
async def priority_workflow(ctx: WorkflowContext, input: dict):
    # Queue name = "priority"
    # Concurrency limit = 3
    pass
```
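The three forms can be read as different spellings of the same two settings: a queue name and a concurrency limit. Below is a minimal sketch of that normalization, assuming the defaults described on this page (the name falls back to the workflow ID, the limit falls back to POLOS_DEFAULT_CONCURRENCY_LIMIT). QueueSpec and resolve_queue are hypothetical stand-ins, not the object that polos.queue(...) returns, and because this page doesn't say what happens when the environment variable is unset, the sketch simply raises.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional, Union


@dataclass(frozen=True)
class QueueSpec:
    """Hypothetical stand-in for a queue object: a name plus an optional limit."""
    name: str
    concurrency_limit: Optional[int] = None  # None = fall back to the env default


def resolve_queue(
    workflow_id: str,
    queue: Union[None, str, dict, QueueSpec],
    env: Optional[Mapping[str, str]] = None,
) -> QueueSpec:
    """Normalize a queue= argument into a concrete name and limit."""
    env = os.environ if env is None else env
    if queue is None:                   # no queue argument: the default queue
        spec = QueueSpec(workflow_id)
    elif isinstance(queue, dict):       # 1. inline dict: name is the workflow ID
        spec = QueueSpec(workflow_id, queue.get("concurrency_limit"))
    elif isinstance(queue, str):        # 2. name only
        spec = QueueSpec(queue)
    elif isinstance(queue, QueueSpec):  # 3. queue object: used as-is
        spec = queue
    else:
        raise TypeError(f"unsupported queue argument: {queue!r}")

    if spec.concurrency_limit is None:
        default = env.get("POLOS_DEFAULT_CONCURRENCY_LIMIT")
        if default is None:
            # Assumption: real behavior when the variable is unset is not covered here
            raise LookupError("POLOS_DEFAULT_CONCURRENCY_LIMIT is not set")
        spec = QueueSpec(spec.name, int(default))
    return spec


env = {"POLOS_DEFAULT_CONCURRENCY_LIMIT": "4"}
print(resolve_queue("inline-config", {"concurrency_limit": 10}, env))
# QueueSpec(name='inline-config', concurrency_limit=10)
```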
Per-entity queuing (Concurrency keys)
Use concurrency keys to create separate queues per user, tenant, or entity:
```python
@workflow(queue="user-tasks")
async def user_task(ctx: WorkflowContext, input: dict):
    # Process user-specific task
    await ctx.step.run("process", process_user_task, input)
    return {"status": "completed"}

# Invoke with concurrency_key
client = PolosClient()
await user_task.invoke(
    client,
    payload={"task": "data"},
    concurrency_key=f"user:{user_id}",  # Separate queue per user
)
```
How it works:
- Each unique concurrency_key gets its own virtual queue
- The queue's concurrency limit applies per key, not globally
- Well suited to multi-tenant systems
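One way to model concurrency keys is a dictionary of semaphores keyed by concurrency_key, each created with the queue's limit the first time its key is seen. The sketch below is plain asyncio, not Polos code; per_key_peaks is a hypothetical helper that reports the peak number of running tasks per key and across all keys.

```python
import asyncio
from collections import defaultdict
from typing import Dict, List, Tuple


async def per_key_peaks(keys: List[str], limit: int) -> Tuple[Dict[str, int], int]:
    """Run one stand-in task per entry in keys; return (peak per key, overall peak)."""
    virtual_queues: Dict[str, asyncio.Semaphore] = {}  # one semaphore per concurrency_key
    running: Dict[str, int] = defaultdict(int)
    peak_per_key: Dict[str, int] = defaultdict(int)
    peak_total = 0

    async def run(key: str) -> None:
        nonlocal peak_total
        if key not in virtual_queues:  # first time this key is seen
            virtual_queues[key] = asyncio.Semaphore(limit)
        async with virtual_queues[key]:
            running[key] += 1
            peak_per_key[key] = max(peak_per_key[key], running[key])
            peak_total = max(peak_total, sum(running.values()))
            await asyncio.sleep(0.01)  # stand-in for process_user_task
            running[key] -= 1

    await asyncio.gather(*(run(key) for key in keys))
    return dict(peak_per_key), peak_total


keys = ["user:alice"] * 3 + ["user:bob"] * 2
print(asyncio.run(per_key_peaks(keys, limit=2)))
# ({'user:alice': 2, 'user:bob': 2}, 4)
```

With a limit of 2, each key peaks at 2 while the overall peak reaches 4, which a single queue with the same limit and no concurrency keys would never allow.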
Example: Per-user rate limiting
```python
# Assuming "user-tasks" has a concurrency limit of 5:
# Alice's tasks share one virtual queue, so at most 5 of them run at once
await user_task.invoke(client, payload={...}, concurrency_key="user:alice")
await user_task.invoke(client, payload={...}, concurrency_key="user:alice")
await user_task.invoke(client, payload={...}, concurrency_key="user:alice")
# Bob gets his own 5 slots, independent of Alice's
await user_task.invoke(client, payload={...}, concurrency_key="user:bob")
await user_task.invoke(client, payload={...}, concurrency_key="user:bob")
```
Serial execution
Force workflows to run one at a time:
```python
@workflow(queue={"concurrency_limit": 1})
async def serial_workflow(ctx: WorkflowContext, input: dict):
    # Only 1 instance runs at a time
    # Use for operations that must not overlap
    await ctx.step.run("critical_section", critical_operation, input)
    return {"status": "completed"}
```
Use cases:
- Database migrations
- File system operations
- Operations with global side effects
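Why a limit of 1 matters for these use cases: the sketch below runs several read-modify-write "workflows" against one shared counter, with a yield between the read and the write as real I/O would have. apply_increments and critical_operation are hypothetical, and asyncio.Semaphore(1) only models the guarantee that concurrency_limit=1 gives; it is not Polos code.

```python
import asyncio
from typing import Optional


async def apply_increments(n: int, concurrency_limit: Optional[int]) -> int:
    """Run n stand-in workflows that each add 1 to a shared counter.

    concurrency_limit=None models "no limit"; 1 models serial execution.
    """
    state = {"counter": 0}  # hypothetical shared resource (a row, a file, ...)
    gate = asyncio.Semaphore(concurrency_limit) if concurrency_limit is not None else None

    async def critical_operation() -> None:
        current = state["counter"]      # read
        await asyncio.sleep(0)          # yield to other tasks, as real I/O would
        state["counter"] = current + 1  # write back (may overwrite another task's write)

    async def workflow() -> None:
        if gate is None:
            await critical_operation()
        else:
            async with gate:            # at most concurrency_limit bodies at once
                await critical_operation()

    await asyncio.gather(*(workflow() for _ in range(n)))
    return state["counter"]


print(asyncio.run(apply_increments(5, concurrency_limit=None)))  # 1 (lost updates)
print(asyncio.run(apply_increments(5, concurrency_limit=1)))     # 5
```

Without a limit, all five tasks read 0 before any of them writes, so five increments leave the counter at 1. With a limit of 1, reads and writes no longer interleave and the result is 5.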
Queue behavior
Queuing and execution
```python
@workflow(queue={"concurrency_limit": 2})
async def limited_workflow(ctx: WorkflowContext, input: dict):
    await ctx.step.run("work", do_work, input)
    return {"done": True}

# Invoke 5 times
client = PolosClient()
for i in range(5):
    await limited_workflow.invoke(client, {"id": i})

# Execution timeline (workflows numbered 1-5 in invocation order;
# times are illustrative):
# Time 0: Workflow 1 starts, Workflow 2 starts (limit = 2)
# Time 0: Workflows 3, 4, 5 queued
# Time 5: Workflow 1 completes → Workflow 3 starts
# Time 7: Workflow 2 completes → Workflow 4 starts
# Time 10: Workflow 3 completes → Workflow 5 starts
```
Key points:
- Queued workflows wait until a slot opens
- FIFO order (first in, first out)
- No compute consumed while queued
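The timeline can be reproduced with a small FIFO scheduling calculation: each invocation, in arrival order, takes whichever slot frees up first. fifo_schedule is a hypothetical helper, all invocations are assumed to arrive at time 0, and the durations for workflows 4 and 5 are made up because the timeline doesn't give them.

```python
import heapq
from typing import List, Tuple


def fifo_schedule(durations: List[int], limit: int) -> List[Tuple[int, int]]:
    """Return (start, end) for each invocation in a FIFO queue with `limit` slots.

    Assumes every invocation is enqueued at time 0, in list order.
    """
    slot_free_at = [0] * limit  # when each slot next becomes free; all free at t=0
    heapq.heapify(slot_free_at)
    timeline = []
    for duration in durations:               # FIFO: serve invocations in order
        start = heapq.heappop(slot_free_at)  # earliest slot to open up
        end = start + duration
        heapq.heappush(slot_free_at, end)
        timeline.append((start, end))
    return timeline


# Workflows 1-3 use the durations implied by the timeline (5, 7, 5);
# 6 and 4 for workflows 4 and 5 are made up.
for n, (start, end) in enumerate(fifo_schedule([5, 7, 5, 6, 4], limit=2), start=1):
    print(f"Workflow {n}: starts at {start}, completes at {end}")
```

The computed start times (0, 0, 5, 7, 10) match the timeline: workflow 3 takes the slot workflow 1 frees at time 5, workflow 4 the one workflow 2 frees at time 7, and workflow 5 the one workflow 3 frees at time 10.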
Dynamic queue assignment
Override queue at invocation time:
```python
@workflow(queue="default-queue")
async def flexible_workflow(ctx: WorkflowContext, input: dict):
    await ctx.step.run("process", process_data, input)
    return {"status": "completed"}

# Use default queue
client = PolosClient()
await flexible_workflow.invoke(client, {"data": "..."})

# Override with different queue
await flexible_workflow.invoke(
    client,
    {"data": "..."},
    queue="priority-queue",
)
```
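The override rule can be summarized as a precedence order. The helper below is hypothetical and encodes only what this page shows: a queue passed to invoke() wins over the one in @workflow(...), which in turn replaces the default queue named after the workflow ID. The ID "flexible-workflow" assumes the underscore-to-hyphen naming seen in the default-queue example.

```python
from typing import Optional


def effective_queue(
    workflow_id: str,
    decorator_queue: Optional[str] = None,
    invoke_queue: Optional[str] = None,
) -> str:
    """Hypothetical helper: which queue name one invocation ends up in."""
    if invoke_queue is not None:     # invoke(..., queue="...") wins
        return invoke_queue
    if decorator_queue is not None:  # then @workflow(queue="...")
        return decorator_queue
    return workflow_id               # else the default queue named after the workflow


print(effective_queue("flexible-workflow", "default-queue"))                    # default-queue
print(effective_queue("flexible-workflow", "default-queue", "priority-queue"))  # priority-queue
```

Presumably an overridden invocation then waits for, and counts against, the slots of the queue it was sent to (priority-queue here) rather than those of default-queue.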
Queue restriction
Scheduled workflows cannot specify queues.
Key takeaways
- Default queue per workflow: named after the workflow ID
- Concurrency limits control max simultaneous executions
- Shared queues rate-limit across multiple workflows
- Concurrency keys create per-entity virtual queues
- Scheduled workflows cannot customize queues