Scheduling Methods
The scheduler runs handlers at times defined by trigger objects. The convenience methods below cover the common cases so most apps never need to construct a trigger directly. Every method is async, requires await, and returns a ScheduledJob.
Which method should I use?
| Timing need | Method |
|---|---|
| Run once, N seconds from now | run_in |
| Repeat on a fixed interval | run_every (or run_minutely / run_hourly) |
| Run at the same time every day | run_daily |
| Run on a complex or calendar schedule | run_cron |
| Run once at a specific wall-clock time | run_once |
| Use a custom trigger | schedule |
Run once after a delay: run_in
The handler runs once after a fixed delay. The underlying After trigger fires once and does not repeat.
| Parameter | Type | Default | Description |
|---|---|---|---|
func |
callable | (required) | The handler to run. |
delay |
float |
(required) | Seconds to wait before running. |
Shared parameters apply (see Parameters every method accepts).
from hassette import App, AppConfig
class DelayApp(App[AppConfig]):
async def on_initialize(self):
# Run in 5 seconds
await self.scheduler.run_in(self.turn_off_light, delay=5.0, name="turn_off_light")
# Run in 10 minutes (using TimeDelta or seconds)
await self.scheduler.run_in(self.check_status, delay=600, name="check_status")
async def turn_off_light(self):
pass
async def check_status(self):
pass
Repeat on an interval: run_every
The handler runs repeatedly at a fixed interval. The hours, minutes, and seconds parameters are additive; at least one must be nonzero. Each next run is calculated from the previous run time, not from wall-clock time. The interval stays drift-resistant under load.
| Parameter | Type | Default | Description |
|---|---|---|---|
func |
callable | (required) | The handler to run. |
hours |
float |
0 |
Hours component of the interval. |
minutes |
float |
0 |
Minutes component of the interval. |
seconds |
float |
0 |
Seconds component of the interval. |
Shared parameters apply.
from hassette import App, AppConfig
class IntervalApp(App[AppConfig]):
async def on_initialize(self):
# Every 10 seconds
await self.scheduler.run_every(self.poll_api, seconds=10, name="poll_api")
# Every hour (using hours parameter)
await self.scheduler.run_every(self.hourly_check, hours=1, name="hourly_check")
async def poll_api(self):
pass
async def hourly_check(self):
pass
Shorthands: run_minutely and run_hourly
run_minutely and run_hourly are shorthands for run_every with a single integer interval parameter. Both enforce a minimum of 1.
| Method | Shorthand for | Interval parameter | Minimum |
|---|---|---|---|
run_minutely(func, minutes=1) |
run_every(minutes=N) |
minutes: int |
1 |
run_hourly(func, hours=1) |
run_every(hours=N) |
hours: int |
1 |
Shared parameters apply to both.
from hassette import App, AppConfig
class MinutelyApp(App[AppConfig]):
async def on_initialize(self):
# Every minute
await self.scheduler.run_minutely(self.task, name="task_minutely")
# Every 5 minutes
await self.scheduler.run_minutely(self.task, minutes=5, name="task_every_5m")
async def task(self):
pass
from hassette import App, AppConfig
class HourlyApp(App[AppConfig]):
async def on_initialize(self):
# Every hour
await self.scheduler.run_hourly(self.task, name="task_hourly")
# Every 4 hours
await self.scheduler.run_hourly(self.task, hours=4, name="task_every_4h")
async def task(self):
pass
Run at the same time every day: run_daily
The handler runs once per day at a fixed wall-clock time. A cron-based trigger ensures DST-correct, wall-clock-aligned scheduling. Interval-based daily scheduling drifts by one hour on DST transitions; run_daily does not.
| Parameter | Type | Default | Description |
|---|---|---|---|
func |
callable | (required) | The handler to run. |
at |
str |
"00:00" |
Wall-clock time in "HH:MM" format. |
Shared parameters apply.
from hassette import App, AppConfig
class DailyApp(App[AppConfig]):
async def on_initialize(self):
# Every day at midnight (default)
await self.scheduler.run_daily(self.task, name="task_daily")
# Every day at 7:00 AM (wall-clock, DST-safe)
await self.scheduler.run_daily(self.morning_routine, at="07:00", name="morning_routine")
async def task(self):
pass
async def morning_routine(self):
pass
Run on a cron schedule: run_cron
The handler runs on a schedule defined by a cron expression. Both 5-field (standard Unix cron) and 6-field expressions are accepted. An invalid expression raises ValueError at registration time.
| Parameter | Type | Default | Description |
|---|---|---|---|
func |
callable | (required) | The handler to run. |
expression |
str |
(required) | A 5- or 6-field cron expression. |
Shared parameters apply.
Cron field reference (5-field standard: minute hour dom month dow):
| Position | Field | Range | Example |
|---|---|---|---|
| 1 | minute | 0–59 | */15 (every 15 minutes) |
| 2 | hour | 0–23 | 9 (9 AM) |
| 3 | day of month | 1–31 | 1,15 (1st and 15th) |
| 4 | month | 1–12 | 6 (June) |
| 5 | day of week | 0–6 (Sunday=0) | 1-5 (weekdays) |
6-field expressions append seconds as a 6th field per the croniter library convention: minute hour dom month dow second.
from hassette import App, AppConfig
class CronApp(App[AppConfig]):
async def on_initialize(self):
# Weekdays at 9 AM (5-field standard cron: minute hour dom month dow)
await self.scheduler.run_cron(self.work_start, "0 9 * * 1-5")
# Every 15 minutes
await self.scheduler.run_cron(self.check, "*/15 * * * *")
# First of the month at midnight
await self.scheduler.run_cron(self.monthly_job, "0 0 1 * *")
async def work_start(self):
pass
async def check(self):
pass
async def monthly_job(self):
pass
Run once at a specific time: run_once
The handler runs once at a specific wall-clock time. The Once trigger fires once and does not repeat.
| Parameter | Type | Default | Description |
|---|---|---|---|
func |
callable | (required) | The handler to run. |
at |
str \| ZonedDateTime |
(required) | Target time. A "HH:MM" string is interpreted as today in the system timezone. A ZonedDateTime (from the whenever library — from whenever import ZonedDateTime) fires at the exact instant specified. |
if_past |
"tomorrow" | "error" |
"tomorrow" |
Behavior when the target is already in the past. "tomorrow" defers by one day and logs a WARNING. "error" raises ValueError for both input types. |
Shared parameters apply.
Past ZonedDateTime inputs fire immediately
When at is a ZonedDateTime in the past and if_past="tomorrow" (the default), the job fires at the next scheduler tick — there is no "tomorrow" for an absolute instant. if_past="error" still raises ValueError.
from hassette import App, AppConfig
class AlarmApp(App[AppConfig]):
async def on_initialize(self):
# Run once at 7:30 AM today (or tomorrow if already past)
await self.scheduler.run_once(self.morning_alarm, at="07:30", name="morning_alarm")
async def morning_alarm(self):
self.logger.info("Good morning!")
Use a custom trigger: schedule
schedule is the base method all convenience methods delegate to. Most apps never call it directly. schedule is the right choice when a built-in convenience method cannot express the required timing, such as a custom trigger that implements TriggerProtocol. See Triggers for the built-in trigger types and the protocol definition.
| Parameter | Type | Default | Description |
|---|---|---|---|
func |
callable | (required) | The handler to run. |
trigger |
TriggerProtocol |
(required) | A trigger object that determines first run time and recurrences. |
Shared parameters apply.
from hassette import App, AppConfig
from hassette.scheduler import Cron, Daily, Every
class ScheduleExampleApp(App[AppConfig]):
async def on_initialize(self) -> None:
# Fixed interval
job = await self.scheduler.schedule(self.check_sensors, Every(minutes=5)) # pyright: ignore[reportUnusedVariable]
# Daily at a specific time
job = await self.scheduler.schedule(self.morning_routine, Daily(at="07:00"), group="morning") # pyright: ignore[reportUnusedVariable]
# Cron expression
job = await self.scheduler.schedule(self.workday_task, Cron("0 9 * * 1-5")) # pyright: ignore[reportUnusedVariable]
async def check_sensors(self) -> None: ...
async def morning_routine(self) -> None: ...
async def workday_task(self) -> None: ...
Parameters every method accepts
These parameters are accepted by every scheduling method. Individual method tables list only method-specific parameters.
| Parameter | Type | Default | Description |
|---|---|---|---|
name |
str |
"" |
Identifies the job in logs and the monitoring UI. Auto-generated from the callable and trigger when empty. Must be unique within the app instance — see Idempotent Registration. |
group |
str \| None |
None |
Group name for bulk management. See Job Management for grouping. |
jitter |
float \| None |
None |
Random offset in seconds applied at enqueue time. See Job Management for jitter. |
timeout |
float \| None |
None |
Per-job timeout in seconds. None inherits the global scheduler.job_timeout_seconds from hassette.toml. |
timeout_disabled |
bool |
False |
Disables timeout enforcement for this job, regardless of the global default. |
on_error |
SchedulerErrorHandlerType \| None |
None |
Per-job error handler. Overrides the app-level handler set via scheduler.on_error(). Invoked on any exception except CancelledError. |
if_exists |
"error" | "skip" | "replace" |
"error" |
Behavior when a job with the same name already exists. See Idempotent Registration. |
args |
tuple \| None |
None |
Positional arguments passed to the handler at call time. |
kwargs |
Mapping \| None |
None |
Keyword arguments passed to the handler at call time. |
Passing arguments to handlers
All scheduling methods accept args and kwargs to supply data to the handler at call time. This avoids capturing mutable state in closures.
from hassette import App, AppConfig
class NotifyApp(App[AppConfig]):
async def on_initialize(self):
# Pass positional arguments to the handler
await self.scheduler.run_in(
self.send_alert,
delay=30.0,
name="startup_alert",
args=("Kitchen motion sensor", "triggered"),
)
# Pass keyword arguments to the handler
await self.scheduler.run_every(
self.log_status,
seconds=300,
name="status_log",
kwargs={"level": "info", "include_history": True},
)
# Combine args and kwargs
await self.scheduler.run_daily(
self.generate_report,
at="06:00",
name="daily_report",
args=("daily",),
kwargs={"recipients": ["admin"]},
)
async def send_alert(self, sensor: str, state: str):
self.logger.info("Alert: %s is %s", sensor, state)
async def log_status(self, level: str = "debug", include_history: bool = False):
self.logger.info("Status logged (level=%s, history=%s)", level, include_history)
async def generate_report(self, period: str, recipients: list[str]):
self.logger.info("Generating %s report for %s", period, recipients)
Idempotent registration
Job names must be unique within an app instance. Registering a second job with an existing name raises ValueError by default. The if_exists parameter controls this behavior.
| Value | Behavior |
|---|---|
"error" (default) |
Raises ValueError when a job with the same name already exists. |
"skip" |
Returns the existing job when its configuration matches the new registration. Raises ValueError when names match but configurations differ. Two jobs match when they share the same callable, trigger (by trigger_id()), group, jitter, timeout, timeout_disabled, args, kwargs, and on_error handler. |
"replace" |
Cancels the existing job and registers the new one. The new job's configuration does not need to match the old one. |
if_exists matters most in on_initialize, which re-runs on app reload (triggered by config changes or hassette reload).
# Safe to call on every reload — won't create duplicates
await self.scheduler.run_every(
self.check_sensors,
seconds=60,
name="sensor_check",
if_exists="skip",
)
"skip" works when the job configuration is stable across reloads. "replace" is the right choice when the handler, trigger, or arguments may change between reloads.
# Use replace when the trigger or handler may change between reloads
await self.scheduler.run_every(
self.check_sensors,
seconds=120,
name="sensor_check",
if_exists="replace",
)
See Also
- Triggers: built-in trigger types,
TriggerProtocol, and writing custom triggers - Job Management: cancelling, inspecting, grouping, jitter, and error handling for scheduled jobs
- Scheduler Overview: getting started with the scheduler