Documentation Index
Fetch the complete documentation index at: https://polos.dev/docs/llms.txt
Use this file to discover all available pages before exploring further.
Scheduled workflows run automatically at specified times using cron expressions. Perfect for recurring tasks like daily reports, data syncs, or cleanup jobs.
Defining scheduled workflows
There are two ways to schedule workflows:
1. Inline cron string
Use a cron expression directly in the decorator:
from polos import workflow, WorkflowContext
@workflow(
id="daily-report",
schedule="0 9 * * *" # Every day at 9:00 AM UTC
)
async def daily_report(ctx: WorkflowContext, payload: SchedulePayload):
print(f"Report generated at: {payload.timestamp}")
# Generate and send report
data = await ctx.step.run("fetch_data", fetch_daily_data)
report = await ctx.step.run("generate", create_report, data)
await ctx.step.run("send", email_report, report)
return {"status": "completed"}
You can also specify timezone in the cron schedule:
@workflow(
id="end-of-day-sync",
schedule={
"cron": "0 17 * * *", # 5:00 PM
"timezone": "America/New_York"
}
)
async def end_of_day_sync(ctx: WorkflowContext, payload: SchedulePayload):
print(f"Syncing at {payload.timestamp} Eastern Time")
await ctx.step.run("sync_data", sync_to_warehouse)
return {"status": "synced"}
2. Create schedule via API
Mark a workflow as schedulable, then create one or more schedules for it via API:
This is useful for creating separate schedules per user or entity.
# Define workflow as schedulable
@workflow(
id="user-reminder",
schedule=True # Can be scheduled via API
)
async def user_reminder(ctx: WorkflowContext, payload: SchedulePayload):
user_id = payload.key # Schedule key (e.g., user ID)
print(f"Reminder for user {user_id} at {payload.timestamp}")
await ctx.step.run("send_reminder", send_user_reminder, user_id)
return {"status": "sent"}
Create schedule via API:
import asyncio
from polos import schedules, PolosClient
async def schedule_user_reminders(client):
# Schedule per-user reminder
schedule_id = await schedules.create(
client,
workflow="user-reminder",
cron="0 8 * * *", # 8:00 AM daily
timezone="America/Los_Angeles",
key="user_123" # Per-user schedule
)
print(f"Created schedule: {schedule_id}")
client = PolosClient()
asyncio.run(schedule_user_reminders(client))
Schedule payload
Scheduled workflows receive a special SchedulePayload with timing information:
from datetime import datetime
from pydantic import BaseModel
class SchedulePayload(BaseModel):
timestamp: datetime # When this workflow was scheduled to run
last_timestamp: datetime | None # When this schedule last ran (None if first run)
timezone: str # Timezone of the schedule
schedule_id: str # Unique identifier for this schedule
key: str # User ID or custom identifier
upcoming: datetime # Next scheduled run time
@workflow(id="scheduled-task", schedule="0 */6 * * *")
async def scheduled_task(ctx: WorkflowContext, payload: SchedulePayload):
# Access schedule information
print(f"Current run: {payload.timestamp}")
print(f"Last run: {payload.last_timestamp}")
print(f"Next run: {payload.upcoming}")
print(f"Schedule ID: {payload.schedule_id}")
return {"executed_at": payload.timestamp}
Cron expressions
Cron format: minute hour day month day_of_week
Common patterns:
# Every hour
schedule="0 * * * *"
# Every day at 9 AM
schedule="0 9 * * *"
# Every Monday at 8 AM
schedule="0 8 * * 1"
# Every 15 minutes
schedule="*/15 * * * *"
# First day of every month at midnight
schedule="0 0 1 * *"
# Weekdays at 5 PM
schedule="0 17 * * 1-5"
# Every 6 hours
schedule="0 */6 * * *"
Cron syntax:
* - Any value
*/n - Every n units
n-m - Range from n to m
n,m - List of values
Field ranges:
- Minute: 0-59
- Hour: 0-23
- Day of month: 1-31
- Month: 1-12
- Day of week: 0-6 (Sunday = 0)
Use appropriate cron intervals
# ✅ GOOD: Reasonable intervals
schedule="0 */6 * * *" # Every 6 hours
schedule="0 9 * * *" # Daily at 9 AM
# ❌ BAD: Too frequent (use events or queues instead)
schedule="* * * * *" # Every minute
Per-user schedules
Use the key parameter to create separate schedules for each user or entity:
from polos import schedules, PolosClient
async def setup_user_schedule(client, user_id: str, preferences: dict):
# Create/update per-user schedule
schedule_id = await schedules.create(
client,
workflow="daily-digest",
cron="0 8 * * *", # 8 AM
timezone=preferences["timezone"],
key=user_id # Unique per user
)
return schedule_id
client = PolosClient()
# Each user gets their own schedule
await setup_user_schedule(client, "user_123", {"timezone": "America/New_York"})
await setup_user_schedule(client, "user_456", {"timezone": "Europe/London"})
In the workflow:
@workflow(id="daily-digest", schedule=True)
async def daily_digest(ctx: WorkflowContext, payload: SchedulePayload):
user_id = payload.key # Get user from schedule key
# Fetch user-specific data
user = await ctx.step.run("get_user", get_user, user_id)
digest = await ctx.step.run("generate", create_digest, user)
await ctx.step.run("send", send_email, user.email, digest)
return {"user_id": user_id, "sent": True}
Key behavior:
- If a schedule with the same workflow and key exists, it’s updated (not duplicated)
- Use
key="global" (default) for system-wide schedules
- Use
key=user_id for per-user schedules
Timezones
Always specify timezones for scheduled workflows to avoid confusion:
@workflow(
id="morning-report",
schedule={
"cron": "0 9 * * *",
"timezone": "America/New_York" # Runs at 9 AM Eastern
}
)
async def morning_report(ctx: WorkflowContext, payload: SchedulePayload):
# Runs at 9 AM Eastern time, adjusts for DST automatically
return {"executed_at": payload.timestamp}
Common timezones:
"UTC" - Coordinated Universal Time
"America/New_York" - US Eastern
"America/Los_Angeles" - US Pacific
"Europe/London" - UK
"Asia/Tokyo" - Japan
"Australia/Sydney" - Australia
Managing schedules via API
Create or update a schedule
from polos import schedules, PolosClient
client = PolosClient()
# Global schedule (one per workflow)
schedule_id = await schedules.create(
client,
workflow="cleanup-task",
cron="0 3 * * *", # 3 AM daily
timezone="UTC",
key="global"
)
# Per-user schedule
schedule_id = await schedules.create(
client,
workflow="user-notification",
cron="0 12 * * *", # Noon daily
timezone="America/Chicago",
key="user_789"
)
Update existing schedule:
# Calling create() with the same workflow and key updates it
schedule_id = await schedules.create(
client,
workflow="user-notification",
cron="0 15 * * *", # Change to 3 PM
timezone="America/Chicago",
key="user_789" # Same key = update
)