
Scheduling Methods

The scheduler runs handlers at times defined by trigger objects. The convenience methods below cover the common cases so most apps never need to construct a trigger directly. Every method is async, requires await, and returns a ScheduledJob.

Which method should I use?

Timing need Method
Run once, N seconds from now run_in
Repeat on a fixed interval run_every (or run_minutely / run_hourly)
Run at the same time every day run_daily
Run on a complex or calendar schedule run_cron
Run once at a specific wall-clock time run_once
Use a custom trigger schedule

Run once after a delay: run_in

The handler runs once after a fixed delay. The underlying After trigger fires once and does not repeat.

Parameter Type Default Description
func callable (required) The handler to run.
delay float (required) Seconds to wait before running.

Shared parameters apply (see Parameters every method accepts).

from hassette import App, AppConfig


class DelayApp(App[AppConfig]):
    async def on_initialize(self):
        # Run in 5 seconds
        await self.scheduler.run_in(self.turn_off_light, delay=5.0, name="turn_off_light")

        # Run in 10 minutes (delay is given in seconds: 600)
        await self.scheduler.run_in(self.check_status, delay=600, name="check_status")

    async def turn_off_light(self):
        pass

    async def check_status(self):
        pass

Repeat on an interval: run_every

The handler runs repeatedly at a fixed interval. The hours, minutes, and seconds parameters are additive (hours=1, minutes=30 means every 90 minutes), and at least one must be nonzero. Each next run is calculated from the previous run time rather than from wall-clock time, so the interval stays drift-resistant under load.

Parameter Type Default Description
func callable (required) The handler to run.
hours float 0 Hours component of the interval.
minutes float 0 Minutes component of the interval.
seconds float 0 Seconds component of the interval.

Shared parameters apply.

from hassette import App, AppConfig


class IntervalApp(App[AppConfig]):
    async def on_initialize(self):
        # Every 10 seconds
        await self.scheduler.run_every(self.poll_api, seconds=10, name="poll_api")

        # Every hour (using hours parameter)
        await self.scheduler.run_every(self.hourly_check, hours=1, name="hourly_check")

    async def poll_api(self):
        pass

    async def hourly_check(self):
        pass
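
The drift-resistance claim above can be illustrated without the scheduler. The sketch below is plain Python, not hassette's implementation, and both function names are hypothetical. It assumes "previous run time" means the previously scheduled time, and compares that against scheduling from the moment each handler finishes.

```python
def runs_anchored_to_schedule(start: float, interval: float, handler_durations: list[float]) -> list[float]:
    """Compute each run from the previous *scheduled* time, so handler duration never accumulates."""
    runs = []
    scheduled = start
    for _ in handler_durations:
        scheduled += interval
        runs.append(scheduled)
    return runs


def runs_anchored_to_finish(start: float, interval: float, handler_durations: list[float]) -> list[float]:
    """Compute each run from when the previous handler finished, so every slow run pushes later runs back."""
    runs = []
    finished = start
    for duration in handler_durations:
        fired = finished + interval
        runs.append(fired)
        finished = fired + duration
    return runs


# Three runs of a 10-second interval where each handler takes 0.5 seconds.
durations = [0.5, 0.5, 0.5]
anchored = runs_anchored_to_schedule(0.0, 10.0, durations)
drifting = runs_anchored_to_finish(0.0, 10.0, durations)
print(anchored)  # [10.0, 20.0, 30.0]
print(drifting)  # [10.0, 20.5, 31.0]
```

In the second variant the error grows with every run, which is the behavior the interval calculation described above is meant to avoid.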

Shorthands: run_minutely and run_hourly

run_minutely and run_hourly are shorthands for run_every with a single integer interval parameter. Both enforce a minimum of 1.

Method Shorthand for Interval parameter Minimum
run_minutely(func, minutes=1) run_every(minutes=N) minutes: int 1
run_hourly(func, hours=1) run_every(hours=N) hours: int 1

Shared parameters apply to both.

from hassette import App, AppConfig


class MinutelyApp(App[AppConfig]):
    async def on_initialize(self):
        # Every minute
        await self.scheduler.run_minutely(self.task, name="task_minutely")

        # Every 5 minutes
        await self.scheduler.run_minutely(self.task, minutes=5, name="task_every_5m")

    async def task(self):
        pass

from hassette import App, AppConfig


class HourlyApp(App[AppConfig]):
    async def on_initialize(self):
        # Every hour
        await self.scheduler.run_hourly(self.task, name="task_hourly")

        # Every 4 hours
        await self.scheduler.run_hourly(self.task, hours=4, name="task_every_4h")

    async def task(self):
        pass

Run at the same time every day: run_daily

The handler runs once per day at a fixed wall-clock time. A cron-based trigger ensures DST-correct, wall-clock-aligned scheduling. Interval-based daily scheduling drifts by one hour on DST transitions; run_daily does not.

Parameter Type Default Description
func callable (required) The handler to run.
at str "00:00" Wall-clock time in "HH:MM" format.

Shared parameters apply.

from hassette import App, AppConfig


class DailyApp(App[AppConfig]):
    async def on_initialize(self):
        # Every day at midnight (default)
        await self.scheduler.run_daily(self.task, name="task_daily")

        # Every day at 7:00 AM (wall-clock, DST-safe)
        await self.scheduler.run_daily(self.morning_routine, at="07:00", name="morning_routine")

    async def task(self):
        pass

    async def morning_routine(self):
        pass
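
The one-hour drift mentioned above is easy to reproduce with the standard library alone. This sketch does not use hassette; the two helper names and the America/New_York zone are assumptions chosen for illustration. It compares adding 24 elapsed hours against keeping the same local clock time across the 2024 spring-forward transition.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

TZ = ZoneInfo("America/New_York")


def next_by_interval(previous: datetime) -> datetime:
    """Add exactly 24 elapsed hours (computed in UTC), as a fixed-interval schedule would."""
    return (previous.astimezone(timezone.utc) + timedelta(hours=24)).astimezone(TZ)


def next_by_wall_clock(previous: datetime) -> datetime:
    """Advance the calendar date and keep the same local hour and minute."""
    next_day = previous.date() + timedelta(days=1)
    return datetime(next_day.year, next_day.month, next_day.day, previous.hour, previous.minute, tzinfo=TZ)


# 07:00 local time on the day before clocks move forward (2024-03-10 in the US).
before = datetime(2024, 3, 9, 7, 0, tzinfo=TZ)
interval_run = next_by_interval(before)
wall_clock_run = next_by_wall_clock(before)

print(interval_run.strftime("%Y-%m-%d %H:%M"))    # 2024-03-10 08:00
print(wall_clock_run.strftime("%Y-%m-%d %H:%M"))  # 2024-03-10 07:00
```

The interval version lands at 08:00 local time after the transition, while the wall-clock version stays at 07:00, which is the behavior run_daily is described as providing.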

Run on a cron schedule: run_cron

The handler runs on a schedule defined by a cron expression. Both 5-field (standard Unix cron) and 6-field expressions are accepted. An invalid expression raises ValueError at registration time.

Parameter Type Default Description
func callable (required) The handler to run.
expression str (required) A 5- or 6-field cron expression.

Shared parameters apply.

Cron field reference (5-field standard: minute hour dom month dow):

Position Field Range Example
1 minute 0–59 */15 (every 15 minutes)
2 hour 0–23 9 (9 AM)
3 day of month 1–31 1,15 (1st and 15th)
4 month 1–12 6 (June)
5 day of week 0–6 (Sunday=0) 1-5 (weekdays)

6-field expressions append seconds as a 6th field per the croniter library convention: minute hour dom month dow second.

from hassette import App, AppConfig


class CronApp(App[AppConfig]):
    async def on_initialize(self):
        # Weekdays at 9 AM (5-field standard cron: minute hour dom month dow)
        await self.scheduler.run_cron(self.work_start, "0 9 * * 1-5")

        # Every 15 minutes
        await self.scheduler.run_cron(self.check, "*/15 * * * *")

        # First of the month at midnight
        await self.scheduler.run_cron(self.monthly_job, "0 0 1 * *")

    async def work_start(self):
        pass

    async def check(self):
        pass

    async def monthly_job(self):
        pass
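
To make the field order concrete, the sketch below labels each field of a 5- or 6-field expression using the order given above, with seconds last. label_cron_fields is a hypothetical helper written for this page, not part of hassette or croniter, and it only counts fields; it does not validate ranges the way registration does.

```python
FIVE_FIELD_NAMES = ("minute", "hour", "day_of_month", "month", "day_of_week")


def label_cron_fields(expression: str) -> dict[str, str]:
    """Map each whitespace-separated cron field to its name (seconds is the optional 6th field)."""
    fields = expression.split()
    if len(fields) == 5:
        names = FIVE_FIELD_NAMES
    elif len(fields) == 6:
        names = FIVE_FIELD_NAMES + ("second",)
    else:
        raise ValueError(f"expected 5 or 6 fields, got {len(fields)}: {expression!r}")
    return dict(zip(names, fields))


weekdays = label_cron_fields("0 9 * * 1-5")
with_seconds = label_cron_fields("*/15 * * * * 30")
print(weekdays["hour"], weekdays["day_of_week"])        # 9 1-5
print(with_seconds["minute"], with_seconds["second"])   # */15 30
```

Read this way, "*/15 * * * * 30" means second 30 of every 15th minute, because the trailing field is the seconds field.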

Run once at a specific time: run_once

The handler runs once at a specific wall-clock time. The Once trigger fires once and does not repeat.

Parameter Type Default Description
func callable (required) The handler to run.
at str | ZonedDateTime (required) Target time. A "HH:MM" string is interpreted as today in the system timezone. A ZonedDateTime (from the whenever library: from whenever import ZonedDateTime) fires at the exact instant specified.
if_past "tomorrow" | "error" "tomorrow" Behavior when the target is already in the past. "tomorrow" defers a past "HH:MM" target by one day and logs a WARNING; a past ZonedDateTime is not deferred (see the note below). "error" raises ValueError for both input types.

Shared parameters apply.

Past ZonedDateTime inputs fire immediately

When at is a ZonedDateTime in the past and if_past="tomorrow" (the default), the job fires at the next scheduler tick — there is no "tomorrow" for an absolute instant. if_past="error" still raises ValueError.

from hassette import App, AppConfig


class AlarmApp(App[AppConfig]):
    async def on_initialize(self):
        # Run once at 7:30 AM today (or tomorrow if already past)
        await self.scheduler.run_once(self.morning_alarm, at="07:30", name="morning_alarm")

    async def morning_alarm(self):
        self.logger.info("Good morning!")
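
The if_past rules for "HH:MM" strings can be sketched in plain Python. resolve_hhmm is a hypothetical function, not hassette's code, and it makes two simplifying assumptions: it works on naive datetimes (ignoring timezones and DST), and it treats a target equal to the current time as already past.

```python
import logging
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)


def resolve_hhmm(at: str, now: datetime, if_past: str = "tomorrow") -> datetime:
    """Turn an "HH:MM" string into the datetime at which a one-shot job would fire."""
    if if_past not in ("tomorrow", "error"):
        raise ValueError(f"unknown if_past value: {if_past!r}")
    hour, minute = (int(part) for part in at.split(":"))
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target > now:
        return target  # still in the future today
    if if_past == "error":
        raise ValueError(f"{at} is already in the past")
    logger.warning("%s is already past; deferring to tomorrow", at)
    return target + timedelta(days=1)


now = datetime(2024, 1, 1, 8, 0)
print(resolve_hhmm("09:00", now))  # 2024-01-01 09:00:00
print(resolve_hhmm("07:30", now))  # 2024-01-02 07:30:00
```

With if_past="error", the second call would raise ValueError instead of deferring.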

Use a custom trigger: schedule

schedule is the base method all convenience methods delegate to. Most apps never call it directly. schedule is the right choice when a built-in convenience method cannot express the required timing, such as a custom trigger that implements TriggerProtocol. See Triggers for the built-in trigger types and the protocol definition.

Parameter Type Default Description
func callable (required) The handler to run.
trigger TriggerProtocol (required) A trigger object that determines first run time and recurrences.

Shared parameters apply.

from hassette import App, AppConfig
from hassette.scheduler import Cron, Daily, Every


class ScheduleExampleApp(App[AppConfig]):
    async def on_initialize(self) -> None:
        # Fixed interval
        await self.scheduler.schedule(self.check_sensors, Every(minutes=5))

        # Daily at a specific time
        await self.scheduler.schedule(self.morning_routine, Daily(at="07:00"), group="morning")

        # Cron expression
        await self.scheduler.schedule(self.workday_task, Cron("0 9 * * 1-5"))

    async def check_sensors(self) -> None: ...
    async def morning_routine(self) -> None: ...
    async def workday_task(self) -> None: ...

Parameters every method accepts

These parameters are accepted by every scheduling method. Individual method tables list only method-specific parameters.

Parameter Type Default Description
name str "" Identifies the job in logs and the monitoring UI. Auto-generated from the callable and trigger when empty. Must be unique within the app instance — see Idempotent Registration.
group str | None None Group name for bulk management. See Job Management for grouping.
jitter float | None None Random offset in seconds applied at enqueue time. See Job Management for jitter.
timeout float | None None Per-job timeout in seconds. None inherits the global scheduler.job_timeout_seconds from hassette.toml.
timeout_disabled bool False Disables timeout enforcement for this job, regardless of the global default.
on_error SchedulerErrorHandlerType | None None Per-job error handler. Overrides the app-level handler set via scheduler.on_error(). Invoked on any exception except CancelledError.
if_exists "error" | "skip" | "replace" "error" Behavior when a job with the same name already exists. See Idempotent Registration.
args tuple | None None Positional arguments passed to the handler at call time.
kwargs Mapping | None None Keyword arguments passed to the handler at call time.

Passing arguments to handlers

All scheduling methods accept args and kwargs to supply data to the handler at call time. This avoids capturing mutable state in closures.

from hassette import App, AppConfig


class NotifyApp(App[AppConfig]):
    async def on_initialize(self):
        # Pass positional arguments to the handler
        await self.scheduler.run_in(
            self.send_alert,
            delay=30.0,
            name="startup_alert",
            args=("Kitchen motion sensor", "triggered"),
        )

        # Pass keyword arguments to the handler
        await self.scheduler.run_every(
            self.log_status,
            seconds=300,
            name="status_log",
            kwargs={"level": "info", "include_history": True},
        )

        # Combine args and kwargs
        await self.scheduler.run_daily(
            self.generate_report,
            at="06:00",
            name="daily_report",
            args=("daily",),
            kwargs={"recipients": ["admin"]},
        )

    async def send_alert(self, sensor: str, state: str):
        self.logger.info("Alert: %s is %s", sensor, state)

    async def log_status(self, level: str = "debug", include_history: bool = False):
        self.logger.info("Status logged (level=%s, history=%s)", level, include_history)

    async def generate_report(self, period: str, recipients: list[str]):
        self.logger.info("Generating %s report for %s", period, recipients)

Idempotent registration

Job names must be unique within an app instance. Registering a second job with an existing name raises ValueError by default. The if_exists parameter controls this behavior.

Value Behavior
"error" (default) Raises ValueError when a job with the same name already exists.
"skip" Returns the existing job when its configuration matches the new registration. Raises ValueError when names match but configurations differ. Two jobs match when they share the same callable, trigger (by trigger_id()), group, jitter, timeout, timeout_disabled, args, kwargs, and on_error handler.
"replace" Cancels the existing job and registers the new one. The new job's configuration does not need to match the old one.

if_exists matters most in on_initialize, which re-runs on app reload (triggered by config changes or hassette reload).

# Safe to call on every reload — won't create duplicates
await self.scheduler.run_every(
    self.check_sensors,
    seconds=60,
    name="sensor_check",
    if_exists="skip",
)

"skip" works when the job configuration is stable across reloads. "replace" is the right choice when the handler, trigger, or arguments may change between reloads.

# Use replace when the trigger or handler may change between reloads
await self.scheduler.run_every(
    self.check_sensors,
    seconds=120,
    name="sensor_check",
    if_exists="replace",
)
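
The three if_exists modes can be summarized as a small decision procedure. The sketch below is an illustration only: JobRegistry is a hypothetical class, not hassette's scheduler, and a plain dict stands in for the job configuration (callable, trigger, group, and so on) that the real comparison uses.

```python
class JobRegistry:
    """Toy name-to-config registry that mimics the documented if_exists behavior."""

    def __init__(self) -> None:
        self._jobs: dict[str, dict] = {}

    def register(self, name: str, config: dict, if_exists: str = "error") -> dict:
        if if_exists not in ("error", "skip", "replace"):
            raise ValueError(f"unknown if_exists value: {if_exists!r}")
        existing = self._jobs.get(name)
        if existing is None:
            self._jobs[name] = config
            return config
        if if_exists == "skip":
            if existing == config:
                return existing  # same configuration: keep and return the existing job
            raise ValueError(f"job {name!r} exists with a different configuration")
        if if_exists == "replace":
            # A real scheduler would cancel the old job before registering the new one.
            self._jobs[name] = config
            return config
        raise ValueError(f"job {name!r} already exists")


registry = JobRegistry()
first = registry.register("sensor_check", {"seconds": 60})
same = registry.register("sensor_check", {"seconds": 60}, if_exists="skip")
replaced = registry.register("sensor_check", {"seconds": 120}, if_exists="replace")
print(same is first)  # True
print(replaced)       # {'seconds': 120}
```

Calling register again with if_exists="skip" and a different configuration raises ValueError, which is why "replace" suits registrations whose settings change between reloads.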

See Also

  • Triggers: built-in trigger types, TriggerProtocol, and writing custom triggers
  • Job Management: cancelling, inspecting, grouping, jitter, and error handling for scheduled jobs
  • Scheduler Overview: getting started with the scheduler