Index

Task scheduling functionality for Home Assistant automations.

This module provides clean access to the scheduler system for running jobs at specific times, intervals, or based on cron expressions.

TriggerProtocol

Bases: Protocol

Protocol for defining triggers.

Six methods make up the contract:

- first_run_time: returns the first scheduled run time given the current time
- next_run_time: returns the next run time after a previous run, or None for one-shot triggers
- trigger_label: short stable label for telemetry / UI display
- trigger_detail: optional human-readable detail string
- trigger_db_type: canonical type string for database storage
- trigger_id: stable string identifier used for deduplication
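
The six-method contract can be illustrated with a small custom trigger. The following is a minimal sketch, not hassette code: `TriggerLike` is a local stand-in for `TriggerProtocol`, `datetime.datetime` replaces `ZonedDateTime` so the example runs with the standard library only, and `TopOfHourTrigger`, its label, and its `trigger_id` format are hypothetical names chosen for illustration.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone
from typing import Literal, Protocol, runtime_checkable


@runtime_checkable
class TriggerLike(Protocol):
    """Local stand-in for TriggerProtocol, with datetime in place of ZonedDateTime."""

    def first_run_time(self, current_time: datetime) -> datetime: ...
    def next_run_time(self, previous_run: datetime, current_time: datetime) -> datetime | None: ...
    def trigger_label(self) -> str: ...
    def trigger_detail(self) -> str | None: ...
    def trigger_db_type(self) -> Literal["interval", "cron", "once", "after", "custom"]: ...
    def trigger_id(self) -> str: ...


class TopOfHourTrigger:
    """Hypothetical custom trigger that fires at the start of every hour."""

    def first_run_time(self, current_time: datetime) -> datetime:
        floored = current_time.replace(minute=0, second=0, microsecond=0)
        # "At or after": keep the current instant if it is already on the hour.
        return floored if floored == current_time else floored + timedelta(hours=1)

    def next_run_time(self, previous_run: datetime, current_time: datetime) -> datetime | None:
        # Recurring trigger, so this never returns None.
        # current_time is ignored in this sketch (an assumption, not hassette behaviour).
        return previous_run + timedelta(hours=1)

    def trigger_label(self) -> str:
        return "top_of_hour"  # deliberately not one of the reserved built-in labels

    def trigger_detail(self) -> str | None:
        return "minute 0 of every hour"

    def trigger_db_type(self) -> Literal["interval", "cron", "once", "after", "custom"]:
        return "custom"

    def trigger_id(self) -> str:
        return "custom:top_of_hour"  # made-up format; only stability matters here


trigger = TopOfHourTrigger()
now = datetime(2024, 1, 1, 10, 30, tzinfo=timezone.utc)
first = trigger.first_run_time(now)
second = trigger.next_run_time(first, now)
print(isinstance(trigger, TriggerLike), first.isoformat(), second.isoformat())
```

Because the protocol is runtime-checkable, `isinstance` only confirms that the six methods exist; it does not validate their signatures or return values.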

Source code in src/hassette/types/types.py
@runtime_checkable
class TriggerProtocol(Protocol):
    """Protocol for defining triggers.

    Six methods make up the contract:
    - first_run_time: returns the first scheduled run time given the current time
    - next_run_time: returns the next run time after a previous run, or None for one-shot triggers
    - trigger_label: short stable label for telemetry / UI display
    - trigger_detail: optional human-readable detail string
    - trigger_db_type: canonical type string for database storage
    - trigger_id: stable string identifier used for deduplication
    """

    def first_run_time(self, current_time: ZonedDateTime) -> ZonedDateTime:
        """Return the first scheduled run time at or after current_time."""
        ...

    def next_run_time(self, previous_run: ZonedDateTime, current_time: ZonedDateTime) -> ZonedDateTime | None:
        """Return the next run time after previous_run, or None for one-shot triggers."""
        ...

    def trigger_label(self) -> str:
        """Human-readable display label for the UI.

        Custom triggers (those returning ``"custom"`` from ``trigger_db_type()``)
        MUST NOT return one of the built-in reserved names (``"after"``,
        ``"once"``, ``"every"``, ``"daily"``, ``"cron"``) from this method.
        Doing so creates misleading telemetry and UI rows where the
        ``trigger_type`` column is ``"custom"`` but the label implies a
        built-in trigger kind.
        """
        ...

    def trigger_detail(self) -> str | None:
        """Optional human-readable detail string."""
        ...

    def trigger_db_type(self) -> Literal["interval", "cron", "once", "after", "custom"]:
        """Canonical type string for database storage."""
        ...

    def trigger_id(self) -> str:
        """Stable string identifier used for deduplication."""
        ...

first_run_time(current_time: ZonedDateTime) -> ZonedDateTime

Return the first scheduled run time at or after current_time.

Source code in src/hassette/types/types.py
def first_run_time(self, current_time: ZonedDateTime) -> ZonedDateTime:
    """Return the first scheduled run time at or after current_time."""
    ...

next_run_time(previous_run: ZonedDateTime, current_time: ZonedDateTime) -> ZonedDateTime | None

Return the next run time after previous_run, or None for one-shot triggers.

Source code in src/hassette/types/types.py
def next_run_time(self, previous_run: ZonedDateTime, current_time: ZonedDateTime) -> ZonedDateTime | None:
    """Return the next run time after previous_run, or None for one-shot triggers."""
    ...

trigger_label() -> str

Human-readable display label for the UI.

Custom triggers (those returning "custom" from trigger_db_type()) MUST NOT return one of the built-in reserved names ("after", "once", "every", "daily", "cron") from this method. Doing so creates misleading telemetry and UI rows where the trigger_type column is "custom" but the label implies a built-in trigger kind.
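
The reserved-label rule above is easy to check in a unit test for a custom trigger. The helper below is a hypothetical sketch (neither `RESERVED_LABELS` nor `check_custom_label` is part of hassette); it only encodes the rule as stated: a trigger whose `trigger_db_type()` is `"custom"` must not reuse a built-in label.

```python
# Built-in label names listed in the trigger_label() documentation.
RESERVED_LABELS = frozenset({"after", "once", "every", "daily", "cron"})


def check_custom_label(db_type: str, label: str) -> None:
    """Raise ValueError when a custom trigger borrows a built-in label."""
    if db_type == "custom" and label in RESERVED_LABELS:
        raise ValueError(f"label {label!r} is reserved for built-in triggers")


check_custom_label("custom", "top_of_hour")  # fine: custom type, custom label
check_custom_label("cron", "cron")           # fine: built-in type may use a built-in label
```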

Source code in src/hassette/types/types.py
def trigger_label(self) -> str:
    """Human-readable display label for the UI.

    Custom triggers (those returning ``"custom"`` from ``trigger_db_type()``)
    MUST NOT return one of the built-in reserved names (``"after"``,
    ``"once"``, ``"every"``, ``"daily"``, ``"cron"``) from this method.
    Doing so creates misleading telemetry and UI rows where the
    ``trigger_type`` column is ``"custom"`` but the label implies a
    built-in trigger kind.
    """
    ...

trigger_detail() -> str | None

Optional human-readable detail string.

Source code in src/hassette/types/types.py
def trigger_detail(self) -> str | None:
    """Optional human-readable detail string."""
    ...

trigger_db_type() -> Literal['interval', 'cron', 'once', 'after', 'custom']

Canonical type string for database storage.

Source code in src/hassette/types/types.py
def trigger_db_type(self) -> Literal["interval", "cron", "once", "after", "custom"]:
    """Canonical type string for database storage."""
    ...

trigger_id() -> str

Stable string identifier used for deduplication.

Source code in src/hassette/types/types.py
def trigger_id(self) -> str:
    """Stable string identifier used for deduplication."""
    ...

ScheduledJob dataclass

A job scheduled to run based on a trigger or at a specific time.

Source code in src/hassette/scheduler/classes.py
@dataclass(order=True)
class ScheduledJob:
    """A job scheduled to run based on a trigger or at a specific time."""

    sort_index: tuple[int, int] = field(init=False, repr=False)
    """Tuple of (next_run timestamp with nanoseconds, object id) for ordering in a priority queue."""

    owner_id: str = field(compare=False)
    """Unique string identifier for the owner of the job, e.g., a component or integration name."""

    next_run: ZonedDateTime = field(compare=False)
    """Unjittered logical fire time — used as `previous_run` in subsequent trigger calls."""

    fire_at: ZonedDateTime = field(init=False, compare=False)
    """Actual dispatch time, including any jitter offset.

    Equals ``next_run`` when no jitter is configured. Set by
    ``SchedulerService._apply_jitter_to_heap()`` at enqueue time when jitter > 0.
    The pop loop in ``_ScheduledJobQueue.pop_due_and_peek_next`` compares against
    ``fire_at`` (not ``next_run``) to decide when to dispatch.
    """

    job: "JobCallable" = field(compare=False)
    """The callable to execute when the job runs."""

    app_key: str = field(default="", compare=False)
    """Configuration-level app key for DB registration (e.g., 'my_app'). Empty for non-App owners."""

    instance_index: int = field(default=0, compare=False)
    """App instance index for DB registration. 0 for non-App owners."""

    trigger: "TriggerProtocol | None" = field(compare=False, default=None)
    """The trigger that determines the job's schedule."""

    group: str | None = field(default=None, compare=False)
    """Optional group name for grouping related jobs. Included in deduplication comparison."""

    jitter: float | None = field(default=None, compare=False)
    """Seconds of random offset applied at enqueue time by ``SchedulerService._apply_jitter_to_heap()``.

    Does not affect ``next_run`` (unjittered logical fire time). See the ``fire_at`` field on
    ``ScheduledJob`` for the actual dispatch time after jitter is applied.
    """

    timeout: float | None = field(default=None, compare=False)
    """Per-job timeout in seconds. ``None`` means use the global default
    (``config.scheduler_job_timeout_seconds``). A positive ``float`` overrides the default.
    Validated at construction: must be positive when set."""

    timeout_disabled: bool = field(default=False, compare=False)
    """When ``True``, timeout enforcement is disabled for this job regardless of the global default."""

    name: str = field(default="", compare=False)
    """Optional name for the job for easier identification."""

    name_auto: bool = field(default=False, compare=False)
    """Whether the name was auto-generated from the callable and trigger ID."""

    args: tuple[Any, ...] = field(default_factory=tuple, compare=False)
    """Positional arguments to pass to the job callable."""

    kwargs: dict[str, Any] = field(default_factory=dict, compare=False)
    """Keyword arguments to pass to the job callable."""

    error_handler: "SchedulerErrorHandlerType | None" = field(default=None, compare=False)
    """Optional error handler for this job.

    When set, this handler is invoked if the job raises an exception (including
    ``TimeoutError``, but excluding ``CancelledError``). Stored as-is for identity comparison
    in ``matches()``. ``compare=False`` prevents ``Callable | None`` from corrupting
    the ``@dataclass(order=True)`` heap ordering.
    """

    db_id: int | None = field(default=None, compare=False)
    """Database row ID for this job. Set by the executor after persistence; None until then."""

    source_location: str = field(default="", compare=False)
    """Captured source location (file:line) of the user code that scheduled this job."""

    registration_source: str = field(default="", compare=False)
    """Captured source code snippet of the scheduling call."""

    source_tier: SourceTier = field(default="app", compare=False)
    """Whether this job originates from a user app or the framework itself."""

    _scheduler: "Scheduler | None" = field(default=None, repr=False, compare=False)
    """Back-reference to the Scheduler that owns this job. Set by Scheduler.add_job()."""

    app_error_handler_resolver: "Callable[[], SchedulerErrorHandlerType | None] | None" = field(
        default=None, init=False, repr=False
    )
    """Closure that resolves the app-level error handler at dispatch time."""

    _dequeued: bool = field(default=False, repr=False, compare=False)
    """True after the job has been synchronously removed from the heap via dequeue_job()."""

    def __hash__(self) -> int:
        # Hashing on object identity is safe: each ScheduledJob is a unique object,
        # and sort_index includes id(self) as the tiebreaker, so the hash contract
        # (a == b implies hash(a) == hash(b)) holds. @dataclass(order=True) generates
        # __eq__ based on all compare=True fields (sort_index only), so two distinct
        # objects with the same sort_index are unequal and can safely share a hash bucket.
        return id(self)

    def __repr__(self) -> str:
        return f"ScheduledJob(name={self.name!r}, owner_id={self.owner_id})"

    def __post_init__(self) -> None:
        if self.timeout is not None and (isinstance(self.timeout, bool) or self.timeout <= 0):
            raise ValueError("timeout must be a positive number")
        if self.timeout_disabled and self.timeout is not None:
            raise ValueError("Cannot specify both 'timeout' and 'timeout_disabled=True'")

        self.set_next_run(self.next_run)

        if not self.name:
            callable_name = self.job.__name__ if hasattr(self.job, "__name__") else str(self.job)
            trigger_str = self.trigger.trigger_id() if self.trigger is not None else None
            self.name = f"{callable_name}:{trigger_str}" if self.trigger else callable_name
            self.name_auto = True

        self.args = tuple(self.args)
        self.kwargs = dict(self.kwargs)

    def mark_registered(self, db_id: int) -> None:
        """Set the database ID. Called by SchedulerService.add_job() after persistence.

        First call wins — a second call is a no-op, so a retry or double-registration
        cannot overwrite the original id. Mirrors ``Listener.mark_registered``.
        """
        if self.db_id is None:
            self.db_id = db_id

    def matches(self, other: "ScheduledJob") -> bool:
        """Check whether two jobs represent the same logical configuration.

        Compares callable, trigger (by trigger_id()), group, jitter, timeout,
        timeout_disabled, args, kwargs, and error_handler (by identity).
        Does not compare runtime state (db_id, next_run, sort_index, _scheduler,
        _dequeued, owner, or any other mutable runtime field).

        Two jobs with identical callable/trigger/args but different groups are distinct
        logical jobs and will not match.
        """
        if self.trigger is not None and other.trigger is not None:
            triggers_match = self.trigger.trigger_id() == other.trigger.trigger_id()
        else:
            triggers_match = self.trigger is other.trigger
        return (
            self.job == other.job
            and triggers_match
            and self.group == other.group
            and self.jitter == other.jitter
            and self.timeout == other.timeout
            and self.timeout_disabled == other.timeout_disabled
            and self.args == other.args
            and self.kwargs == other.kwargs
            and self.error_handler is other.error_handler
        )

    def diff_fields(self, other: "ScheduledJob") -> list[str]:
        """Return a list of configuration field names that differ between two jobs.

        Compares the same fields as ``matches()`` — callable, trigger, group,
        jitter, timeout, timeout_disabled, args, kwargs, error_handler.
        """
        changed: list[str] = []
        if self.job != other.job:
            changed.append("job")
        self_tid = self.trigger.trigger_id() if self.trigger is not None else None
        other_tid = other.trigger.trigger_id() if other.trigger is not None else None
        if self_tid != other_tid:
            changed.append("trigger")
        if self.group != other.group:
            changed.append("group")
        if self.jitter != other.jitter:
            changed.append("jitter")
        if self.timeout != other.timeout:
            changed.append("timeout")
        if self.timeout_disabled != other.timeout_disabled:
            changed.append("timeout_disabled")
        if self.args != other.args:
            changed.append("args")
        if self.kwargs != other.kwargs:
            changed.append("kwargs")
        if self.error_handler is not other.error_handler:
            changed.append("error_handler")
        return changed

    def set_app_error_handler_resolver(self, resolver: "Callable[[], SchedulerErrorHandlerType | None]") -> None:
        """Set the closure that resolves the app-level error handler at dispatch time."""
        self.app_error_handler_resolver = resolver

    def cancel(self) -> None:
        """Cancel the job by delegating to the owning Scheduler.

        Raises:
            RuntimeError: When called on a job that has not been registered with a Scheduler.
                Use ``Scheduler.cancel_job(job)`` directly, or register the job first.
        """
        if self._scheduler is None:
            raise RuntimeError(
                "cancel() called on a job not registered with a Scheduler. "
                "Use Scheduler.cancel_job(job) or register the job first."
            )
        self._scheduler.cancel_job(self)

    def set_next_run(self, next_run: ZonedDateTime) -> None:
        """Update the next run timestamp, fire_at, and ordering metadata.

        Both ``next_run`` and ``fire_at`` are set to the rounded value. Call
        ``SchedulerService._apply_jitter_to_heap()`` after this to set a jittered
        ``fire_at`` when the job has ``jitter`` configured.
        """
        rounded = next_run.round(unit="second")
        self.next_run = rounded
        self.fire_at = rounded
        self.sort_index = (rounded.timestamp_nanos(), id(self))

sort_index: tuple[int, int] = field(init=False, repr=False) class-attribute instance-attribute

Tuple of (next_run timestamp with nanoseconds, object id) for ordering in a priority queue.
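
To see why the tuple carries an object id as its second element, consider a trimmed stand-in pushed onto a `heapq`. This is a sketch under stated assumptions: `QueuedJob` and `run_at_nanos` are illustrative names, and only the `sort_index` pattern is taken from `ScheduledJob`.

```python
import heapq
from dataclasses import dataclass, field


@dataclass(order=True)
class QueuedJob:
    """Trimmed stand-in: only sort_index takes part in comparisons."""

    sort_index: tuple[int, int] = field(init=False, repr=False)
    name: str = field(compare=False)
    run_at_nanos: int = field(compare=False)

    def __post_init__(self) -> None:
        # id(self) breaks ties, so two jobs due at the same instant still
        # compare cleanly and never fall through to non-orderable fields.
        self.sort_index = (self.run_at_nanos, id(self))


heap: list[QueuedJob] = []
for job in (QueuedJob("b", 2_000), QueuedJob("a", 1_000), QueuedJob("c", 1_000)):
    heapq.heappush(heap, job)

order = [heapq.heappop(heap).name for _ in range(3)]
print(order)  # "a" and "c" come first in either order, then "b"
```

The relative order of jobs due at the same instant depends on their object ids, so it should be treated as arbitrary.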

owner_id: str = field(compare=False) class-attribute instance-attribute

Unique string identifier for the owner of the job, e.g., a component or integration name.

next_run: ZonedDateTime = field(compare=False) class-attribute instance-attribute

Unjittered logical fire time — used as previous_run in subsequent trigger calls.

fire_at: ZonedDateTime = field(init=False, compare=False) class-attribute instance-attribute

Actual dispatch time, including any jitter offset.

Equals next_run when no jitter is configured. Set by SchedulerService._apply_jitter_to_heap() at enqueue time when jitter > 0. The pop loop in _ScheduledJobQueue.pop_due_and_peek_next compares against fire_at (not next_run) to decide when to dispatch.

job: JobCallable = field(compare=False) class-attribute instance-attribute

The callable to execute when the job runs.

app_key: str = field(default='', compare=False) class-attribute instance-attribute

Configuration-level app key for DB registration (e.g., 'my_app'). Empty for non-App owners.

instance_index: int = field(default=0, compare=False) class-attribute instance-attribute

App instance index for DB registration. 0 for non-App owners.

trigger: TriggerProtocol | None = field(compare=False, default=None) class-attribute instance-attribute

The trigger that determines the job's schedule.

group: str | None = field(default=None, compare=False) class-attribute instance-attribute

Optional group name for grouping related jobs. Included in deduplication comparison.

jitter: float | None = field(default=None, compare=False) class-attribute instance-attribute

Seconds of random offset applied at enqueue time by SchedulerService._apply_jitter_to_heap().

Does not affect next_run (unjittered logical fire time). See the fire_at field on ScheduledJob for the actual dispatch time after jitter is applied.
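
The split between `next_run` and `fire_at` can be sketched as follows. The distribution of the offset is not documented here, so this example assumes a uniform, non-negative offset in `[0, jitter]` seconds; `jittered_fire_at` is a hypothetical helper, and `datetime.datetime` stands in for `ZonedDateTime`.

```python
from __future__ import annotations

import random
from datetime import datetime, timedelta, timezone


def jittered_fire_at(next_run: datetime, jitter: float | None, rng: random.Random) -> datetime:
    """Return the dispatch time for a job; next_run itself is never modified."""
    if jitter is None or jitter <= 0:
        return next_run  # no jitter configured: fire_at equals next_run
    # Assumption: uniform offset between 0 and `jitter` seconds.
    return next_run + timedelta(seconds=rng.uniform(0, jitter))


rng = random.Random(0)  # seeded only to make the sketch repeatable
next_run = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
fire_at = jittered_fire_at(next_run, 5.0, rng)
print(next_run.isoformat(), fire_at.isoformat())
```

Keeping `next_run` unjittered means the trigger always receives the logical schedule time as `previous_run`, so offsets do not accumulate from one run to the next.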

timeout: float | None = field(default=None, compare=False) class-attribute instance-attribute

Per-job timeout in seconds. None means use the global default (config.scheduler_job_timeout_seconds). A positive float overrides the default. Validated at construction: must be positive when set.

timeout_disabled: bool = field(default=False, compare=False) class-attribute instance-attribute

When True, timeout enforcement is disabled for this job regardless of the global default.
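
The interaction between `timeout`, `timeout_disabled`, and the global default can be summarised in a few lines. This is a hypothetical helper, not the scheduler's own code: the validation mirrors the checks in `ScheduledJob.__post_init__`, while the resolution order (disabled first, then per-job value, then global default) is inferred from the field descriptions.

```python
from __future__ import annotations


def effective_timeout(timeout: float | None, timeout_disabled: bool, global_default: float) -> float | None:
    """Return the timeout in seconds to enforce, or None for no enforcement."""
    # Same validation as ScheduledJob.__post_init__ (bool is rejected explicitly
    # because True would otherwise pass as the number 1).
    if timeout is not None and (isinstance(timeout, bool) or timeout <= 0):
        raise ValueError("timeout must be a positive number")
    if timeout_disabled and timeout is not None:
        raise ValueError("Cannot specify both 'timeout' and 'timeout_disabled=True'")

    if timeout_disabled:
        return None  # enforcement switched off for this job
    return global_default if timeout is None else timeout


print(effective_timeout(None, False, 30.0))  # falls back to the global default
print(effective_timeout(5.0, False, 30.0))   # per-job override
print(effective_timeout(None, True, 30.0))   # disabled
```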

name: str = field(default='', compare=False) class-attribute instance-attribute

Optional name for the job for easier identification.

name_auto: bool = field(default=False, compare=False) class-attribute instance-attribute

Whether the name was auto-generated from the callable and trigger ID.

args: tuple[Any, ...] = field(default_factory=tuple, compare=False) class-attribute instance-attribute

Positional arguments to pass to the job callable.

kwargs: dict[str, Any] = field(default_factory=dict, compare=False) class-attribute instance-attribute

Keyword arguments to pass to the job callable.

error_handler: SchedulerErrorHandlerType | None = field(default=None, compare=False) class-attribute instance-attribute

Optional error handler for this job.

When set, this handler is invoked if the job raises an exception (including TimeoutError, but excluding CancelledError). Stored as-is for identity comparison in matches(). compare=False prevents Callable | None from corrupting the @dataclass(order=True) heap ordering.

db_id: int | None = field(default=None, compare=False) class-attribute instance-attribute

Database row ID for this job. Set by the executor after persistence; None until then.

source_location: str = field(default='', compare=False) class-attribute instance-attribute

Captured source location (file:line) of the user code that scheduled this job.

registration_source: str = field(default='', compare=False) class-attribute instance-attribute

Captured source code snippet of the scheduling call.

source_tier: SourceTier = field(default='app', compare=False) class-attribute instance-attribute

Whether this job originates from a user app or the framework itself.

app_error_handler_resolver: Callable[[], SchedulerErrorHandlerType | None] | None = field(default=None, init=False, repr=False) class-attribute instance-attribute

Closure that resolves the app-level error handler at dispatch time.

mark_registered(db_id: int) -> None

Set the database ID. Called by SchedulerService.add_job() after persistence.

First call wins — a second call is a no-op, so a retry or double-registration cannot overwrite the original id. Mirrors Listener.mark_registered.

Source code in src/hassette/scheduler/classes.py
def mark_registered(self, db_id: int) -> None:
    """Set the database ID. Called by SchedulerService.add_job() after persistence.

    First call wins — a second call is a no-op, so a retry or double-registration
    cannot overwrite the original id. Mirrors ``Listener.mark_registered``.
    """
    if self.db_id is None:
        self.db_id = db_id
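
The first-call-wins behaviour can be exercised in isolation. `Registrable` below is a hypothetical stand-in that copies only the guard from `mark_registered`.

```python
from __future__ import annotations


class Registrable:
    """Stand-in holding only the db_id guard from ScheduledJob.mark_registered."""

    def __init__(self) -> None:
        self.db_id: int | None = None

    def mark_registered(self, db_id: int) -> None:
        # `is None` rather than truthiness: an id of 0 still counts as registered.
        if self.db_id is None:
            self.db_id = db_id


job = Registrable()
job.mark_registered(7)
job.mark_registered(9)  # ignored: the first id is kept
print(job.db_id)
```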

matches(other: ScheduledJob) -> bool

Check whether two jobs represent the same logical configuration.

Compares callable, trigger (by trigger_id()), group, jitter, timeout, timeout_disabled, args, kwargs, and error_handler (by identity). Does not compare runtime state (db_id, next_run, sort_index, _scheduler, _dequeued, owner, or any other mutable runtime field).

Two jobs with identical callable/trigger/args but different groups are distinct logical jobs and will not match.

Source code in src/hassette/scheduler/classes.py
def matches(self, other: "ScheduledJob") -> bool:
    """Check whether two jobs represent the same logical configuration.

    Compares callable, trigger (by trigger_id()), group, jitter, timeout,
    timeout_disabled, args, kwargs, and error_handler (by identity).
    Does not compare runtime state (db_id, next_run, sort_index, _scheduler,
    _dequeued, owner, or any other mutable runtime field).

    Two jobs with identical callable/trigger/args but different groups are distinct
    logical jobs and will not match.
    """
    if self.trigger is not None and other.trigger is not None:
        triggers_match = self.trigger.trigger_id() == other.trigger.trigger_id()
    else:
        triggers_match = self.trigger is other.trigger
    return (
        self.job == other.job
        and triggers_match
        and self.group == other.group
        and self.jitter == other.jitter
        and self.timeout == other.timeout
        and self.timeout_disabled == other.timeout_disabled
        and self.args == other.args
        and self.kwargs == other.kwargs
        and self.error_handler is other.error_handler
    )

diff_fields(other: ScheduledJob) -> list[str]

Return a list of configuration field names that differ between two jobs.

Compares the same fields as matches() — callable, trigger, group, jitter, timeout, timeout_disabled, args, kwargs, error_handler.

Source code in src/hassette/scheduler/classes.py
def diff_fields(self, other: "ScheduledJob") -> list[str]:
    """Return a list of configuration field names that differ between two jobs.

    Compares the same fields as ``matches()`` — callable, trigger, group,
    jitter, timeout, timeout_disabled, args, kwargs, error_handler.
    """
    changed: list[str] = []
    if self.job != other.job:
        changed.append("job")
    self_tid = self.trigger.trigger_id() if self.trigger is not None else None
    other_tid = other.trigger.trigger_id() if other.trigger is not None else None
    if self_tid != other_tid:
        changed.append("trigger")
    if self.group != other.group:
        changed.append("group")
    if self.jitter != other.jitter:
        changed.append("jitter")
    if self.timeout != other.timeout:
        changed.append("timeout")
    if self.timeout_disabled != other.timeout_disabled:
        changed.append("timeout_disabled")
    if self.args != other.args:
        changed.append("args")
    if self.kwargs != other.kwargs:
        changed.append("kwargs")
    if self.error_handler is not other.error_handler:
        changed.append("error_handler")
    return changed
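
The difference between equality and identity comparison in `matches()` and `diff_fields()` is easiest to see with bound methods. The sketch below uses a trimmed, hypothetical `JobConfig` stand-in with a subset of the compared fields; the `"every:60"` trigger id is a made-up value. In Python, each attribute access such as `app.on_error` creates a new bound-method object, so two accesses compare equal with `==` but are not the same object under `is`. Whether hassette normalises handlers before storing them is not shown in this section.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class JobConfig:
    """Trimmed, hypothetical stand-in for the configuration fields of ScheduledJob."""

    job: Callable[..., Any]
    trigger_id: str | None = None
    group: str | None = None
    args: tuple[Any, ...] = ()
    error_handler: Callable[..., Any] | None = None

    def diff_fields(self, other: JobConfig) -> list[str]:
        changed: list[str] = []
        if self.job != other.job:  # equality
            changed.append("job")
        if self.trigger_id != other.trigger_id:
            changed.append("trigger")
        if self.group != other.group:
            changed.append("group")
        if self.args != other.args:
            changed.append("args")
        if self.error_handler is not other.error_handler:  # identity
            changed.append("error_handler")
        return changed

    def matches(self, other: JobConfig) -> bool:
        return not self.diff_fields(other)


class App:
    def tick(self) -> None: ...
    def on_error(self, exc: BaseException) -> None: ...


app = App()
handler = app.on_error  # keep one reference and reuse it
a = JobConfig(app.tick, "every:60", group="lights", error_handler=handler)
b = JobConfig(app.tick, "every:60", group="lights", error_handler=handler)
c = JobConfig(app.tick, "every:60", group="heating", error_handler=handler)
d = JobConfig(app.tick, "every:60", group="lights", error_handler=app.on_error)

print(a.matches(b), a.diff_fields(c), a.diff_fields(d))
```

Under these assumptions, reusing a single handler reference keeps two otherwise identical registrations matching, while a fresh `app.on_error` lookup is reported as a changed `error_handler`.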

set_app_error_handler_resolver(resolver: Callable[[], SchedulerErrorHandlerType | None]) -> None

Set the closure that resolves the app-level error handler at dispatch time.

Source code in src/hassette/scheduler/classes.py
def set_app_error_handler_resolver(self, resolver: "Callable[[], SchedulerErrorHandlerType | None]") -> None:
    """Set the closure that resolves the app-level error handler at dispatch time."""
    self.app_error_handler_resolver = resolver

cancel() -> None

Cancel the job by delegating to the owning Scheduler.

Raises:

RuntimeError: When called on a job that has not been registered with a Scheduler. Use Scheduler.cancel_job(job) directly, or register the job first.

Source code in src/hassette/scheduler/classes.py
def cancel(self) -> None:
    """Cancel the job by delegating to the owning Scheduler.

    Raises:
        RuntimeError: When called on a job that has not been registered with a Scheduler.
            Use ``Scheduler.cancel_job(job)`` directly, or register the job first.
    """
    if self._scheduler is None:
        raise RuntimeError(
            "cancel() called on a job not registered with a Scheduler. "
            "Use Scheduler.cancel_job(job) or register the job first."
        )
    self._scheduler.cancel_job(self)
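
The delegation pattern and its failure mode can be sketched with two small stand-ins. `MiniScheduler` and `MiniJob` are hypothetical classes that reproduce only the back-reference check; they are not the hassette `Scheduler` or `ScheduledJob`.

```python
from __future__ import annotations


class MiniScheduler:
    """Stand-in scheduler that sets the back-reference when a job is added."""

    def __init__(self) -> None:
        self.jobs: list[MiniJob] = []

    def add_job(self, job: MiniJob) -> MiniJob:
        job._scheduler = self
        self.jobs.append(job)
        return job

    def cancel_job(self, job: MiniJob) -> None:
        if job in self.jobs:
            self.jobs.remove(job)


class MiniJob:
    def __init__(self, name: str) -> None:
        self.name = name
        self._scheduler: MiniScheduler | None = None

    def cancel(self) -> None:
        if self._scheduler is None:
            raise RuntimeError("cancel() called on a job not registered with a Scheduler.")
        self._scheduler.cancel_job(self)


scheduler = MiniScheduler()
registered = scheduler.add_job(MiniJob("registered"))
orphan = MiniJob("orphan")

registered.cancel()  # delegates to scheduler.cancel_job
try:
    orphan.cancel()
except RuntimeError as exc:
    print(f"orphan job: {exc}")
```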

set_next_run(next_run: ZonedDateTime) -> None

Update the next run timestamp, fire_at, and ordering metadata.

Both next_run and fire_at are set to the rounded value. Call SchedulerService._apply_jitter_to_heap() after this to set a jittered fire_at when the job has jitter configured.

Source code in src/hassette/scheduler/classes.py
def set_next_run(self, next_run: ZonedDateTime) -> None:
    """Update the next run timestamp, fire_at, and ordering metadata.

    Both ``next_run`` and ``fire_at`` are set to the rounded value. Call
    ``SchedulerService._apply_jitter_to_heap()`` after this to set a jittered
    ``fire_at`` when the job has ``jitter`` configured.
    """
    rounded = next_run.round(unit="second")
    self.next_run = rounded
    self.fire_at = rounded
    self.sort_index = (rounded.timestamp_nanos(), id(self))

SchedulerErrorContext dataclass

Bases: ErrorContext

Context passed to scheduler error handlers when a job raises an exception.

Attributes:

- exception (BaseException): The exception that was raised by the job.
- traceback (str): Formatted traceback string.
- job_name (str): The name of the job function that raised the exception.
- job_group (str | None): The group the job belongs to, or None if ungrouped.
- args (tuple[Any, ...]): Positional arguments the job was scheduled with.
- kwargs (dict[str, Any]): Keyword arguments the job was scheduled with.

Source code in src/hassette/scheduler/error_context.py
@dataclass(frozen=True)
class SchedulerErrorContext(ErrorContext):
    """Context passed to scheduler error handlers when a job raises an exception.

    Attributes:
        exception: The exception that was raised by the job.
        traceback: Formatted traceback string.
        job_name: The name of the job function that raised the exception.
        job_group: The group the job belongs to, or None if ungrouped.
        args: Positional arguments the job was scheduled with.
        kwargs: Keyword arguments the job was scheduled with.
    """

    job_name: str
    job_group: str | None
    args: tuple[Any, ...]
    kwargs: dict[str, Any]

    @property
    def _domain_label(self) -> str:
        return f"job={self.job_name}"
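
A handler typically reads these attributes to build a log line or notification. The sketch below is an assumption-laden illustration: `ErrorContextSketch` is a local stand-in that flattens the inherited and job-specific attributes into one frozen dataclass, and `describe_failure` is a hypothetical function. The exact handler signature expected by hassette (for example sync or async) is not shown in this section.

```python
from __future__ import annotations

import traceback
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class ErrorContextSketch:
    """Stand-in carrying the attributes documented for SchedulerErrorContext."""

    exception: BaseException
    traceback: str
    job_name: str
    job_group: str | None
    args: tuple[Any, ...]
    kwargs: dict[str, Any]


def describe_failure(ctx: ErrorContextSketch) -> str:
    """Build a one-line summary from the context (hypothetical handler body)."""
    group = ctx.job_group or "ungrouped"
    return f"job={ctx.job_name} group={group} failed with {type(ctx.exception).__name__}: {ctx.exception}"


try:
    raise ValueError("sensor offline")
except ValueError as exc:
    ctx = ErrorContextSketch(
        exception=exc,
        traceback=traceback.format_exc(),
        job_name="refresh",
        job_group=None,
        args=("sensor.kitchen",),
        kwargs={},
    )

print(describe_failure(ctx))
```

Because the context is a frozen dataclass, a handler can read it freely but cannot reassign its attributes.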

Scheduler

Bases: Resource

Scheduler resource for managing scheduled jobs.

Source code in src/hassette/scheduler/scheduler.py
class Scheduler(Resource):
    """Scheduler resource for managing scheduled jobs."""

    scheduler_service: SchedulerService
    """The scheduler service instance."""

    sync: SchedulerSyncFacade
    """Synchronous facade for scheduling jobs from sync code (e.g. ``AppSync`` hooks)."""

    _jobs_by_name: dict[str, "ScheduledJob"]
    """Tracks jobs by name for uniqueness validation within this scheduler instance."""

    _jobs_by_group: dict[str, set["ScheduledJob"]]
    """Tracks jobs by group for bulk cancellation.

    Uses ``set`` for O(1) membership test and discard. ``ScheduledJob.__hash__``
    is based on ``job_id``, which is unique and immutable after construction.
    """

    def __init__(self, hassette: "Hassette", *, parent: Resource | None = None) -> None:
        super().__init__(hassette, parent=parent)
        assert self.parent is not None, (
            "Scheduler requires a parent Resource for telemetry identity (app_key/source_tier)"
        )
        assert self.hassette._scheduler_service is not None, "Scheduler service not initialized"
        self.scheduler_service = self.hassette._scheduler_service
        self._jobs_by_name = {}
        self._jobs_by_group: dict[str, set[ScheduledJob]] = {}
        self._error_handler: SchedulerErrorHandlerType | None = None
        self.sync = self.add_child(SchedulerSyncFacade, scheduler=self)

        # Register a removal callback so that jobs removed by SchedulerService (exhausted
        # one-shot jobs, or jobs dequeued via cancel_job) are also dropped from
        # _jobs_by_name and _jobs_by_group.
        self.scheduler_service.register_removal_callback(self.owner_id, self._on_job_removed)

    def _on_job_removed(self, job: "ScheduledJob") -> None:
        """Callback invoked by SchedulerService when it removes a job.

        Keeps _jobs_by_group and _jobs_by_name in sync when SchedulerService removes a
        one-shot job after it fires or when a job is dequeued via cancel_job.
        """
        self._jobs_by_name.pop(job.name, None)
        if job.group is not None:
            group_set = self._jobs_by_group.get(job.group)
            if group_set is not None:
                group_set.discard(job)
                if not group_set:
                    del self._jobs_by_group[job.group]

    async def on_initialize(self) -> None:
        self._error_handler = None
        self.mark_ready(reason="Scheduler initialized")

    async def on_shutdown(self) -> None:
        await self._remove_all_jobs()
        self.scheduler_service.deregister_removal_callback(self.owner_id)

    def on_error(self, handler: "SchedulerErrorHandlerType") -> None:
        """Register an app-level error handler for this scheduler.

        The handler is called when any job on this scheduler raises an exception
        (including ``TimeoutError``) and the job does not have its own
        per-registration error handler.

        This is an app-level fallback — it is resolved at dispatch time, not at job
        registration time. A later call to ``on_error()`` replaces any previously
        registered handler.

        Note: error handlers are spawned as fire-and-forget tasks. Handlers spawned near
        app shutdown may be cancelled before they complete. Do not rely on error handlers
        for delivery-critical alerting during system teardown.

        Args:
            handler: A sync or async callable that accepts a
                :class:`~hassette.scheduler.error_context.SchedulerErrorContext`.
        """
        self._error_handler = handler

    @property
    def config_log_level(self) -> LOG_LEVEL_TYPE:
        """Return the log level from the config for this resource."""
        return self.hassette.config.logging.scheduler_service

    async def add_job(
        self, job: "ScheduledJob", *, if_exists: Literal["error", "skip", "replace"] = "error"
    ) -> "ScheduledJob":
        """Add a job to the scheduler.

        DB registration is awaited inline — ``job.db_id`` is set before this
        method returns, eliminating the window where a job fires with
        ``db_id=None``.

        Args:
            job: The job to add.
            if_exists: Behavior when a job with the same name already exists.
                ``"error"`` (default) raises ``ValueError``.
                ``"skip"`` returns the existing job if it matches; raises
                ``ValueError`` if the name matches but the configuration differs.
                ``"replace"`` cancels the existing job (recording it as cancelled
                in telemetry) and registers the new job in its place.

        Returns:
            The added job, or the existing job when ``if_exists="skip"`` and a
            matching job is already registered. ``job.db_id`` is a valid
            integer on return.

        Raises:
            TypeError: If job is not a ScheduledJob.
            ValueError: If a job with the same name already exists and either
                ``if_exists="error"`` or the existing job's configuration differs.
        """

        if not isinstance(job, ScheduledJob):
            raise TypeError(f"Expected ScheduledJob, got {type(job).__name__}")

        existing = self._jobs_by_name.get(job.name)
        if existing is not None:
            if if_exists == "replace":
                self.logger.debug("Replacing existing job '%s' (cancelling old, registering new)", job.name)
                self.cancel_job(existing)
            elif if_exists == "skip" and existing.matches(job):
                return existing
            elif if_exists == "skip":
                changed_fields = existing.diff_fields(job)
                raise ValueError(
                    f"A job named '{job.name}' already exists but its configuration has changed "
                    f"(changed fields: {', '.join(changed_fields)})"
                )
            else:
                raise ValueError(
                    f"A job named '{job.name}' already exists in scheduler for '{self.owner_id}'. "
                    "Job names must be unique per scheduler instance."
                )

        self._jobs_by_name[job.name] = job
        job._scheduler = self

        if job.group is not None:
            if job.group not in self._jobs_by_group:
                self._jobs_by_group[job.group] = set()
            self._jobs_by_group[job.group].add(job)

        job.set_app_error_handler_resolver(lambda: self._error_handler)
        await self.scheduler_service.add_job(job)

        return job

    def cancel_job(self, job: "ScheduledJob") -> None:
        """Cancel an individual job and persist the cancellation to the database.

        Idempotent: a second cancel on the same job is a silent no-op. Raises
        ``ValueError`` if the job belongs to a different scheduler instance.
        Spawns a durable ``mark_job_cancelled`` DB write (when ``db_id`` is set),
        dequeues the job from the service, and sets ``job._dequeued = True``.

        Must NOT call ``job.cancel()`` internally — that delegates back here and
        would cause infinite recursion.

        Args:
            job: The job to cancel.

        Raises:
            ValueError: If the job belongs to a different scheduler instance.
        """
        if job._dequeued:
            return  # idempotent — already cancelled
        if job._scheduler is not self:
            raise ValueError(
                f"cancel_job() called with a job belonging to a different scheduler "
                f"(job owner: {job._scheduler}, this scheduler: {self})"
            )
        if job.db_id is not None:
            # Spawn on scheduler_service.task_bucket (not self.task_bucket) so the
            # DB write survives Scheduler resource shutdown — the service's lifecycle
            # extends past the resource's cleanup phase.
            self.scheduler_service.task_bucket.spawn(
                self.scheduler_service.mark_job_cancelled(job.db_id),
                name="scheduler:mark_job_cancelled",
            )
        self.scheduler_service.dequeue_job(job)

    def _remove_all_jobs(self) -> asyncio.Task:
        """Remove all jobs for the owner of this scheduler."""
        self._jobs_by_name.clear()
        self._jobs_by_group.clear()
        return self.scheduler_service.remove_jobs_by_owner(self.owner_id)

    def cancel_group(self, group: str) -> None:
        """Cancel all jobs in the given group.

        Delegates to ``cancel_job`` per-member, which handles the DB write,
        dequeue, and ``_dequeued`` flag. Dict cleanup (``_jobs_by_group`` and
        ``_jobs_by_name``) is handled by the ``_on_job_removed`` callback
        fired by ``scheduler_service.dequeue_job``. No-op if the group does
        not exist.

        Args:
            group: The group name to cancel.
        """
        jobs = list(self._jobs_by_group.get(group, set()))
        for job in jobs:
            self.cancel_job(job)

    def list_jobs(self, group: str | None = None) -> list["ScheduledJob"]:
        """Return all or group-filtered jobs.

        Args:
            group: If provided, return only jobs in this group.
                If ``None`` (default), return all jobs.

        Returns:
            List of ScheduledJob instances.
        """
        if group is None:
            return list(self._jobs_by_name.values())
        return list(self._jobs_by_group.get(group, set()))

    def get_job_db_ids(self) -> list[int]:
        """Return the DB IDs of all registered jobs.

        Used by post-ready reconciliation in ``AppLifecycleService.initialize_instances()``
        to build the ``live_job_ids`` set. With synchronous registration, all jobs
        have a ``db_id`` set by the time ``on_initialize`` completes. The
        ``db_id is not None`` guard is kept as a defensive filter.

        Returns:
            List of integer DB row IDs for registered jobs.
        """
        return [job.db_id for job in self._jobs_by_name.values() if job.db_id is not None]

    async def schedule(
        self,
        func: "JobCallable",
        trigger: "TriggerProtocol",
        name: str = "",
        group: str | None = None,
        jitter: float | None = None,
        timeout: float | None = None,
        timeout_disabled: bool = False,
        *,
        on_error: "SchedulerErrorHandlerType | None" = None,
        if_exists: Literal["error", "skip", "replace"] = "error",
        args: tuple[Any, ...] | None = None,
        kwargs: Mapping[str, Any] | None = None,
    ) -> "ScheduledJob":
        """Schedule a job using a trigger object.

        This is the primary entry point for scheduling. All convenience methods
        (``run_in``, ``run_every``, ``run_daily``, etc.) delegate here.

        Args:
            func: The function to run.
            trigger: A trigger object implementing ``TriggerProtocol``. Determines
                both the first run time and subsequent recurrences.
            name: Optional name for the job. If empty, an auto-name is derived from
                the callable and trigger.
            group: Optional group name for bulk management (see ``cancel_group``).
            jitter: Optional non-negative number of seconds of random offset, applied
                at enqueue time via ``SchedulerService._apply_jitter_to_heap``.
                See the ``fire_at`` field on ``ScheduledJob``.
            timeout: Per-job timeout in seconds. ``None`` uses the global default.
                A positive ``float`` overrides the default.
            timeout_disabled: When ``True``, timeout enforcement is disabled for this
                job regardless of the global default.
            on_error: Optional per-job error handler. When set, this handler is
                invoked if the job raises an exception (including ``TimeoutError``,
                but excluding ``CancelledError``). Overrides the app-level handler
                set via ``on_error()``.
            if_exists: Behavior when a job with the same name already exists.
                See :meth:`add_job` for details.
            args: Positional arguments to pass to the callable when it executes.
            kwargs: Keyword arguments to pass to the callable when it executes.

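        Returns:
            The scheduled job. ``job.db_id`` is a valid integer on return.

        Raises:
            ValueError: If ``jitter`` is negative, or if ``add_job`` rejects the job
                name (see :meth:`add_job`).
            TypeError: If ``trigger`` does not implement ``TriggerProtocol``.
        """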

        if jitter is not None and jitter < 0:
            raise ValueError("jitter must be non-negative")

        if not isinstance(trigger, TriggerProtocol):
            raise TypeError(
                f"trigger must implement TriggerProtocol; got {type(trigger).__name__}. "
                "Use hassette.scheduler.triggers (After, Once, Every, Daily, Cron)"
            )

        parent = self.parent
        assert parent is not None
        app_key = parent.app_key
        instance_index = parent.index
        source_tier = parent.source_tier
        assert source_tier in ("app", "framework"), f"Invalid source_tier={source_tier!r} on {parent.class_name}"

        # Capture source while user code is still on the stack (before async spawn boundary)
        source_location, registration_source = capture_registration_source()

        run_at = trigger.first_run_time(date_utils.now())

        job = ScheduledJob(
            owner_id=self.owner_id,
            next_run=run_at,
            job=func,
            trigger=trigger,
            name=name,
            group=group,
            jitter=jitter,
            timeout=timeout,
            timeout_disabled=timeout_disabled,
            args=tuple(args) if args else (),
            kwargs=dict(kwargs) if kwargs else {},
            error_handler=on_error,
            app_key=app_key,
            instance_index=instance_index,
            source_location=source_location,
            registration_source=registration_source or "",
            source_tier=source_tier,
        )
        return await self.add_job(job, if_exists=if_exists)

    async def run_in(
        self,
        func: "JobCallable",
        delay: float,
        name: str = "",
        group: str | None = None,
        jitter: float | None = None,
        timeout: float | None = None,
        timeout_disabled: bool = False,
        *,
        on_error: "SchedulerErrorHandlerType | None" = None,
        if_exists: Literal["error", "skip", "replace"] = "error",
        args: tuple[Any, ...] | None = None,
        kwargs: Mapping[str, Any] | None = None,
    ) -> "ScheduledJob":
        """Schedule a job to run after a fixed delay (one-shot).

        Args:
            func: The function to run.
            delay: The delay in seconds before running the job.
            name: Optional name for the job.
            group: Optional group name.
            jitter: Optional seconds of random offset to apply at enqueue time.
                See ``schedule()`` for details.
            timeout: Per-job timeout in seconds. See ``schedule()`` for details.
            timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
            on_error: Optional per-job error handler. See ``schedule()`` for details.
            if_exists: Behavior when a job with the same name already exists.
                See :meth:`add_job` for details.
            args: Positional arguments to pass to the callable when it executes.
            kwargs: Keyword arguments to pass to the callable when it executes.

        Returns:
            The scheduled job.
        """
        trigger = After(seconds=float(delay))
        return await self.schedule(
            func,
            trigger,
            name=name,
            group=group,
            jitter=jitter,
            timeout=timeout,
            timeout_disabled=timeout_disabled,
            on_error=on_error,
            if_exists=if_exists,
            args=args,
            kwargs=kwargs,
        )

    async def run_once(
        self,
        func: "JobCallable",
        at: str | ZonedDateTime,
        name: str = "",
        group: str | None = None,
        jitter: float | None = None,
        timeout: float | None = None,
        timeout_disabled: bool = False,
        if_past: Literal["tomorrow", "error"] = "tomorrow",
        *,
        on_error: "SchedulerErrorHandlerType | None" = None,
        if_exists: Literal["error", "skip", "replace"] = "error",
        args: tuple[Any, ...] | None = None,
        kwargs: Mapping[str, Any] | None = None,
    ) -> "ScheduledJob":
        """Schedule a job to run once at a specific wall-clock time (one-shot).

        Args:
            func: The function to run.
            at: Target time. A ``"HH:MM"`` string (today in system timezone, or
                tomorrow if already past) or a ``ZonedDateTime``.
            name: Optional name for the job.
            group: Optional group name.
            jitter: Optional seconds of random offset to apply at enqueue time.
                See ``schedule()`` for details.
            timeout: Per-job timeout in seconds. See ``schedule()`` for details.
            timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
            if_past: Behavior when the target time is in the past at construction
                time. ``"tomorrow"`` (default) defers by one day. ``"error"`` raises
                ``ValueError``. For ``ZonedDateTime`` inputs, ``if_past`` has no
                effect: the job always fires immediately if the instant is in the past.
            on_error: Optional per-job error handler. See ``schedule()`` for details.
            if_exists: Behavior when a job with the same name already exists.
                See :meth:`add_job` for details.
            args: Positional arguments to pass to the callable when it executes.
            kwargs: Keyword arguments to pass to the callable when it executes.

        Returns:
            The scheduled job.
        """
        trigger = Once(at=at, if_past=if_past)
        return await self.schedule(
            func,
            trigger,
            name=name,
            group=group,
            jitter=jitter,
            timeout=timeout,
            timeout_disabled=timeout_disabled,
            on_error=on_error,
            if_exists=if_exists,
            args=args,
            kwargs=kwargs,
        )

    async def run_every(
        self,
        func: "JobCallable",
        hours: float = 0,
        minutes: float = 0,
        seconds: float = 0,
        name: str = "",
        group: str | None = None,
        jitter: float | None = None,
        timeout: float | None = None,
        timeout_disabled: bool = False,
        *,
        on_error: "SchedulerErrorHandlerType | None" = None,
        if_exists: Literal["error", "skip", "replace"] = "error",
        args: tuple[Any, ...] | None = None,
        kwargs: Mapping[str, Any] | None = None,
    ) -> "ScheduledJob":
        """Schedule a job to run at a fixed interval.

        Args:
            func: The function to run.
            hours: Interval hours component.
            minutes: Interval minutes component.
            seconds: Interval seconds component.
            name: Optional name for the job.
            group: Optional group name.
            jitter: Optional seconds of random offset to apply at enqueue time.
                See ``schedule()`` for details.
            timeout: Per-job timeout in seconds. See ``schedule()`` for details.
            timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
            on_error: Optional per-job error handler. See ``schedule()`` for details.
            if_exists: Behavior when a job with the same name already exists.
                See :meth:`add_job` for details.
            args: Positional arguments to pass to the callable when it executes.
            kwargs: Keyword arguments to pass to the callable when it executes.

        Returns:
            The scheduled job.
        """
        trigger = Every(hours=hours, minutes=minutes, seconds=seconds)
        return await self.schedule(
            func,
            trigger,
            name=name,
            group=group,
            jitter=jitter,
            timeout=timeout,
            timeout_disabled=timeout_disabled,
            on_error=on_error,
            if_exists=if_exists,
            args=args,
            kwargs=kwargs,
        )

    async def run_minutely(
        self,
        func: "JobCallable",
        minutes: int = 1,
        name: str = "",
        group: str | None = None,
        jitter: float | None = None,
        timeout: float | None = None,
        timeout_disabled: bool = False,
        *,
        on_error: "SchedulerErrorHandlerType | None" = None,
        if_exists: Literal["error", "skip", "replace"] = "error",
        args: tuple[Any, ...] | None = None,
        kwargs: Mapping[str, Any] | None = None,
    ) -> "ScheduledJob":
        """Schedule a job to run every N minutes.

        Args:
            func: The function to run.
            minutes: The minute interval (must be >= 1).
            name: Optional name for the job.
            group: Optional group name.
            jitter: Optional seconds of random offset to apply at enqueue time.
                See ``schedule()`` for details.
            timeout: Per-job timeout in seconds. See ``schedule()`` for details.
            timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
            on_error: Optional per-job error handler. See ``schedule()`` for details.
            if_exists: Behavior when a job with the same name already exists.
                See :meth:`add_job` for details.
            args: Positional arguments to pass to the callable when it executes.
            kwargs: Keyword arguments to pass to the callable when it executes.

        Returns:
            The scheduled job.

        Raises:
            ValueError: If ``minutes`` is less than 1.
        """
        if minutes < 1:
            raise ValueError("Minute interval must be at least 1")
        trigger = Every(minutes=minutes)
        return await self.schedule(
            func,
            trigger,
            name=name,
            group=group,
            jitter=jitter,
            timeout=timeout,
            timeout_disabled=timeout_disabled,
            on_error=on_error,
            if_exists=if_exists,
            args=args,
            kwargs=kwargs,
        )

    async def run_hourly(
        self,
        func: "JobCallable",
        hours: int = 1,
        name: str = "",
        group: str | None = None,
        jitter: float | None = None,
        timeout: float | None = None,
        timeout_disabled: bool = False,
        *,
        on_error: "SchedulerErrorHandlerType | None" = None,
        if_exists: Literal["error", "skip", "replace"] = "error",
        args: tuple[Any, ...] | None = None,
        kwargs: Mapping[str, Any] | None = None,
    ) -> "ScheduledJob":
        """Schedule a job to run every N hours.

        Args:
            func: The function to run.
            hours: The hour interval (must be >= 1).
            name: Optional name for the job.
            group: Optional group name.
            jitter: Optional seconds of random offset to apply at enqueue time.
                See ``schedule()`` for details.
            timeout: Per-job timeout in seconds. See ``schedule()`` for details.
            timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
            on_error: Optional per-job error handler. See ``schedule()`` for details.
            if_exists: Behavior when a job with the same name already exists.
                See :meth:`add_job` for details.
            args: Positional arguments to pass to the callable when it executes.
            kwargs: Keyword arguments to pass to the callable when it executes.

        Returns:
            The scheduled job.

        Raises:
            ValueError: If ``hours`` is less than 1.
        """
        if hours < 1:
            raise ValueError("Hour interval must be at least 1")
        trigger = Every(hours=hours)
        return await self.schedule(
            func,
            trigger,
            name=name,
            group=group,
            jitter=jitter,
            timeout=timeout,
            timeout_disabled=timeout_disabled,
            on_error=on_error,
            if_exists=if_exists,
            args=args,
            kwargs=kwargs,
        )

    async def run_daily(
        self,
        func: "JobCallable",
        at: str = "00:00",
        name: str = "",
        group: str | None = None,
        jitter: float | None = None,
        timeout: float | None = None,
        timeout_disabled: bool = False,
        *,
        on_error: "SchedulerErrorHandlerType | None" = None,
        if_exists: Literal["error", "skip", "replace"] = "error",
        args: tuple[Any, ...] | None = None,
        kwargs: Mapping[str, Any] | None = None,
    ) -> "ScheduledJob":
        """Schedule a job to run once per day at a fixed wall-clock time.

        Uses a cron-based trigger internally to ensure DST-correct, wall-clock-aligned
        scheduling. This avoids the 24-hour drift bug of interval-based daily scheduling.

        Args:
            func: The function to run.
            at: Target wall-clock time in ``"HH:MM"`` format (default ``"00:00"``).
            name: Optional name for the job.
            group: Optional group name.
            jitter: Optional seconds of random offset to apply at enqueue time.
                See ``schedule()`` for details.
            timeout: Per-job timeout in seconds. See ``schedule()`` for details.
            timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
            on_error: Optional per-job error handler. See ``schedule()`` for details.
            if_exists: Behavior when a job with the same name already exists.
                See :meth:`add_job` for details.
            args: Positional arguments to pass to the callable when it executes.
            kwargs: Keyword arguments to pass to the callable when it executes.

        Returns:
            The scheduled job.
        """
        trigger = Daily(at=at)
        return await self.schedule(
            func,
            trigger,
            name=name,
            group=group,
            jitter=jitter,
            timeout=timeout,
            timeout_disabled=timeout_disabled,
            on_error=on_error,
            if_exists=if_exists,
            args=args,
            kwargs=kwargs,
        )

    async def run_cron(
        self,
        func: "JobCallable",
        expression: str,
        name: str = "",
        group: str | None = None,
        jitter: float | None = None,
        timeout: float | None = None,
        timeout_disabled: bool = False,
        *,
        on_error: "SchedulerErrorHandlerType | None" = None,
        if_exists: Literal["error", "skip", "replace"] = "error",
        args: tuple[Any, ...] | None = None,
        kwargs: Mapping[str, Any] | None = None,
    ) -> "ScheduledJob":
        """Schedule a job using a cron expression.

        Accepts both 5-field (standard Unix cron: ``minute hour dom month dow``)
        and 6-field expressions (seconds appended as a 6th field per croniter
        convention: ``minute hour dom month dow second``).

        Args:
            func: The function to run.
            expression: A valid 5- or 6-field cron expression.
            name: Optional name for the job.
            group: Optional group name.
            jitter: Optional seconds of random offset to apply at enqueue time.
                See ``schedule()`` for details.
            timeout: Per-job timeout in seconds. See ``schedule()`` for details.
            timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
            on_error: Optional per-job error handler. See ``schedule()`` for details.
            if_exists: Behavior when a job with the same name already exists.
                See :meth:`add_job` for details.
            args: Positional arguments to pass to the callable when it executes.
            kwargs: Keyword arguments to pass to the callable when it executes.

        Returns:
            The scheduled job.

        Raises:
            ValueError: If the cron expression is syntactically invalid.
        """
        trigger = Cron(expression)
        return await self.schedule(
            func,
            trigger,
            name=name,
            group=group,
            jitter=jitter,
            timeout=timeout,
            timeout_disabled=timeout_disabled,
            on_error=on_error,
            if_exists=if_exists,
            args=args,
            kwargs=kwargs,
        )
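
The name and group bookkeeping in the listing above can be illustrated with a small, self-contained sketch. `Job` and `GroupIndex` are hypothetical stand-ins, not Hassette classes. The sketch assumes only what the source states: jobs hash by an immutable `job_id`, groups are stored as sets, the removal callback discards the job and deletes a group once it is empty, and `cancel_group` iterates over a snapshot because the callback mutates the set being walked.

```python
from dataclasses import dataclass, field
from itertools import count
from typing import Optional

_ids = count(1)


@dataclass(eq=False)
class Job:
    """Hypothetical stand-in for ScheduledJob: identity is the immutable job_id."""

    name: str
    group: Optional[str] = None
    job_id: int = field(default_factory=lambda: next(_ids))

    def __hash__(self) -> int:
        return hash(self.job_id)

    def __eq__(self, other: object) -> bool:
        return isinstance(other, Job) and other.job_id == self.job_id


class GroupIndex:
    """Hypothetical stand-in for the name/group tracking inside Scheduler."""

    def __init__(self) -> None:
        self.by_name: dict = {}
        self.by_group: dict = {}

    def add(self, job: Job) -> None:
        self.by_name[job.name] = job
        if job.group is not None:
            self.by_group.setdefault(job.group, set()).add(job)

    def on_removed(self, job: Job) -> None:
        # Mirrors _on_job_removed: forget the name, discard from the group,
        # and delete the group entry once it is empty.
        self.by_name.pop(job.name, None)
        if job.group is not None:
            members = self.by_group.get(job.group)
            if members is not None:
                members.discard(job)
                if not members:
                    del self.by_group[job.group]

    def cancel_group(self, group: str) -> None:
        # Iterate over a snapshot: on_removed mutates the underlying set.
        for job in list(self.by_group.get(group, set())):
            self.on_removed(job)


index = GroupIndex()
index.add(Job("lights_on", group="lighting"))
index.add(Job("lights_off", group="lighting"))
index.add(Job("backup"))
index.cancel_group("lighting")
print(sorted(index.by_name))  # only the ungrouped job remains
```

Iterating over `list(...)` rather than the set itself matters here: removing members from a set while looping over it raises `RuntimeError` in Python.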

scheduler_service: SchedulerService = self.hassette._scheduler_service instance-attribute

The scheduler service instance.

sync: SchedulerSyncFacade = self.add_child(SchedulerSyncFacade, scheduler=self) instance-attribute

Synchronous facade for scheduling jobs from sync code (e.g. AppSync hooks).

config_log_level: LOG_LEVEL_TYPE property

Return the log level from the config for this resource.

on_error(handler: SchedulerErrorHandlerType) -> None

Register an app-level error handler for this scheduler.

The handler is called when any job on this scheduler raises an exception (including TimeoutError) and the job does not have its own per-registration error handler.

This is an app-level fallback — it is resolved at dispatch time, not at job registration time. A later call to on_error() replaces any previously registered handler.

Note: error handlers are spawned as fire-and-forget tasks. Handlers spawned near app shutdown may be cancelled before they complete. Do not rely on error handlers for delivery-critical alerting during system teardown.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| handler | SchedulerErrorHandlerType | A sync or async callable that accepts a SchedulerErrorContext (hassette.scheduler.error_context.SchedulerErrorContext). | required |
Source code in src/hassette/scheduler/scheduler.py
def on_error(self, handler: "SchedulerErrorHandlerType") -> None:
    """Register an app-level error handler for this scheduler.

    The handler is called when any job on this scheduler raises an exception
    (including ``TimeoutError``) and the job does not have its own
    per-registration error handler.

    This is an app-level fallback — it is resolved at dispatch time, not at job
    registration time. A later call to ``on_error()`` replaces any previously
    registered handler.

    Note: error handlers are spawned as fire-and-forget tasks. Handlers spawned near
    app shutdown may be cancelled before they complete. Do not rely on error handlers
    for delivery-critical alerting during system teardown.

    Args:
        handler: A sync or async callable that accepts a
            :class:`~hassette.scheduler.error_context.SchedulerErrorContext`.
    """
    self._error_handler = handler
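
Dispatch-time resolution can be sketched without Hassette. In the block below, `MiniScheduler`, `MiniJob`, and `dispatch` are hypothetical names. The only behavior taken from the source is that a job keeps a resolver (a zero-argument callable) instead of the handler itself, and that a per-job handler takes precedence over the app-level one. The sketch calls handlers synchronously for brevity, whereas the documentation above says the real handlers are spawned as fire-and-forget tasks.

```python
from typing import Callable, Optional

Handler = Callable[[Exception], None]


class MiniJob:
    """Hypothetical job: stores a resolver, not the app-level handler itself."""

    def __init__(self, func: Callable[[], None], error_handler: Optional[Handler] = None) -> None:
        self.func = func
        self.error_handler = error_handler
        self._resolve_app_handler: Callable[[], Optional[Handler]] = lambda: None

    def set_app_error_handler_resolver(self, resolver: Callable[[], Optional[Handler]]) -> None:
        self._resolve_app_handler = resolver

    def dispatch(self) -> None:
        try:
            self.func()
        except Exception as exc:
            # The per-job handler wins; otherwise look up the app-level
            # handler now, at dispatch time.
            handler = self.error_handler or self._resolve_app_handler()
            if handler is None:
                raise
            handler(exc)


class MiniScheduler:
    """Hypothetical scheduler holding one replaceable app-level handler."""

    def __init__(self) -> None:
        self._error_handler: Optional[Handler] = None

    def on_error(self, handler: Handler) -> None:
        self._error_handler = handler

    def add_job(self, job: MiniJob) -> MiniJob:
        job.set_app_error_handler_resolver(lambda: self._error_handler)
        return job


def failing() -> None:
    raise RuntimeError("boom")


seen = []
scheduler = MiniScheduler()
job = scheduler.add_job(MiniJob(failing))

# Registered after the job was added, yet still used for that job.
scheduler.on_error(lambda exc: seen.append(f"first: {exc}"))
job.dispatch()

# A later call replaces the earlier handler.
scheduler.on_error(lambda exc: seen.append(f"second: {exc}"))
job.dispatch()

print(seen)
```

Because the job holds a resolver, the order of `on_error()` and job registration does not matter; what counts is which handler is registered when the job fails.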

add_job(job: ScheduledJob, *, if_exists: Literal['error', 'skip', 'replace'] = 'error') -> ScheduledJob async

Add a job to the scheduler.

DB registration is awaited inline — job.db_id is set before this method returns, eliminating the window where a job fires with db_id=None.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| job | ScheduledJob | The job to add. | required |
| if_exists | Literal['error', 'skip', 'replace'] | Behavior when a job with the same name already exists. "error" (default) raises ValueError. "skip" returns the existing job if it matches; raises ValueError if the name matches but the configuration differs. "replace" cancels the existing job (recording it as cancelled in telemetry) and registers the new job in its place. | 'error' |

Returns:

| Type | Description |
| --- | --- |
| ScheduledJob | The added job, or the existing job when if_exists="skip" and a matching job is already registered. job.db_id is a valid integer on return. |

Raises:

| Type | Description |
| --- | --- |
| TypeError | If job is not a ScheduledJob. |
| ValueError | If a job with the same name already exists and either if_exists="error" or the existing job's configuration differs. |

Source code in src/hassette/scheduler/scheduler.py
async def add_job(
    self, job: "ScheduledJob", *, if_exists: Literal["error", "skip", "replace"] = "error"
) -> "ScheduledJob":
    """Add a job to the scheduler.

    DB registration is awaited inline — ``job.db_id`` is set before this
    method returns, eliminating the window where a job fires with
    ``db_id=None``.

    Args:
        job: The job to add.
        if_exists: Behavior when a job with the same name already exists.
            ``"error"`` (default) raises ``ValueError``.
            ``"skip"`` returns the existing job if it matches; raises
            ``ValueError`` if the name matches but the configuration differs.
            ``"replace"`` cancels the existing job (recording it as cancelled
            in telemetry) and registers the new job in its place.

    Returns:
        The added job, or the existing job when ``if_exists="skip"`` and a
        matching job is already registered. ``job.db_id`` is a valid
        integer on return.

    Raises:
        TypeError: If job is not a ScheduledJob.
        ValueError: If a job with the same name already exists and either
            ``if_exists="error"`` or the existing job's configuration differs.
    """

    if not isinstance(job, ScheduledJob):
        raise TypeError(f"Expected ScheduledJob, got {type(job).__name__}")

    existing = self._jobs_by_name.get(job.name)
    if existing is not None:
        if if_exists == "replace":
            self.logger.debug("Replacing existing job '%s' (cancelling old, registering new)", job.name)
            self.cancel_job(existing)
        elif if_exists == "skip" and existing.matches(job):
            return existing
        elif if_exists == "skip":
            changed_fields = existing.diff_fields(job)
            raise ValueError(
                f"A job named '{job.name}' already exists but its configuration has changed "
                f"(changed fields: {', '.join(changed_fields)})"
            )
        else:
            raise ValueError(
                f"A job named '{job.name}' already exists in scheduler for '{self.owner_id}'. "
                "Job names must be unique per scheduler instance."
            )

    self._jobs_by_name[job.name] = job
    job._scheduler = self

    if job.group is not None:
        if job.group not in self._jobs_by_group:
            self._jobs_by_group[job.group] = set()
        self._jobs_by_group[job.group].add(job)

    job.set_app_error_handler_resolver(lambda: self._error_handler)
    await self.scheduler_service.add_job(job)

    return job
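The three if_exists modes can be illustrated with a small in-memory model. This is a minimal sketch, not the hassette implementation: Job and Registry are hypothetical stand-ins for ScheduledJob and the scheduler's name index, and "configuration" is reduced to a single interval field.

```python
from dataclasses import dataclass
from typing import Literal


@dataclass
class Job:
    """Hypothetical stand-in for ScheduledJob; only name and config matter here."""

    name: str
    interval: float
    cancelled: bool = False

    def matches(self, other: "Job") -> bool:
        return self.interval == other.interval


class Registry:
    """Hypothetical stand-in for the scheduler's per-name job index."""

    def __init__(self) -> None:
        self.jobs: dict[str, Job] = {}

    def add(self, job: Job, *, if_exists: Literal["error", "skip", "replace"] = "error") -> Job:
        existing = self.jobs.get(job.name)
        if existing is not None:
            if if_exists == "replace":
                existing.cancelled = True  # old job is cancelled, new one takes its slot
            elif if_exists == "skip" and existing.matches(job):
                return existing  # same configuration: keep the registered job
            else:
                # "error", or "skip" with a changed configuration
                raise ValueError(f"job {job.name!r} already exists")
        self.jobs[job.name] = job
        return job


registry = Registry()
first = registry.add(Job("poll", 30.0))
same = registry.add(Job("poll", 30.0), if_exists="skip")
replacement = registry.add(Job("poll", 60.0), if_exists="replace")
```

Here same is the originally registered object, while replacement supersedes it and leaves the old job marked as cancelled. Calling add with if_exists="skip" and a different interval raises ValueError, mirroring the "configuration differs" case described above.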

cancel_job(job: ScheduledJob) -> None

Cancel an individual job and persist the cancellation to the database.

Idempotent: a second cancel on the same job is a silent no-op. Raises ValueError if the job belongs to a different scheduler instance. Spawns a durable mark_job_cancelled DB write (when db_id is set), dequeues the job from the service, and sets job._dequeued = True.

Must NOT call job.cancel() internally — that delegates back here and would cause infinite recursion.

Parameters:

Name Type Description Default
job ScheduledJob

The job to cancel.

required

Raises:

Type Description
ValueError

If the job belongs to a different scheduler instance.

Source code in src/hassette/scheduler/scheduler.py
def cancel_job(self, job: "ScheduledJob") -> None:
    """Cancel an individual job and persist the cancellation to the database.

    Idempotent: a second cancel on the same job is a silent no-op. Raises
    ``ValueError`` if the job belongs to a different scheduler instance.
    Spawns a durable ``mark_job_cancelled`` DB write (when ``db_id`` is set),
    dequeues the job from the service, and sets ``job._dequeued = True``.

    Must NOT call ``job.cancel()`` internally — that delegates back here and
    would cause infinite recursion.

    Args:
        job: The job to cancel.

    Raises:
        ValueError: If the job belongs to a different scheduler instance.
    """
    if job._dequeued:
        return  # idempotent — already cancelled
    if job._scheduler is not self:
        raise ValueError(
            f"cancel_job() called with a job belonging to a different scheduler "
            f"(job owner: {job._scheduler}, this scheduler: {self})"
        )
    if job.db_id is not None:
        # Spawn on scheduler_service.task_bucket (not self.task_bucket) so the
        # DB write survives Scheduler resource shutdown — the service's lifecycle
        # extends past the resource's cleanup phase.
        self.scheduler_service.task_bucket.spawn(
            self.scheduler_service.mark_job_cancelled(job.db_id),
            name="scheduler:mark_job_cancelled",
        )
    self.scheduler_service.dequeue_job(job)
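The ordering of the two guards matters: the idempotency check runs before the ownership check. The sketch below models that ordering with hypothetical stand-in classes (Job, Scheduler, and the cancel_writes counter are illustrative names, not hassette's API); the counter stands in for the durable DB write.

```python
class Job:
    """Hypothetical stand-in for ScheduledJob."""

    def __init__(self, name: str, owner: "Scheduler") -> None:
        self.name = name
        self.owner = owner
        self.dequeued = False


class Scheduler:
    """Hypothetical stand-in that counts how often the cancel side effects run."""

    def __init__(self) -> None:
        self.cancel_writes = 0

    def cancel_job(self, job: Job) -> None:
        if job.dequeued:
            return  # already cancelled: silent no-op
        if job.owner is not self:
            raise ValueError("job belongs to a different scheduler")
        self.cancel_writes += 1  # stands in for the DB write
        job.dequeued = True  # stands in for the dequeue step


a, b = Scheduler(), Scheduler()
job = Job("poll", owner=a)
a.cancel_job(job)
a.cancel_job(job)  # second call does nothing
```

After both calls the side effects have run exactly once. Passing a live job owned by a to b.cancel_job raises ValueError.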

cancel_group(group: str) -> None

Cancel all jobs in the given group.

Delegates to cancel_job per-member, which handles the DB write, dequeue, and _dequeued flag. Dict cleanup (_jobs_by_group and _jobs_by_name) is handled by the _on_job_removed callback fired by scheduler_service.dequeue_job. No-op if the group does not exist.

Parameters:

Name Type Description Default
group str

The group name to cancel.

required

Source code in src/hassette/scheduler/scheduler.py
def cancel_group(self, group: str) -> None:
    """Cancel all jobs in the given group.

    Delegates to ``cancel_job`` per-member, which handles the DB write,
    dequeue, and ``_dequeued`` flag. Dict cleanup (``_jobs_by_group`` and
    ``_jobs_by_name``) is handled by the ``_on_job_removed`` callback
    fired by ``scheduler_service.dequeue_job``. No-op if the group does
    not exist.

    Args:
        group: The group name to cancel.
    """
    jobs = list(self._jobs_by_group.get(group, set()))
    for job in jobs:
        self.cancel_job(job)
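The list(...) copy in the loop above is worth noting: if the removal callback edits the group's set while the loop is still running, iterating the live set would raise RuntimeError in Python. A minimal sketch of the pattern follows, assuming the cleanup callback runs synchronously (the source does not say whether it does); Groups and its methods are hypothetical names.

```python
class Groups:
    """Hypothetical stand-in for the scheduler's group index."""

    def __init__(self) -> None:
        self.by_group: dict[str, set[str]] = {}

    def add(self, group: str, job_name: str) -> None:
        self.by_group.setdefault(group, set()).add(job_name)

    def _on_removed(self, group: str, job_name: str) -> None:
        # Cleanup callback: drops the job, and the group once it is empty.
        members = self.by_group.get(group)
        if members is None:
            return
        members.discard(job_name)
        if not members:
            del self.by_group[group]

    def cancel_group(self, group: str) -> list[str]:
        cancelled = []
        # Snapshot first: _on_removed mutates the set while we loop.
        for job_name in list(self.by_group.get(group, set())):
            self._on_removed(group, job_name)
            cancelled.append(job_name)
        return cancelled


groups = Groups()
groups.add("lights", "a")
groups.add("lights", "b")
cancelled = groups.cancel_group("lights")
```

Cancelling a group that was never registered returns an empty list, matching the no-op behaviour described above.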

list_jobs(group: str | None = None) -> list[ScheduledJob]

Return all or group-filtered jobs.

Parameters:

Name Type Description Default
group str | None

If provided, return only jobs in this group. If None (default), return all jobs.

None

Returns:

Type Description
list[ScheduledJob]

List of ScheduledJob instances.

Source code in src/hassette/scheduler/scheduler.py
def list_jobs(self, group: str | None = None) -> list["ScheduledJob"]:
    """Return all or group-filtered jobs.

    Args:
        group: If provided, return only jobs in this group.
            If ``None`` (default), return all jobs.

    Returns:
        List of ScheduledJob instances.
    """
    if group is None:
        return list(self._jobs_by_name.values())
    return list(self._jobs_by_group.get(group, set()))

get_job_db_ids() -> list[int]

Return the DB IDs of all registered jobs.

Used by post-ready reconciliation in AppLifecycleService.initialize_instances() to build the live_job_ids set. With synchronous registration, all jobs have a db_id set by the time on_initialize completes. The db_id is not None guard is kept as a defensive filter.

Returns:

Type Description
list[int]

List of integer DB row IDs for registered jobs.

Source code in src/hassette/scheduler/scheduler.py
def get_job_db_ids(self) -> list[int]:
    """Return the DB IDs of all registered jobs.

    Used by post-ready reconciliation in ``AppLifecycleService.initialize_instances()``
    to build the ``live_job_ids`` set. With synchronous registration, all jobs
    have a ``db_id`` set by the time ``on_initialize`` completes. The
    ``db_id is not None`` guard is kept as a defensive filter.

    Returns:
        List of integer DB row IDs for registered jobs.
    """
    return [job.db_id for job in self._jobs_by_name.values() if job.db_id is not None]

schedule(func: JobCallable, trigger: TriggerProtocol, name: str = '', group: str | None = None, jitter: float | None = None, timeout: float | None = None, timeout_disabled: bool = False, *, on_error: SchedulerErrorHandlerType | None = None, if_exists: Literal['error', 'skip', 'replace'] = 'error', args: tuple[Any, ...] | None = None, kwargs: Mapping[str, Any] | None = None) -> ScheduledJob async

Schedule a job using a trigger object.

This is the primary entry point for scheduling. All convenience methods (run_in, run_every, run_daily, etc.) delegate here.

Parameters:

Name Type Description Default
func JobCallable

The function to run.

required
trigger TriggerProtocol

A trigger object implementing TriggerProtocol. Determines both the first run time and subsequent recurrences.

required
name str

Optional name for the job. If empty, an auto-name is derived from the callable and trigger.

''
group str | None

Optional group name for bulk management (see cancel_group).

None
jitter float | None

Optional seconds of random offset to apply at enqueue time. Jitter is applied via SchedulerService._apply_jitter_to_heap on enqueue. See the fire_at field on ScheduledJob.

None
timeout float | None

Per-job timeout in seconds. None uses the global default. A positive float overrides the default.

None
timeout_disabled bool

When True, timeout enforcement is disabled for this job regardless of the global default.

False
on_error SchedulerErrorHandlerType | None

Optional per-job error handler. When set, this handler is invoked if the job raises an exception (including TimeoutError, but excluding CancelledError). Overrides the app-level handler set via on_error().

None
if_exists Literal['error', 'skip', 'replace']

Behavior when a job with the same name already exists. See add_job for details.

'error'
args tuple[Any, ...] | None

Positional arguments to pass to the callable when it executes.

None
kwargs Mapping[str, Any] | None

Keyword arguments to pass to the callable when it executes.

None

Returns:

Type Description
ScheduledJob

The scheduled job. job.db_id is a valid integer on return.

Source code in src/hassette/scheduler/scheduler.py
async def schedule(
    self,
    func: "JobCallable",
    trigger: "TriggerProtocol",
    name: str = "",
    group: str | None = None,
    jitter: float | None = None,
    timeout: float | None = None,
    timeout_disabled: bool = False,
    *,
    on_error: "SchedulerErrorHandlerType | None" = None,
    if_exists: Literal["error", "skip", "replace"] = "error",
    args: tuple[Any, ...] | None = None,
    kwargs: Mapping[str, Any] | None = None,
) -> "ScheduledJob":
    """Schedule a job using a trigger object.

    This is the primary entry point for scheduling. All convenience methods
    (``run_in``, ``run_every``, ``run_daily``, etc.) delegate here.

    Args:
        func: The function to run.
        trigger: A trigger object implementing ``TriggerProtocol``. Determines
            both the first run time and subsequent recurrences.
        name: Optional name for the job. If empty, an auto-name is derived from
            the callable and trigger.
        group: Optional group name for bulk management (see ``cancel_group``).
        jitter: Optional seconds of random offset to apply at enqueue time.
            Jitter is applied via ``SchedulerService._apply_jitter_to_heap`` on enqueue.
            See the ``fire_at`` field on ``ScheduledJob``.
        timeout: Per-job timeout in seconds. ``None`` uses the global default.
            A positive ``float`` overrides the default.
        timeout_disabled: When ``True``, timeout enforcement is disabled for this
            job regardless of the global default.
        on_error: Optional per-job error handler. When set, this handler is
            invoked if the job raises an exception (including ``TimeoutError``,
            but excluding ``CancelledError``). Overrides the app-level handler
            set via ``on_error()``.
        if_exists: Behavior when a job with the same name already exists.
            See :meth:`add_job` for details.
        args: Positional arguments to pass to the callable when it executes.
        kwargs: Keyword arguments to pass to the callable when it executes.

    Returns:
        The scheduled job. ``job.db_id`` is a valid integer on return.
    """

    if jitter is not None and jitter < 0:
        raise ValueError("jitter must be non-negative")

    if not isinstance(trigger, TriggerProtocol):
        raise TypeError(
            f"trigger must implement TriggerProtocol; got {type(trigger).__name__}. "
            "Use hassette.scheduler.triggers (After, Once, Every, Daily, Cron)"
        )

    parent = self.parent
    assert parent is not None
    app_key = parent.app_key
    instance_index = parent.index
    source_tier = parent.source_tier
    assert source_tier in ("app", "framework"), f"Invalid source_tier={source_tier!r} on {parent.class_name}"

    # Capture source while user code is still on the stack (before async spawn boundary)
    source_location, registration_source = capture_registration_source()

    run_at = trigger.first_run_time(date_utils.now())

    job = ScheduledJob(
        owner_id=self.owner_id,
        next_run=run_at,
        job=func,
        trigger=trigger,
        name=name,
        group=group,
        jitter=jitter,
        timeout=timeout,
        timeout_disabled=timeout_disabled,
        args=tuple(args) if args else (),
        kwargs=dict(kwargs) if kwargs else {},
        error_handler=on_error,
        app_key=app_key,
        instance_index=instance_index,
        source_location=source_location,
        registration_source=registration_source or "",
        source_tier=source_tier,
    )
    return await self.add_job(job, if_exists=if_exists)
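The isinstance(trigger, TriggerProtocol) check works because the protocol is decorated with runtime_checkable, so any object exposing the required methods passes without inheriting from anything. The sketch below shows the mechanism with a reduced, hypothetical two-method protocol (MiniTrigger) and stdlib datetime in place of ZonedDateTime; the real protocol has six methods. Note that a runtime protocol check only verifies that the methods exist, not their signatures.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Protocol, runtime_checkable


@runtime_checkable
class MiniTrigger(Protocol):
    """Hypothetical two-method protocol; the real TriggerProtocol has six methods."""

    def first_run_time(self, current_time: datetime) -> datetime: ...

    def next_run_time(self, previous_run: datetime, current_time: datetime) -> Optional[datetime]: ...


class TopOfHour:
    """Fires at the next whole hour (at or after now), then hourly."""

    def first_run_time(self, current_time: datetime) -> datetime:
        floored = current_time.replace(minute=0, second=0, microsecond=0)
        return floored if floored == current_time else floored + timedelta(hours=1)

    def next_run_time(self, previous_run: datetime, current_time: datetime) -> Optional[datetime]:
        return previous_run + timedelta(hours=1)


def check(trigger: object) -> None:
    # Structural check: passes for any object that has the protocol's methods.
    if not isinstance(trigger, MiniTrigger):
        raise TypeError(f"trigger must implement MiniTrigger; got {type(trigger).__name__}")


trigger = TopOfHour()
check(trigger)  # passes although TopOfHour does not subclass MiniTrigger
first = trigger.first_run_time(datetime(2024, 1, 1, 10, 15, tzinfo=timezone.utc))
```

Passing a plain value such as 30 to check raises TypeError, which is the same failure mode schedule() reports for a non-trigger argument.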

run_in(func: JobCallable, delay: float, name: str = '', group: str | None = None, jitter: float | None = None, timeout: float | None = None, timeout_disabled: bool = False, *, on_error: SchedulerErrorHandlerType | None = None, if_exists: Literal['error', 'skip', 'replace'] = 'error', args: tuple[Any, ...] | None = None, kwargs: Mapping[str, Any] | None = None) -> ScheduledJob async

Schedule a job to run after a fixed delay (one-shot).

Parameters:

Name Type Description Default
func JobCallable

The function to run.

required
delay float

The delay in seconds before running the job.

required
name str

Optional name for the job.

''
group str | None

Optional group name.

None
jitter float | None

Optional seconds of random offset to apply at enqueue time. See schedule() for details.

None
timeout float | None

Per-job timeout in seconds. See schedule() for details.

None
timeout_disabled bool

Disable timeout enforcement. See schedule() for details.

False
if_exists Literal['error', 'skip', 'replace']

Behavior when a job with the same name already exists. See add_job for details.

'error'
args tuple[Any, ...] | None

Positional arguments to pass to the callable when it executes.

None
kwargs Mapping[str, Any] | None

Keyword arguments to pass to the callable when it executes.

None

Returns:

Type Description
ScheduledJob

The scheduled job.

Source code in src/hassette/scheduler/scheduler.py
async def run_in(
    self,
    func: "JobCallable",
    delay: float,
    name: str = "",
    group: str | None = None,
    jitter: float | None = None,
    timeout: float | None = None,
    timeout_disabled: bool = False,
    *,
    on_error: "SchedulerErrorHandlerType | None" = None,
    if_exists: Literal["error", "skip", "replace"] = "error",
    args: tuple[Any, ...] | None = None,
    kwargs: Mapping[str, Any] | None = None,
) -> "ScheduledJob":
    """Schedule a job to run after a fixed delay (one-shot).

    Args:
        func: The function to run.
        delay: The delay in seconds before running the job.
        name: Optional name for the job.
        group: Optional group name.
        jitter: Optional seconds of random offset to apply at enqueue time.
            See ``schedule()`` for details.
        timeout: Per-job timeout in seconds. See ``schedule()`` for details.
        timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
        if_exists: Behavior when a job with the same name already exists.
            See :meth:`add_job` for details.
        args: Positional arguments to pass to the callable when it executes.
        kwargs: Keyword arguments to pass to the callable when it executes.

    Returns:
        The scheduled job.
    """
    trigger = After(seconds=float(delay))
    return await self.schedule(
        func,
        trigger,
        name=name,
        group=group,
        jitter=jitter,
        timeout=timeout,
        timeout_disabled=timeout_disabled,
        on_error=on_error,
        if_exists=if_exists,
        args=args,
        kwargs=kwargs,
    )
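A one-shot trigger fits the protocol by returning None from next_run_time, which tells the scheduler there is nothing to reschedule. The following is a minimal sketch of that shape, assuming the delay is simply added to the current time; AfterSketch is a hypothetical class written with stdlib datetime, not the library's After trigger.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


class AfterSketch:
    """Hypothetical one-shot trigger modelled on the description of After."""

    def __init__(self, seconds: float) -> None:
        self.delay = timedelta(seconds=seconds)

    def first_run_time(self, current_time: datetime) -> datetime:
        return current_time + self.delay

    def next_run_time(self, previous_run: datetime, current_time: datetime) -> Optional[datetime]:
        return None  # one-shot: nothing to reschedule


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
trigger = AfterSketch(seconds=30)
run_at = trigger.first_run_time(now)
```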

run_once(func: JobCallable, at: str | ZonedDateTime, name: str = '', group: str | None = None, jitter: float | None = None, timeout: float | None = None, timeout_disabled: bool = False, if_past: Literal['tomorrow', 'error'] = 'tomorrow', *, on_error: SchedulerErrorHandlerType | None = None, if_exists: Literal['error', 'skip', 'replace'] = 'error', args: tuple[Any, ...] | None = None, kwargs: Mapping[str, Any] | None = None) -> ScheduledJob async

Schedule a job to run once at a specific wall-clock time (one-shot).

Parameters:

Name Type Description Default
func JobCallable

The function to run.

required
at str | ZonedDateTime

Target time: either an "HH:MM" string, resolved to today in the system timezone (or handled according to if_past when that time has already passed), or a ZonedDateTime.

required
name str

Optional name for the job.

''
group str | None

Optional group name.

None
jitter float | None

Optional seconds of random offset to apply at enqueue time. See schedule() for details.

None
timeout float | None

Per-job timeout in seconds. See schedule() for details.

None
timeout_disabled bool

Disable timeout enforcement. See schedule() for details.

False
if_past Literal['tomorrow', 'error']

Behavior when the target time is in the past at construction time. "tomorrow" (default) defers by one day. "error" raises ValueError. For ZonedDateTime inputs, if_past has no effect: the job fires immediately if the instant is in the past.

'tomorrow'
if_exists Literal['error', 'skip', 'replace']

Behavior when a job with the same name already exists. See add_job for details.

'error'
args tuple[Any, ...] | None

Positional arguments to pass to the callable when it executes.

None
kwargs Mapping[str, Any] | None

Keyword arguments to pass to the callable when it executes.

None

Returns:

Type Description
ScheduledJob

The scheduled job.

Source code in src/hassette/scheduler/scheduler.py
async def run_once(
    self,
    func: "JobCallable",
    at: str | ZonedDateTime,
    name: str = "",
    group: str | None = None,
    jitter: float | None = None,
    timeout: float | None = None,
    timeout_disabled: bool = False,
    if_past: Literal["tomorrow", "error"] = "tomorrow",
    *,
    on_error: "SchedulerErrorHandlerType | None" = None,
    if_exists: Literal["error", "skip", "replace"] = "error",
    args: tuple[Any, ...] | None = None,
    kwargs: Mapping[str, Any] | None = None,
) -> "ScheduledJob":
    """Schedule a job to run once at a specific wall-clock time (one-shot).

    Args:
        func: The function to run.
        at: Target time. A ``"HH:MM"`` string (today in system timezone, or
            tomorrow if already past) or a ``ZonedDateTime``.
        name: Optional name for the job.
        group: Optional group name.
        jitter: Optional seconds of random offset to apply at enqueue time.
            See ``schedule()`` for details.
        timeout: Per-job timeout in seconds. See ``schedule()`` for details.
        timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
        if_past: Behaviour when the target time is in the past at construction
            time. ``"tomorrow"`` (default) defers by one day. ``"error"`` raises
            ``ValueError``. For ``ZonedDateTime`` inputs, ``if_past`` has no
            effect — the job always fires immediately if the instant is in the past.
        if_exists: Behavior when a job with the same name already exists.
            See :meth:`add_job` for details.
        args: Positional arguments to pass to the callable when it executes.
        kwargs: Keyword arguments to pass to the callable when it executes.

    Returns:
        The scheduled job.
    """
    trigger = Once(at=at, if_past=if_past)
    return await self.schedule(
        func,
        trigger,
        name=name,
        group=group,
        jitter=jitter,
        timeout=timeout,
        timeout_disabled=timeout_disabled,
        on_error=on_error,
        if_exists=if_exists,
        args=args,
        kwargs=kwargs,
    )
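The "HH:MM" resolution and the if_past switch can be sketched as below. resolve_once is a hypothetical helper, not part of hassette; it uses naive stdlib datetimes instead of ZonedDateTime, and it treats a target equal to now as already past, which is an assumption the source does not confirm.

```python
from datetime import datetime, timedelta
from typing import Literal


def resolve_once(at: str, now: datetime, if_past: Literal["tomorrow", "error"] = "tomorrow") -> datetime:
    """Resolve an "HH:MM" string against `now` (hypothetical helper)."""
    hour, minute = (int(part) for part in at.split(":"))
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target > now:
        return target  # still ahead of us today
    if if_past == "error":
        raise ValueError(f"{at} is already past")
    return target + timedelta(days=1)  # same wall-clock time, next day


now = datetime(2024, 5, 1, 14, 30)
later_today = resolve_once("18:00", now)
tomorrow = resolve_once("09:00", now)
```

With if_past="error", the second call would raise ValueError instead of deferring to the next day.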

run_every(func: JobCallable, hours: float = 0, minutes: float = 0, seconds: float = 0, name: str = '', group: str | None = None, jitter: float | None = None, timeout: float | None = None, timeout_disabled: bool = False, *, on_error: SchedulerErrorHandlerType | None = None, if_exists: Literal['error', 'skip', 'replace'] = 'error', args: tuple[Any, ...] | None = None, kwargs: Mapping[str, Any] | None = None) -> ScheduledJob async

Schedule a job to run at a fixed interval.

Parameters:

Name Type Description Default
func JobCallable

The function to run.

required
hours float

Interval hours component.

0
minutes float

Interval minutes component.

0
seconds float

Interval seconds component.

0
name str

Optional name for the job.

''
group str | None

Optional group name.

None
jitter float | None

Optional seconds of random offset to apply at enqueue time. See schedule() for details.

None
timeout float | None

Per-job timeout in seconds. See schedule() for details.

None
timeout_disabled bool

Disable timeout enforcement. See schedule() for details.

False
if_exists Literal['error', 'skip', 'replace']

Behavior when a job with the same name already exists. See add_job for details.

'error'
args tuple[Any, ...] | None

Positional arguments to pass to the callable when it executes.

None
kwargs Mapping[str, Any] | None

Keyword arguments to pass to the callable when it executes.

None

Returns:

Type Description
ScheduledJob

The scheduled job.

Source code in src/hassette/scheduler/scheduler.py
async def run_every(
    self,
    func: "JobCallable",
    hours: float = 0,
    minutes: float = 0,
    seconds: float = 0,
    name: str = "",
    group: str | None = None,
    jitter: float | None = None,
    timeout: float | None = None,
    timeout_disabled: bool = False,
    *,
    on_error: "SchedulerErrorHandlerType | None" = None,
    if_exists: Literal["error", "skip", "replace"] = "error",
    args: tuple[Any, ...] | None = None,
    kwargs: Mapping[str, Any] | None = None,
) -> "ScheduledJob":
    """Schedule a job to run at a fixed interval.

    Args:
        func: The function to run.
        hours: Interval hours component.
        minutes: Interval minutes component.
        seconds: Interval seconds component.
        name: Optional name for the job.
        group: Optional group name.
        jitter: Optional seconds of random offset to apply at enqueue time.
            See ``schedule()`` for details.
        timeout: Per-job timeout in seconds. See ``schedule()`` for details.
        timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
        if_exists: Behavior when a job with the same name already exists.
            See :meth:`add_job` for details.
        args: Positional arguments to pass to the callable when it executes.
        kwargs: Keyword arguments to pass to the callable when it executes.

    Returns:
        The scheduled job.
    """
    trigger = Every(hours=hours, minutes=minutes, seconds=seconds)
    return await self.schedule(
        func,
        trigger,
        name=name,
        group=group,
        jitter=jitter,
        timeout=timeout,
        timeout_disabled=timeout_disabled,
        on_error=on_error,
        if_exists=if_exists,
        args=args,
        kwargs=kwargs,
    )
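Since hours, minutes, and seconds are described as components of one interval, they presumably add together, so hours=1, minutes=30 means every 90 minutes. A minimal sketch under that assumption follows; interval_of is a hypothetical helper, and rejecting a zero interval is also an assumption, since the source does not show how Every validates its arguments.

```python
from datetime import timedelta


def interval_of(hours: float = 0, minutes: float = 0, seconds: float = 0) -> timedelta:
    """Fold the three components into one interval (hypothetical helper)."""
    interval = timedelta(hours=hours, minutes=minutes, seconds=seconds)
    if interval <= timedelta(0):
        # Assumed guard: an interval of zero would never advance.
        raise ValueError("interval must be positive")
    return interval


ninety_minutes = interval_of(hours=1, minutes=30)
fractional = interval_of(minutes=1.5)  # float components are allowed
```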

run_minutely(func: JobCallable, minutes: int = 1, name: str = '', group: str | None = None, jitter: float | None = None, timeout: float | None = None, timeout_disabled: bool = False, *, on_error: SchedulerErrorHandlerType | None = None, if_exists: Literal['error', 'skip', 'replace'] = 'error', args: tuple[Any, ...] | None = None, kwargs: Mapping[str, Any] | None = None) -> ScheduledJob async

Schedule a job to run every N minutes.

Parameters:

Name Type Description Default
func JobCallable

The function to run.

required
minutes int

The minute interval (must be >= 1).

1
name str

Optional name for the job.

''
group str | None

Optional group name.

None
jitter float | None

Optional seconds of random offset to apply at enqueue time. See schedule() for details.

None
timeout float | None

Per-job timeout in seconds. See schedule() for details.

None
timeout_disabled bool

Disable timeout enforcement. See schedule() for details.

False
if_exists Literal['error', 'skip', 'replace']

Behavior when a job with the same name already exists. See add_job for details.

'error'
args tuple[Any, ...] | None

Positional arguments to pass to the callable when it executes.

None
kwargs Mapping[str, Any] | None

Keyword arguments to pass to the callable when it executes.

None

Returns:

Type Description
ScheduledJob

The scheduled job.

Source code in src/hassette/scheduler/scheduler.py
async def run_minutely(
    self,
    func: "JobCallable",
    minutes: int = 1,
    name: str = "",
    group: str | None = None,
    jitter: float | None = None,
    timeout: float | None = None,
    timeout_disabled: bool = False,
    *,
    on_error: "SchedulerErrorHandlerType | None" = None,
    if_exists: Literal["error", "skip", "replace"] = "error",
    args: tuple[Any, ...] | None = None,
    kwargs: Mapping[str, Any] | None = None,
) -> "ScheduledJob":
    """Schedule a job to run every N minutes.

    Args:
        func: The function to run.
        minutes: The minute interval (must be >= 1).
        name: Optional name for the job.
        group: Optional group name.
        jitter: Optional seconds of random offset to apply at enqueue time.
            See ``schedule()`` for details.
        timeout: Per-job timeout in seconds. See ``schedule()`` for details.
        timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
        if_exists: Behavior when a job with the same name already exists.
            See :meth:`add_job` for details.
        args: Positional arguments to pass to the callable when it executes.
        kwargs: Keyword arguments to pass to the callable when it executes.

    Returns:
        The scheduled job.
    """
    if minutes < 1:
        raise ValueError("Minute interval must be at least 1")
    trigger = Every(minutes=minutes)
    return await self.schedule(
        func,
        trigger,
        name=name,
        group=group,
        jitter=jitter,
        timeout=timeout,
        timeout_disabled=timeout_disabled,
        on_error=on_error,
        if_exists=if_exists,
        args=args,
        kwargs=kwargs,
    )

run_hourly(func: JobCallable, hours: int = 1, name: str = '', group: str | None = None, jitter: float | None = None, timeout: float | None = None, timeout_disabled: bool = False, *, on_error: SchedulerErrorHandlerType | None = None, if_exists: Literal['error', 'skip', 'replace'] = 'error', args: tuple[Any, ...] | None = None, kwargs: Mapping[str, Any] | None = None) -> ScheduledJob async

Schedule a job to run every N hours.

Parameters:

Name Type Description Default
func JobCallable

The function to run.

required
hours int

The hour interval (must be >= 1).

1
name str

Optional name for the job.

''
group str | None

Optional group name.

None
jitter float | None

Optional seconds of random offset to apply at enqueue time. See schedule() for details.

None
timeout float | None

Per-job timeout in seconds. See schedule() for details.

None
timeout_disabled bool

Disable timeout enforcement. See schedule() for details.

False
if_exists Literal['error', 'skip', 'replace']

Behavior when a job with the same name already exists. See add_job for details.

'error'
args tuple[Any, ...] | None

Positional arguments to pass to the callable when it executes.

None
kwargs Mapping[str, Any] | None

Keyword arguments to pass to the callable when it executes.

None

Returns:

Type Description
ScheduledJob

The scheduled job.

Source code in src/hassette/scheduler/scheduler.py
async def run_hourly(
    self,
    func: "JobCallable",
    hours: int = 1,
    name: str = "",
    group: str | None = None,
    jitter: float | None = None,
    timeout: float | None = None,
    timeout_disabled: bool = False,
    *,
    on_error: "SchedulerErrorHandlerType | None" = None,
    if_exists: Literal["error", "skip", "replace"] = "error",
    args: tuple[Any, ...] | None = None,
    kwargs: Mapping[str, Any] | None = None,
) -> "ScheduledJob":
    """Schedule a job to run every N hours.

    Args:
        func: The function to run.
        hours: The hour interval (must be >= 1).
        name: Optional name for the job.
        group: Optional group name.
        jitter: Optional seconds of random offset to apply at enqueue time.
            See ``schedule()`` for details.
        timeout: Per-job timeout in seconds. See ``schedule()`` for details.
        timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
        if_exists: Behavior when a job with the same name already exists.
            See :meth:`add_job` for details.
        args: Positional arguments to pass to the callable when it executes.
        kwargs: Keyword arguments to pass to the callable when it executes.

    Returns:
        The scheduled job.
    """
    if hours < 1:
        raise ValueError("Hour interval must be at least 1")
    trigger = Every(hours=hours)
    return await self.schedule(
        func,
        trigger,
        name=name,
        group=group,
        jitter=jitter,
        timeout=timeout,
        timeout_disabled=timeout_disabled,
        on_error=on_error,
        if_exists=if_exists,
        args=args,
        kwargs=kwargs,
    )

run_daily(func: JobCallable, at: str = '00:00', name: str = '', group: str | None = None, jitter: float | None = None, timeout: float | None = None, timeout_disabled: bool = False, *, on_error: SchedulerErrorHandlerType | None = None, if_exists: Literal['error', 'skip', 'replace'] = 'error', args: tuple[Any, ...] | None = None, kwargs: Mapping[str, Any] | None = None) -> ScheduledJob async

Schedule a job to run once per day at a fixed wall-clock time.

Uses a cron-based trigger internally to ensure DST-correct, wall-clock-aligned scheduling. This avoids the 24-hour drift bug of interval-based daily scheduling.
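The drift is easy to reproduce with the standard library. The sketch below is independent of hassette and uses zoneinfo with an arbitrarily chosen zone and date: it compares "24 elapsed hours later" with "same wall-clock time on the next day" across the 2024 spring-forward transition in America/New_York.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

NEW_YORK = ZoneInfo("America/New_York")

# 07:00 local on the day before the 2024 spring-forward transition.
start = datetime(2024, 3, 9, 7, 0, tzinfo=NEW_YORK)

# Interval-based: add 24 elapsed hours on the UTC timeline, then convert back.
interval_next = (start.astimezone(timezone.utc) + timedelta(hours=24)).astimezone(NEW_YORK)

# Wall-clock-based: same local time on the next calendar date.
wall_clock_next = datetime.combine(start.date() + timedelta(days=1), start.time(), tzinfo=NEW_YORK)
```

The interval-based result lands at 08:00 local time, one hour later than intended, while the wall-clock result stays at 07:00. An interval-based daily job would keep that shifted time until the autumn transition.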

Parameters:

func (JobCallable, required): The function to run.
at (str, default '00:00'): Target wall-clock time in "HH:MM" format.
name (str, default ''): Optional name for the job.
group (str | None, default None): Optional group name.
jitter (float | None, default None): Optional seconds of random offset to apply at enqueue time. See schedule() for details.
timeout (float | None, default None): Per-job timeout in seconds. See schedule() for details.
timeout_disabled (bool, default False): Disable timeout enforcement. See schedule() for details.
if_exists (Literal['error', 'skip', 'replace'], default 'error'): Behavior when a job with the same name already exists. See add_job() for details.
args (tuple[Any, ...] | None, default None): Positional arguments to pass to the callable when it executes.
kwargs (Mapping[str, Any] | None, default None): Keyword arguments to pass to the callable when it executes.

Returns:

ScheduledJob: The scheduled job.

Source code in src/hassette/scheduler/scheduler.py
async def run_daily(
    self,
    func: "JobCallable",
    at: str = "00:00",
    name: str = "",
    group: str | None = None,
    jitter: float | None = None,
    timeout: float | None = None,
    timeout_disabled: bool = False,
    *,
    on_error: "SchedulerErrorHandlerType | None" = None,
    if_exists: Literal["error", "skip", "replace"] = "error",
    args: tuple[Any, ...] | None = None,
    kwargs: Mapping[str, Any] | None = None,
) -> "ScheduledJob":
    """Schedule a job to run once per day at a fixed wall-clock time.

    Uses a cron-based trigger internally to ensure DST-correct, wall-clock-aligned
    scheduling. This avoids the 24-hour drift bug of interval-based daily scheduling.

    Args:
        func: The function to run.
        at: Target wall-clock time in ``"HH:MM"`` format (default ``"00:00"``).
        name: Optional name for the job.
        group: Optional group name.
        jitter: Optional seconds of random offset to apply at enqueue time.
            See ``schedule()`` for details.
        timeout: Per-job timeout in seconds. See ``schedule()`` for details.
        timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
        if_exists: Behavior when a job with the same name already exists.
            See :meth:`add_job` for details.
        args: Positional arguments to pass to the callable when it executes.
        kwargs: Keyword arguments to pass to the callable when it executes.

    Returns:
        The scheduled job.
    """
    trigger = Daily(at=at)
    return await self.schedule(
        func,
        trigger,
        name=name,
        group=group,
        jitter=jitter,
        timeout=timeout,
        timeout_disabled=timeout_disabled,
        on_error=on_error,
        if_exists=if_exists,
        args=args,
        kwargs=kwargs,
    )

run_cron(func: JobCallable, expression: str, name: str = '', group: str | None = None, jitter: float | None = None, timeout: float | None = None, timeout_disabled: bool = False, *, on_error: SchedulerErrorHandlerType | None = None, if_exists: Literal['error', 'skip', 'replace'] = 'error', args: tuple[Any, ...] | None = None, kwargs: Mapping[str, Any] | None = None) -> ScheduledJob async

Schedule a job using a cron expression.

Accepts both 5-field (standard Unix cron: minute hour dom month dow) and 6-field expressions (seconds appended as a 6th field per croniter convention: minute hour dom month dow second).
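To make the field ordering concrete, the following sketch labels the fields of an expression in the order described above. `label_cron_fields` is a hypothetical helper written for this illustration; it only splits and names the fields and does not validate their values or reflect how Hassette or croniter parse expressions internally.

```python
# Field order as described above: the optional seconds field comes last.
FIELD_NAMES = ("minute", "hour", "dom", "month", "dow", "second")


def label_cron_fields(expression: str) -> dict[str, str]:
    """Map each whitespace-separated field to its name (hypothetical helper)."""
    parts = expression.split()
    if len(parts) not in (5, 6):
        raise ValueError(f"expected 5 or 6 fields, got {len(parts)}")
    # zip() stops at the shorter input, so 5-field expressions get no "second" key.
    return dict(zip(FIELD_NAMES, parts))


five = label_cron_fields("*/15 9-17 * * 1-5")
six = label_cron_fields("0 7 * * * 30")

print(five)
print(six)
```

Under this ordering, the 6-field expression `0 7 * * * 30` would read as minute 0, hour 7, second 30.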

Parameters:

func (JobCallable, required): The function to run.
expression (str, required): A valid 5- or 6-field cron expression.
name (str, default ''): Optional name for the job.
group (str | None, default None): Optional group name.
jitter (float | None, default None): Optional seconds of random offset to apply at enqueue time. See schedule() for details.
timeout (float | None, default None): Per-job timeout in seconds. See schedule() for details.
timeout_disabled (bool, default False): Disable timeout enforcement. See schedule() for details.
if_exists (Literal['error', 'skip', 'replace'], default 'error'): Behavior when a job with the same name already exists. See add_job() for details.
args (tuple[Any, ...] | None, default None): Positional arguments to pass to the callable when it executes.
kwargs (Mapping[str, Any] | None, default None): Keyword arguments to pass to the callable when it executes.

Returns:

ScheduledJob: The scheduled job.

Raises:

ValueError: If the cron expression is syntactically invalid.

Source code in src/hassette/scheduler/scheduler.py
async def run_cron(
    self,
    func: "JobCallable",
    expression: str,
    name: str = "",
    group: str | None = None,
    jitter: float | None = None,
    timeout: float | None = None,
    timeout_disabled: bool = False,
    *,
    on_error: "SchedulerErrorHandlerType | None" = None,
    if_exists: Literal["error", "skip", "replace"] = "error",
    args: tuple[Any, ...] | None = None,
    kwargs: Mapping[str, Any] | None = None,
) -> "ScheduledJob":
    """Schedule a job using a cron expression.

    Accepts both 5-field (standard Unix cron: ``minute hour dom month dow``)
    and 6-field expressions (seconds appended as a 6th field per croniter
    convention: ``minute hour dom month dow second``).

    Args:
        func: The function to run.
        expression: A valid 5- or 6-field cron expression.
        name: Optional name for the job.
        group: Optional group name.
        jitter: Optional seconds of random offset to apply at enqueue time.
            See ``schedule()`` for details.
        timeout: Per-job timeout in seconds. See ``schedule()`` for details.
        timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
        if_exists: Behavior when a job with the same name already exists.
            See :meth:`add_job` for details.
        args: Positional arguments to pass to the callable when it executes.
        kwargs: Keyword arguments to pass to the callable when it executes.

    Returns:
        The scheduled job.

    Raises:
        ValueError: If the cron expression is syntactically invalid.
    """
    trigger = Cron(expression)
    return await self.schedule(
        func,
        trigger,
        name=name,
        group=group,
        jitter=jitter,
        timeout=timeout,
        timeout_disabled=timeout_disabled,
        on_error=on_error,
        if_exists=if_exists,
        args=args,
        kwargs=kwargs,
    )

SchedulerSyncFacade

Bases: Resource

Synchronous facade for the scheduler.

This class provides synchronous methods that wrap the asynchronous scheduling methods of the Scheduler class, allowing jobs to be scheduled from synchronous code (for example, an AppSync lifecycle hook running in a worker thread).

These methods must not be called from within the event loop; doing so raises a RuntimeError. Use the asynchronous methods on Scheduler directly when operating within an event loop.
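The general pattern of a synchronous facade over an async API can be sketched with `asyncio` and a background thread. This is a generic illustration, not the implementation behind `task_bucket.run_sync`: the `run_sync` function, the `schedule_job` coroutine, and the error message are all assumptions made for the example.

```python
import asyncio
import threading
from typing import Any, Coroutine


def run_sync(coro: Coroutine[Any, Any, Any], loop: asyncio.AbstractEventLoop, timeout: float = 5.0) -> Any:
    """Run coro on loop from a non-loop thread and block for its result."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        pass  # No loop is running in this thread, so blocking is safe.
    else:
        coro.close()  # Avoid a "coroutine was never awaited" warning.
        raise RuntimeError("run_sync() called from inside the event loop")
    return asyncio.run_coroutine_threadsafe(coro, loop).result(timeout)


async def schedule_job(name: str) -> str:
    """Stand-in for an async scheduling call (hypothetical)."""
    await asyncio.sleep(0)
    return f"scheduled:{name}"


# Run an event loop in a background thread, as a host application might.
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

# Calling from a plain (non-loop) thread works and blocks until done.
result = run_sync(schedule_job("demo"), loop)


async def call_from_loop() -> str:
    """Calling the sync wrapper from inside the loop is rejected."""
    try:
        run_sync(schedule_job("bad"), loop)
    except RuntimeError as exc:
        return str(exc)
    return "no error"


error = run_sync(call_from_loop(), loop)

loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()

print(result)
print(error)
```

The guard exists because blocking on a future from inside the loop that must complete it would deadlock; raising early turns that into an immediate, visible error.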

Source code in src/hassette/scheduler/sync.py
class SchedulerSyncFacade(Resource):
    """Synchronous facade for the scheduler.

    This class provides synchronous methods that wrap the asynchronous scheduling methods of
    the Scheduler class, allowing jobs to be scheduled from synchronous code (for example, an
    ``AppSync`` lifecycle hook running in a worker thread).

    These methods must not be called from within the event loop; doing so raises a RuntimeError.
    Use the asynchronous methods on ``Scheduler`` directly when operating within an event loop.
    """

    _scheduler: "Scheduler"

    def __init__(self, hassette: "Hassette", *, scheduler: "Scheduler", parent: Resource | None = None) -> None:
        super().__init__(hassette, parent=parent)
        self._scheduler = scheduler

    async def on_initialize(self) -> None:
        self.mark_ready(reason="Synchronous Scheduler facade initialized")

    @property
    def config_log_level(self) -> LOG_LEVEL_TYPE:
        return self.hassette.config.logging.scheduler_service

    def add_job(
        self, job: "ScheduledJob", *, if_exists: Literal["error", "skip", "replace"] = "error"
    ) -> "ScheduledJob":
        """Add a job to the scheduler.

        DB registration completes inline — ``job.db_id`` is set before this
        method returns, eliminating the window where a job fires with
        ``db_id=None``.

        Args:
            job: The job to add.
            if_exists: Behavior when a job with the same name already exists.
                ``"error"`` (default) raises ``ValueError``.
                ``"skip"`` returns the existing job if it matches; raises
                ``ValueError`` if the name matches but the configuration differs.
                ``"replace"`` cancels the existing job (recording it as cancelled
                in telemetry) and registers the new job in its place.

        Returns:
            The added job, or the existing job when ``if_exists="skip"`` and a
            matching job is already registered. ``job.db_id`` is a valid
            integer on return.

        Raises:
            TypeError: If job is not a ScheduledJob.
            ValueError: If a job with the same name already exists and either
                ``if_exists="error"`` or the existing job's configuration differs."""

        return self.task_bucket.run_sync(self._scheduler.add_job(job, if_exists=if_exists))

    def schedule(
        self,
        func: "JobCallable",
        trigger: "TriggerProtocol",
        name: str = "",
        group: str | None = None,
        jitter: float | None = None,
        timeout: float | None = None,
        timeout_disabled: bool = False,
        *,
        on_error: "SchedulerErrorHandlerType | None" = None,
        if_exists: Literal["error", "skip", "replace"] = "error",
        args: tuple[Any, ...] | None = None,
        kwargs: Mapping[str, Any] | None = None,
    ) -> "ScheduledJob":
        """Schedule a job using a trigger object.

        This is the primary entry point for scheduling. All convenience methods
        (``run_in``, ``run_every``, ``run_daily``, etc.) delegate here.

        Args:
            func: The function to run.
            trigger: A trigger object implementing ``TriggerProtocol``. Determines
                both the first run time and subsequent recurrences.
            name: Optional name for the job. If empty, an auto-name is derived from
                the callable and trigger.
            group: Optional group name for bulk management (see ``cancel_group``).
            jitter: Optional seconds of random offset to apply at enqueue time.
                Jitter is applied via ``SchedulerService._apply_jitter_to_heap`` on enqueue.
                See the ``fire_at`` field on ``ScheduledJob``.
            timeout: Per-job timeout in seconds. ``None`` uses the global default.
                A positive ``float`` overrides the default.
            timeout_disabled: When ``True``, timeout enforcement is disabled for this
                job regardless of the global default.
            on_error: Optional per-job error handler. When set, this handler is
                invoked if the job raises an exception (including ``TimeoutError``,
                but excluding ``CancelledError``). Overrides the app-level handler
                set via ``on_error()``.
            if_exists: Behavior when a job with the same name already exists.
                See :meth:`add_job` for details.
            args: Positional arguments to pass to the callable when it executes.
            kwargs: Keyword arguments to pass to the callable when it executes.

        Returns:
            The scheduled job. ``job.db_id`` is a valid integer on return."""

        return self.task_bucket.run_sync(
            self._scheduler.schedule(
                func,
                trigger,
                name,
                group,
                jitter,
                timeout,
                timeout_disabled,
                on_error=on_error,
                if_exists=if_exists,
                args=args,
                kwargs=kwargs,
            )
        )

    def run_in(
        self,
        func: "JobCallable",
        delay: float,
        name: str = "",
        group: str | None = None,
        jitter: float | None = None,
        timeout: float | None = None,
        timeout_disabled: bool = False,
        *,
        on_error: "SchedulerErrorHandlerType | None" = None,
        if_exists: Literal["error", "skip", "replace"] = "error",
        args: tuple[Any, ...] | None = None,
        kwargs: Mapping[str, Any] | None = None,
    ) -> "ScheduledJob":
        """Schedule a job to run after a fixed delay (one-shot).

        Args:
            func: The function to run.
            delay: The delay in seconds before running the job.
            name: Optional name for the job.
            group: Optional group name.
            jitter: Optional seconds of random offset to apply at enqueue time.
                See ``schedule()`` for details.
            timeout: Per-job timeout in seconds. See ``schedule()`` for details.
            timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
            if_exists: Behavior when a job with the same name already exists.
                See :meth:`add_job` for details.
            args: Positional arguments to pass to the callable when it executes.
            kwargs: Keyword arguments to pass to the callable when it executes.

        Returns:
            The scheduled job."""

        return self.task_bucket.run_sync(
            self._scheduler.run_in(
                func,
                delay,
                name,
                group,
                jitter,
                timeout,
                timeout_disabled,
                on_error=on_error,
                if_exists=if_exists,
                args=args,
                kwargs=kwargs,
            )
        )

    def run_once(
        self,
        func: "JobCallable",
        at: str | ZonedDateTime,
        name: str = "",
        group: str | None = None,
        jitter: float | None = None,
        timeout: float | None = None,
        timeout_disabled: bool = False,
        if_past: Literal["tomorrow", "error"] = "tomorrow",
        *,
        on_error: "SchedulerErrorHandlerType | None" = None,
        if_exists: Literal["error", "skip", "replace"] = "error",
        args: tuple[Any, ...] | None = None,
        kwargs: Mapping[str, Any] | None = None,
    ) -> "ScheduledJob":
        """Schedule a job to run once at a specific wall-clock time (one-shot).

        Args:
            func: The function to run.
            at: Target time. A ``"HH:MM"`` string (today in system timezone, or
                tomorrow if already past) or a ``ZonedDateTime``.
            name: Optional name for the job.
            group: Optional group name.
            jitter: Optional seconds of random offset to apply at enqueue time.
                See ``schedule()`` for details.
            timeout: Per-job timeout in seconds. See ``schedule()`` for details.
            timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
            if_past: Behaviour when the target time is in the past at construction
                time. ``"tomorrow"`` (default) defers by one day. ``"error"`` raises
                ``ValueError``. For ``ZonedDateTime`` inputs, ``if_past`` has no
                effect — the job always fires immediately if the instant is in the past.
            if_exists: Behavior when a job with the same name already exists.
                See :meth:`add_job` for details.
            args: Positional arguments to pass to the callable when it executes.
            kwargs: Keyword arguments to pass to the callable when it executes.

        Returns:
            The scheduled job."""

        return self.task_bucket.run_sync(
            self._scheduler.run_once(
                func,
                at,
                name,
                group,
                jitter,
                timeout,
                timeout_disabled,
                if_past,
                on_error=on_error,
                if_exists=if_exists,
                args=args,
                kwargs=kwargs,
            )
        )

    def run_every(
        self,
        func: "JobCallable",
        hours: float = 0,
        minutes: float = 0,
        seconds: float = 0,
        name: str = "",
        group: str | None = None,
        jitter: float | None = None,
        timeout: float | None = None,
        timeout_disabled: bool = False,
        *,
        on_error: "SchedulerErrorHandlerType | None" = None,
        if_exists: Literal["error", "skip", "replace"] = "error",
        args: tuple[Any, ...] | None = None,
        kwargs: Mapping[str, Any] | None = None,
    ) -> "ScheduledJob":
        """Schedule a job to run at a fixed interval.

        Args:
            func: The function to run.
            hours: Interval hours component.
            minutes: Interval minutes component.
            seconds: Interval seconds component.
            name: Optional name for the job.
            group: Optional group name.
            jitter: Optional seconds of random offset to apply at enqueue time.
                See ``schedule()`` for details.
            timeout: Per-job timeout in seconds. See ``schedule()`` for details.
            timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
            if_exists: Behavior when a job with the same name already exists.
                See :meth:`add_job` for details.
            args: Positional arguments to pass to the callable when it executes.
            kwargs: Keyword arguments to pass to the callable when it executes.

        Returns:
            The scheduled job."""

        return self.task_bucket.run_sync(
            self._scheduler.run_every(
                func,
                hours,
                minutes,
                seconds,
                name,
                group,
                jitter,
                timeout,
                timeout_disabled,
                on_error=on_error,
                if_exists=if_exists,
                args=args,
                kwargs=kwargs,
            )
        )

    def run_minutely(
        self,
        func: "JobCallable",
        minutes: int = 1,
        name: str = "",
        group: str | None = None,
        jitter: float | None = None,
        timeout: float | None = None,
        timeout_disabled: bool = False,
        *,
        on_error: "SchedulerErrorHandlerType | None" = None,
        if_exists: Literal["error", "skip", "replace"] = "error",
        args: tuple[Any, ...] | None = None,
        kwargs: Mapping[str, Any] | None = None,
    ) -> "ScheduledJob":
        """Schedule a job to run every N minutes.

        Args:
            func: The function to run.
            minutes: The minute interval (must be >= 1).
            name: Optional name for the job.
            group: Optional group name.
            jitter: Optional seconds of random offset to apply at enqueue time.
                See ``schedule()`` for details.
            timeout: Per-job timeout in seconds. See ``schedule()`` for details.
            timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
            if_exists: Behavior when a job with the same name already exists.
                See :meth:`add_job` for details.
            args: Positional arguments to pass to the callable when it executes.
            kwargs: Keyword arguments to pass to the callable when it executes.

        Returns:
            The scheduled job."""

        return self.task_bucket.run_sync(
            self._scheduler.run_minutely(
                func,
                minutes,
                name,
                group,
                jitter,
                timeout,
                timeout_disabled,
                on_error=on_error,
                if_exists=if_exists,
                args=args,
                kwargs=kwargs,
            )
        )

    def run_hourly(
        self,
        func: "JobCallable",
        hours: int = 1,
        name: str = "",
        group: str | None = None,
        jitter: float | None = None,
        timeout: float | None = None,
        timeout_disabled: bool = False,
        *,
        on_error: "SchedulerErrorHandlerType | None" = None,
        if_exists: Literal["error", "skip", "replace"] = "error",
        args: tuple[Any, ...] | None = None,
        kwargs: Mapping[str, Any] | None = None,
    ) -> "ScheduledJob":
        """Schedule a job to run every N hours.

        Args:
            func: The function to run.
            hours: The hour interval (must be >= 1).
            name: Optional name for the job.
            group: Optional group name.
            jitter: Optional seconds of random offset to apply at enqueue time.
                See ``schedule()`` for details.
            timeout: Per-job timeout in seconds. See ``schedule()`` for details.
            timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
            if_exists: Behavior when a job with the same name already exists.
                See :meth:`add_job` for details.
            args: Positional arguments to pass to the callable when it executes.
            kwargs: Keyword arguments to pass to the callable when it executes.

        Returns:
            The scheduled job."""

        return self.task_bucket.run_sync(
            self._scheduler.run_hourly(
                func,
                hours,
                name,
                group,
                jitter,
                timeout,
                timeout_disabled,
                on_error=on_error,
                if_exists=if_exists,
                args=args,
                kwargs=kwargs,
            )
        )

    def run_daily(
        self,
        func: "JobCallable",
        at: str = "00:00",
        name: str = "",
        group: str | None = None,
        jitter: float | None = None,
        timeout: float | None = None,
        timeout_disabled: bool = False,
        *,
        on_error: "SchedulerErrorHandlerType | None" = None,
        if_exists: Literal["error", "skip", "replace"] = "error",
        args: tuple[Any, ...] | None = None,
        kwargs: Mapping[str, Any] | None = None,
    ) -> "ScheduledJob":
        """Schedule a job to run once per day at a fixed wall-clock time.

        Uses a cron-based trigger internally to ensure DST-correct, wall-clock-aligned
        scheduling. This avoids the 24-hour drift bug of interval-based daily scheduling.

        Args:
            func: The function to run.
            at: Target wall-clock time in ``"HH:MM"`` format (default ``"00:00"``).
            name: Optional name for the job.
            group: Optional group name.
            jitter: Optional seconds of random offset to apply at enqueue time.
                See ``schedule()`` for details.
            timeout: Per-job timeout in seconds. See ``schedule()`` for details.
            timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
            if_exists: Behavior when a job with the same name already exists.
                See :meth:`add_job` for details.
            args: Positional arguments to pass to the callable when it executes.
            kwargs: Keyword arguments to pass to the callable when it executes.

        Returns:
            The scheduled job."""

        return self.task_bucket.run_sync(
            self._scheduler.run_daily(
                func,
                at,
                name,
                group,
                jitter,
                timeout,
                timeout_disabled,
                on_error=on_error,
                if_exists=if_exists,
                args=args,
                kwargs=kwargs,
            )
        )

    def run_cron(
        self,
        func: "JobCallable",
        expression: str,
        name: str = "",
        group: str | None = None,
        jitter: float | None = None,
        timeout: float | None = None,
        timeout_disabled: bool = False,
        *,
        on_error: "SchedulerErrorHandlerType | None" = None,
        if_exists: Literal["error", "skip", "replace"] = "error",
        args: tuple[Any, ...] | None = None,
        kwargs: Mapping[str, Any] | None = None,
    ) -> "ScheduledJob":
        """Schedule a job using a cron expression.

        Accepts both 5-field (standard Unix cron: ``minute hour dom month dow``)
        and 6-field expressions (seconds appended as a 6th field per croniter
        convention: ``minute hour dom month dow second``).

        Args:
            func: The function to run.
            expression: A valid 5- or 6-field cron expression.
            name: Optional name for the job.
            group: Optional group name.
            jitter: Optional seconds of random offset to apply at enqueue time.
                See ``schedule()`` for details.
            timeout: Per-job timeout in seconds. See ``schedule()`` for details.
            timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
            if_exists: Behavior when a job with the same name already exists.
                See :meth:`add_job` for details.
            args: Positional arguments to pass to the callable when it executes.
            kwargs: Keyword arguments to pass to the callable when it executes.

        Returns:
            The scheduled job.

        Raises:
            ValueError: If the cron expression is syntactically invalid."""

        return self.task_bucket.run_sync(
            self._scheduler.run_cron(
                func,
                expression,
                name,
                group,
                jitter,
                timeout,
                timeout_disabled,
                on_error=on_error,
                if_exists=if_exists,
                args=args,
                kwargs=kwargs,
            )
        )

    def on_error(self, handler: "SchedulerErrorHandlerType") -> None:
        """Register an app-level error handler for this scheduler.

        The handler is called when any job on this scheduler raises an exception
        (including ``TimeoutError``) and the job does not have its own
        per-registration error handler.

        This is an app-level fallback — it is resolved at dispatch time, not at job
        registration time. A later call to ``on_error()`` replaces any previously
        registered handler.

        Note: error handlers are spawned as fire-and-forget tasks. Handlers spawned near
        app shutdown may be cancelled before they complete. Do not rely on error handlers
        for delivery-critical alerting during system teardown.

        Args:
            handler: A sync or async callable that accepts a
                :class:`~hassette.scheduler.error_context.SchedulerErrorContext`."""

        return self._scheduler.on_error(handler)

    def cancel_job(self, job: "ScheduledJob") -> None:
        """Cancel an individual job and persist the cancellation to the database.

        Idempotent: a second cancel on the same job is a silent no-op. Raises
        ``ValueError`` if the job belongs to a different scheduler instance.
        Spawns a durable ``mark_job_cancelled`` DB write (when ``db_id`` is set),
        dequeues the job from the service, and sets ``job._dequeued = True``.

        Must NOT call ``job.cancel()`` internally — that delegates back here and
        would cause infinite recursion.

        Args:
            job: The job to cancel.

        Raises:
            ValueError: If the job belongs to a different scheduler instance."""

        return self._scheduler.cancel_job(job)

    def cancel_group(self, group: str) -> None:
        """Cancel all jobs in the given group.

        Delegates to ``cancel_job`` per-member, which handles the DB write,
        dequeue, and ``_dequeued`` flag. Dict cleanup (``_jobs_by_group`` and
        ``_jobs_by_name``) is handled by the ``_on_job_removed`` callback
        fired by ``scheduler_service.dequeue_job``. No-op if the group does
        not exist.

        Args:
            group: The group name to cancel."""

        return self._scheduler.cancel_group(group)

    def list_jobs(self, group: str | None = None) -> list["ScheduledJob"]:
        """Return all or group-filtered jobs.

        Args:
            group: If provided, return only jobs in this group.
                If ``None`` (default), return all jobs.

        Returns:
            List of ScheduledJob instances."""

        return self._scheduler.list_jobs(group)

add_job(job: ScheduledJob, *, if_exists: Literal['error', 'skip', 'replace'] = 'error') -> ScheduledJob

Add a job to the scheduler.

DB registration completes inline — job.db_id is set before this method returns, eliminating the window where a job fires with db_id=None.

Parameters:

job (ScheduledJob, required): The job to add.
if_exists (Literal['error', 'skip', 'replace'], default 'error'): Behavior when a job with the same name already exists. "error" (default) raises ValueError. "skip" returns the existing job if it matches; raises ValueError if the name matches but the configuration differs. "replace" cancels the existing job (recording it as cancelled in telemetry) and registers the new job in its place.

Returns:

ScheduledJob: The added job, or the existing job when if_exists="skip" and a matching job is already registered. job.db_id is a valid integer on return.

Raises:

TypeError: If job is not a ScheduledJob.
ValueError: If a job with the same name already exists and either if_exists="error" or the existing job's configuration differs.
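The three `if_exists` modes described above can be modelled with a small in-memory registry. This is a toy sketch under stated assumptions: `ToyJob` and `ToyRegistry` are hypothetical names, "configuration" is reduced to a single `interval_seconds` field, and nothing here touches a database or Hassette's real `ScheduledJob`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyJob:
    """Hypothetical stand-in for a scheduled job."""
    name: str
    interval_seconds: float


class ToyRegistry:
    """Toy name-keyed registry mirroring the documented if_exists semantics."""

    def __init__(self) -> None:
        self.jobs: dict[str, ToyJob] = {}
        self.cancelled: list[ToyJob] = []

    def add_job(self, job: ToyJob, *, if_exists: str = "error") -> ToyJob:
        existing = self.jobs.get(job.name)
        if existing is None:
            self.jobs[job.name] = job
            return job
        if if_exists == "skip" and existing == job:
            return existing  # Same name and same configuration: keep the old job.
        if if_exists == "replace":
            self.cancelled.append(existing)  # Record the replaced job as cancelled.
            self.jobs[job.name] = job
            return job
        # Covers "error", and "skip" when the configuration differs.
        raise ValueError(f"job {job.name!r} already exists")


registry = ToyRegistry()
first = registry.add_job(ToyJob("cleanup", 60.0))
same = registry.add_job(ToyJob("cleanup", 60.0), if_exists="skip")
newer = registry.add_job(ToyJob("cleanup", 30.0), if_exists="replace")

print(same is first)
print(registry.jobs["cleanup"].interval_seconds)
```

`"skip"` suits idempotent startup code that may run more than once, while `"replace"` suits cases where the newest configuration should always win.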

Source code in src/hassette/scheduler/sync.py
def add_job(
    self, job: "ScheduledJob", *, if_exists: Literal["error", "skip", "replace"] = "error"
) -> "ScheduledJob":
    """Add a job to the scheduler.

    DB registration completes inline — ``job.db_id`` is set before this
    method returns, eliminating the window where a job fires with
    ``db_id=None``.

    Args:
        job: The job to add.
        if_exists: Behavior when a job with the same name already exists.
            ``"error"`` (default) raises ``ValueError``.
            ``"skip"`` returns the existing job if it matches; raises
            ``ValueError`` if the name matches but the configuration differs.
            ``"replace"`` cancels the existing job (recording it as cancelled
            in telemetry) and registers the new job in its place.

    Returns:
        The added job, or the existing job when ``if_exists="skip"`` and a
        matching job is already registered. ``job.db_id`` is a valid
        integer on return.

    Raises:
        TypeError: If job is not a ScheduledJob.
        ValueError: If a job with the same name already exists and
            ``if_exists="error"``, or ``if_exists="skip"`` and the existing
            job's configuration differs."""

    return self.task_bucket.run_sync(self._scheduler.add_job(job, if_exists=if_exists))
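
The three `if_exists` modes are easier to compare side by side. The following is a minimal sketch over a plain dictionary, assuming that "matches" means equal configuration; `ToyJob`, its `config` field, and this free-standing `add_job` function are hypothetical and only mirror the behavior described above.

```python
from dataclasses import dataclass
from typing import Literal


@dataclass
class ToyJob:
    name: str
    config: str              # stands in for trigger, callable, args, ...
    cancelled: bool = False


def add_job(
    registry: dict,
    job: ToyJob,
    if_exists: Literal["error", "skip", "replace"] = "error",
) -> ToyJob:
    existing = registry.get(job.name)
    if existing is None:
        registry[job.name] = job
        return job
    if if_exists == "error":
        raise ValueError(f"job {job.name!r} already exists")
    if if_exists == "skip":
        if existing.config != job.config:
            raise ValueError(f"job {job.name!r} exists with a different configuration")
        return existing          # caller gets the already-registered job
    # "replace": cancel the old job, then register the new one in its place
    existing.cancelled = True
    registry[job.name] = job
    return job
```

With "skip", callers should use the returned job rather than the one they passed in, since the two can be different objects.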

schedule(func: JobCallable, trigger: TriggerProtocol, name: str = '', group: str | None = None, jitter: float | None = None, timeout: float | None = None, timeout_disabled: bool = False, *, on_error: SchedulerErrorHandlerType | None = None, if_exists: Literal['error', 'skip', 'replace'] = 'error', args: tuple[Any, ...] | None = None, kwargs: Mapping[str, Any] | None = None) -> ScheduledJob

Schedule a job using a trigger object.

This is the primary entry point for scheduling. All convenience methods (run_in, run_every, run_daily, etc.) delegate here.

Parameters:

- func (JobCallable, required): The function to run.
- trigger (TriggerProtocol, required): A trigger object implementing TriggerProtocol. Determines both the first run time and subsequent recurrences.
- name (str, default ''): Optional name for the job. If empty, an auto-name is derived from the callable and trigger.
- group (str | None, default None): Optional group name for bulk management (see cancel_group).
- jitter (float | None, default None): Optional seconds of random offset, applied at enqueue time by SchedulerService._apply_jitter_to_heap. See the fire_at field on ScheduledJob.
- timeout (float | None, default None): Per-job timeout in seconds. None uses the global default. A positive float overrides the default.
- timeout_disabled (bool, default False): When True, timeout enforcement is disabled for this job regardless of the global default.
- on_error (SchedulerErrorHandlerType | None, default None): Optional per-job error handler. When set, this handler is invoked if the job raises an exception (including TimeoutError, but excluding CancelledError). Overrides the app-level handler set via on_error().
- if_exists (Literal['error', 'skip', 'replace'], default 'error'): Behavior when a job with the same name already exists. See add_job for details.
- args (tuple[Any, ...] | None, default None): Positional arguments to pass to the callable when it executes.
- kwargs (Mapping[str, Any] | None, default None): Keyword arguments to pass to the callable when it executes.

Returns:

- ScheduledJob: The scheduled job. job.db_id is a valid integer on return.

Source code in src/hassette/scheduler/sync.py
def schedule(
    self,
    func: "JobCallable",
    trigger: "TriggerProtocol",
    name: str = "",
    group: str | None = None,
    jitter: float | None = None,
    timeout: float | None = None,
    timeout_disabled: bool = False,
    *,
    on_error: "SchedulerErrorHandlerType | None" = None,
    if_exists: Literal["error", "skip", "replace"] = "error",
    args: tuple[Any, ...] | None = None,
    kwargs: Mapping[str, Any] | None = None,
) -> "ScheduledJob":
    """Schedule a job using a trigger object.

    This is the primary entry point for scheduling. All convenience methods
    (``run_in``, ``run_every``, ``run_daily``, etc.) delegate here.

    Args:
        func: The function to run.
        trigger: A trigger object implementing ``TriggerProtocol``. Determines
            both the first run time and subsequent recurrences.
        name: Optional name for the job. If empty, an auto-name is derived from
            the callable and trigger.
        group: Optional group name for bulk management (see ``cancel_group``).
        jitter: Optional seconds of random offset to apply at enqueue time.
            Jitter is applied via ``SchedulerService._apply_jitter_to_heap`` on enqueue.
            See the ``fire_at`` field on ``ScheduledJob``.
        timeout: Per-job timeout in seconds. ``None`` uses the global default.
            A positive ``float`` overrides the default.
        timeout_disabled: When ``True``, timeout enforcement is disabled for this
            job regardless of the global default.
        on_error: Optional per-job error handler. When set, this handler is
            invoked if the job raises an exception (including ``TimeoutError``,
            but excluding ``CancelledError``). Overrides the app-level handler
            set via ``on_error()``.
        if_exists: Behavior when a job with the same name already exists.
            See :meth:`add_job` for details.
        args: Positional arguments to pass to the callable when it executes.
        kwargs: Keyword arguments to pass to the callable when it executes.

    Returns:
        The scheduled job. ``job.db_id`` is a valid integer on return."""

    return self.task_bucket.run_sync(
        self._scheduler.schedule(
            func,
            trigger,
            name,
            group,
            jitter,
            timeout,
            timeout_disabled,
            on_error=on_error,
            if_exists=if_exists,
            args=args,
            kwargs=kwargs,
        )
    )
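
Since `schedule` takes any object implementing `TriggerProtocol`, it can help to see how the two timing methods drive a job's run times. The sketch below uses the standard library's `datetime` in place of `ZonedDateTime`, implements only `first_run_time` and `next_run_time` (the four labeling methods are omitted), and the names `EveryTrigger`, `OnceTrigger`, and `planned_runs` are hypothetical.

```python
from datetime import datetime, timedelta, timezone


class EveryTrigger:
    """Recurring trigger: fires one interval from now, then every interval."""

    def __init__(self, interval: timedelta) -> None:
        self.interval = interval

    def first_run_time(self, current_time: datetime) -> datetime:
        return current_time + self.interval

    def next_run_time(self, previous_run: datetime, current_time: datetime):
        return previous_run + self.interval


class OnceTrigger:
    """One-shot trigger: next_run_time returns None after the first run."""

    def __init__(self, at: datetime) -> None:
        self.at = at

    def first_run_time(self, current_time: datetime) -> datetime:
        # Never earlier than current_time, so a past instant fires immediately.
        return max(self.at, current_time)

    def next_run_time(self, previous_run: datetime, current_time: datetime):
        return None


def planned_runs(trigger, now: datetime, limit: int) -> list:
    """Collect up to `limit` run times the way a scheduler loop might."""
    runs = []
    run_at = trigger.first_run_time(now)
    while run_at is not None and len(runs) < limit:
        runs.append(run_at)
        run_at = trigger.next_run_time(run_at, run_at)
    return runs
```

A one-shot trigger ends the loop by returning `None`, which is the same signal the protocol documents for one-shot triggers.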

run_in(func: JobCallable, delay: float, name: str = '', group: str | None = None, jitter: float | None = None, timeout: float | None = None, timeout_disabled: bool = False, *, on_error: SchedulerErrorHandlerType | None = None, if_exists: Literal['error', 'skip', 'replace'] = 'error', args: tuple[Any, ...] | None = None, kwargs: Mapping[str, Any] | None = None) -> ScheduledJob

Schedule a job to run after a fixed delay (one-shot).

Parameters:

- func (JobCallable, required): The function to run.
- delay (float, required): The delay in seconds before running the job.
- name (str, default ''): Optional name for the job.
- group (str | None, default None): Optional group name.
- jitter (float | None, default None): Optional seconds of random offset to apply at enqueue time. See schedule() for details.
- timeout (float | None, default None): Per-job timeout in seconds. See schedule() for details.
- timeout_disabled (bool, default False): Disable timeout enforcement. See schedule() for details.
- on_error (SchedulerErrorHandlerType | None, default None): Optional per-job error handler. See schedule() for details.
- if_exists (Literal['error', 'skip', 'replace'], default 'error'): Behavior when a job with the same name already exists. See add_job for details.
- args (tuple[Any, ...] | None, default None): Positional arguments to pass to the callable when it executes.
- kwargs (Mapping[str, Any] | None, default None): Keyword arguments to pass to the callable when it executes.

Returns:

- ScheduledJob: The scheduled job.

Source code in src/hassette/scheduler/sync.py
def run_in(
    self,
    func: "JobCallable",
    delay: float,
    name: str = "",
    group: str | None = None,
    jitter: float | None = None,
    timeout: float | None = None,
    timeout_disabled: bool = False,
    *,
    on_error: "SchedulerErrorHandlerType | None" = None,
    if_exists: Literal["error", "skip", "replace"] = "error",
    args: tuple[Any, ...] | None = None,
    kwargs: Mapping[str, Any] | None = None,
) -> "ScheduledJob":
    """Schedule a job to run after a fixed delay (one-shot).

    Args:
        func: The function to run.
        delay: The delay in seconds before running the job.
        name: Optional name for the job.
        group: Optional group name.
        jitter: Optional seconds of random offset to apply at enqueue time.
            See ``schedule()`` for details.
        timeout: Per-job timeout in seconds. See ``schedule()`` for details.
        timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
        on_error: Optional per-job error handler. See ``schedule()`` for details.
        if_exists: Behavior when a job with the same name already exists.
            See :meth:`add_job` for details.
        args: Positional arguments to pass to the callable when it executes.
        kwargs: Keyword arguments to pass to the callable when it executes.

    Returns:
        The scheduled job."""

    return self.task_bucket.run_sync(
        self._scheduler.run_in(
            func,
            delay,
            name,
            group,
            jitter,
            timeout,
            timeout_disabled,
            on_error=on_error,
            if_exists=if_exists,
            args=args,
            kwargs=kwargs,
        )
    )
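
How `delay` and `jitter` combine into a fire time can be sketched as follows. This assumes jitter is a uniform offset between 0 and `jitter` seconds added on top of the delay; the source does not state the distribution or sign, and `planned_fire_time` is a hypothetical helper, not part of the library.

```python
import random
from datetime import datetime, timedelta, timezone
from typing import Optional


def planned_fire_time(
    now: datetime,
    delay: float,
    jitter: Optional[float] = None,
    rng: Optional[random.Random] = None,
) -> datetime:
    """Return now + delay, plus an assumed uniform offset in [0, jitter]."""
    if rng is None:
        rng = random.Random()
    offset = rng.uniform(0.0, jitter) if jitter else 0.0
    return now + timedelta(seconds=delay + offset)


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
exact = planned_fire_time(now, 30)               # no jitter: exactly 30 s later
spread = planned_fire_time(now, 30, jitter=5)    # somewhere in the next 30-35 s
```

Jitter of this kind is typically used to keep many jobs with the same delay from firing at the same instant.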

run_once(func: JobCallable, at: str | ZonedDateTime, name: str = '', group: str | None = None, jitter: float | None = None, timeout: float | None = None, timeout_disabled: bool = False, if_past: Literal['tomorrow', 'error'] = 'tomorrow', *, on_error: SchedulerErrorHandlerType | None = None, if_exists: Literal['error', 'skip', 'replace'] = 'error', args: tuple[Any, ...] | None = None, kwargs: Mapping[str, Any] | None = None) -> ScheduledJob

Schedule a job to run once at a specific wall-clock time (one-shot).

Parameters:

- func (JobCallable, required): The function to run.
- at (str | ZonedDateTime, required): Target time. A "HH:MM" string (today in the system timezone, or tomorrow if already past, subject to if_past) or a ZonedDateTime.
- name (str, default ''): Optional name for the job.
- group (str | None, default None): Optional group name.
- jitter (float | None, default None): Optional seconds of random offset to apply at enqueue time. See schedule() for details.
- timeout (float | None, default None): Per-job timeout in seconds. See schedule() for details.
- timeout_disabled (bool, default False): Disable timeout enforcement. See schedule() for details.
- if_past (Literal['tomorrow', 'error'], default 'tomorrow'): Behavior when the target time is in the past at construction time. "tomorrow" defers by one day. "error" raises ValueError. For ZonedDateTime inputs, if_past has no effect: the job fires immediately if the instant is in the past.
- on_error (SchedulerErrorHandlerType | None, default None): Optional per-job error handler. See schedule() for details.
- if_exists (Literal['error', 'skip', 'replace'], default 'error'): Behavior when a job with the same name already exists. See add_job for details.
- args (tuple[Any, ...] | None, default None): Positional arguments to pass to the callable when it executes.
- kwargs (Mapping[str, Any] | None, default None): Keyword arguments to pass to the callable when it executes.

Returns:

- ScheduledJob: The scheduled job.

Source code in src/hassette/scheduler/sync.py
def run_once(
    self,
    func: "JobCallable",
    at: str | ZonedDateTime,
    name: str = "",
    group: str | None = None,
    jitter: float | None = None,
    timeout: float | None = None,
    timeout_disabled: bool = False,
    if_past: Literal["tomorrow", "error"] = "tomorrow",
    *,
    on_error: "SchedulerErrorHandlerType | None" = None,
    if_exists: Literal["error", "skip", "replace"] = "error",
    args: tuple[Any, ...] | None = None,
    kwargs: Mapping[str, Any] | None = None,
) -> "ScheduledJob":
    """Schedule a job to run once at a specific wall-clock time (one-shot).

    Args:
        func: The function to run.
        at: Target time. A ``"HH:MM"`` string (today in system timezone, or
            tomorrow if already past) or a ``ZonedDateTime``.
        name: Optional name for the job.
        group: Optional group name.
        jitter: Optional seconds of random offset to apply at enqueue time.
            See ``schedule()`` for details.
        timeout: Per-job timeout in seconds. See ``schedule()`` for details.
        timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
        if_past: Behavior when the target time is in the past at construction
            time. ``"tomorrow"`` (default) defers by one day. ``"error"`` raises
            ``ValueError``. For ``ZonedDateTime`` inputs, ``if_past`` has no
            effect: the job fires immediately if the instant is in the past.
        on_error: Optional per-job error handler. See ``schedule()`` for details.
        if_exists: Behavior when a job with the same name already exists.
            See :meth:`add_job` for details.
        args: Positional arguments to pass to the callable when it executes.
        kwargs: Keyword arguments to pass to the callable when it executes.

    Returns:
        The scheduled job."""

    return self.task_bucket.run_sync(
        self._scheduler.run_once(
            func,
            at,
            name,
            group,
            jitter,
            timeout,
            timeout_disabled,
            if_past,
            on_error=on_error,
            if_exists=if_exists,
            args=args,
            kwargs=kwargs,
        )
    )
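
The interaction between an "HH:MM" string and `if_past` can be sketched with the standard library. This is an illustration under stated assumptions: it uses naive `datetime` values instead of `ZonedDateTime`, treats a target strictly earlier than `now` as "past", and `resolve_once` is a hypothetical helper rather than the library's own code.

```python
from datetime import datetime, timedelta
from typing import Literal


def resolve_once(
    at: str,
    now: datetime,
    if_past: Literal["tomorrow", "error"] = "tomorrow",
) -> datetime:
    """Turn an "HH:MM" string into the datetime a one-shot job would fire at."""
    hour_text, sep, minute_text = at.partition(":")
    if not sep:
        raise ValueError(f"expected 'HH:MM', got {at!r}")
    target = now.replace(
        hour=int(hour_text), minute=int(minute_text), second=0, microsecond=0
    )
    if target < now:
        if if_past == "error":
            raise ValueError(f"{at} is already past")
        target += timedelta(days=1)  # "tomorrow": same wall-clock time, next day
    return target
```

Passing `if_past="error"` is the stricter choice when a silently deferred job would be surprising, for example in a script run late in the day.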

run_every(func: JobCallable, hours: float = 0, minutes: float = 0, seconds: float = 0, name: str = '', group: str | None = None, jitter: float | None = None, timeout: float | None = None, timeout_disabled: bool = False, *, on_error: SchedulerErrorHandlerType | None = None, if_exists: Literal['error', 'skip', 'replace'] = 'error', args: tuple[Any, ...] | None = None, kwargs: Mapping[str, Any] | None = None) -> ScheduledJob

Schedule a job to run at a fixed interval.

Parameters:

- func (JobCallable, required): The function to run.
- hours (float, default 0): Interval hours component.
- minutes (float, default 0): Interval minutes component.
- seconds (float, default 0): Interval seconds component.
- name (str, default ''): Optional name for the job.
- group (str | None, default None): Optional group name.
- jitter (float | None, default None): Optional seconds of random offset to apply at enqueue time. See schedule() for details.
- timeout (float | None, default None): Per-job timeout in seconds. See schedule() for details.
- timeout_disabled (bool, default False): Disable timeout enforcement. See schedule() for details.
- on_error (SchedulerErrorHandlerType | None, default None): Optional per-job error handler. See schedule() for details.
- if_exists (Literal['error', 'skip', 'replace'], default 'error'): Behavior when a job with the same name already exists. See add_job for details.
- args (tuple[Any, ...] | None, default None): Positional arguments to pass to the callable when it executes.
- kwargs (Mapping[str, Any] | None, default None): Keyword arguments to pass to the callable when it executes.

Returns:

- ScheduledJob: The scheduled job.

Source code in src/hassette/scheduler/sync.py
def run_every(
    self,
    func: "JobCallable",
    hours: float = 0,
    minutes: float = 0,
    seconds: float = 0,
    name: str = "",
    group: str | None = None,
    jitter: float | None = None,
    timeout: float | None = None,
    timeout_disabled: bool = False,
    *,
    on_error: "SchedulerErrorHandlerType | None" = None,
    if_exists: Literal["error", "skip", "replace"] = "error",
    args: tuple[Any, ...] | None = None,
    kwargs: Mapping[str, Any] | None = None,
) -> "ScheduledJob":
    """Schedule a job to run at a fixed interval.

    Args:
        func: The function to run.
        hours: Interval hours component.
        minutes: Interval minutes component.
        seconds: Interval seconds component.
        name: Optional name for the job.
        group: Optional group name.
        jitter: Optional seconds of random offset to apply at enqueue time.
            See ``schedule()`` for details.
        timeout: Per-job timeout in seconds. See ``schedule()`` for details.
        timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
        on_error: Optional per-job error handler. See ``schedule()`` for details.
        if_exists: Behavior when a job with the same name already exists.
            See :meth:`add_job` for details.
        args: Positional arguments to pass to the callable when it executes.
        kwargs: Keyword arguments to pass to the callable when it executes.

    Returns:
        The scheduled job."""

    return self.task_bucket.run_sync(
        self._scheduler.run_every(
            func,
            hours,
            minutes,
            seconds,
            name,
            group,
            jitter,
            timeout,
            timeout_disabled,
            on_error=on_error,
            if_exists=if_exists,
            args=args,
            kwargs=kwargs,
        )
    )
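
The `hours`, `minutes`, and `seconds` arguments are components of a single interval, so they add up rather than override each other. A minimal sketch of that combination follows; `interval_from_components` is a hypothetical helper, and rejecting a zero interval is an assumption, since the source does not say how an all-zero call is handled.

```python
from datetime import timedelta


def interval_from_components(
    hours: float = 0, minutes: float = 0, seconds: float = 0
) -> timedelta:
    """Sum the three components into one interval."""
    interval = timedelta(hours=hours, minutes=minutes, seconds=seconds)
    if interval <= timedelta(0):
        # Assumption: a non-positive interval is treated as a caller error.
        raise ValueError("interval must be positive")
    return interval


every_90_minutes = interval_from_components(hours=1, minutes=30)
twice_a_second = interval_from_components(seconds=0.5)
```

Because the components are floats, fractional values such as `seconds=0.5` or `hours=1.5` are valid ways to express the same kind of interval.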

run_minutely(func: JobCallable, minutes: int = 1, name: str = '', group: str | None = None, jitter: float | None = None, timeout: float | None = None, timeout_disabled: bool = False, *, on_error: SchedulerErrorHandlerType | None = None, if_exists: Literal['error', 'skip', 'replace'] = 'error', args: tuple[Any, ...] | None = None, kwargs: Mapping[str, Any] | None = None) -> ScheduledJob

Schedule a job to run every N minutes.

Parameters:

- func (JobCallable, required): The function to run.
- minutes (int, default 1): The minute interval (must be >= 1).
- name (str, default ''): Optional name for the job.
- group (str | None, default None): Optional group name.
- jitter (float | None, default None): Optional seconds of random offset to apply at enqueue time. See schedule() for details.
- timeout (float | None, default None): Per-job timeout in seconds. See schedule() for details.
- timeout_disabled (bool, default False): Disable timeout enforcement. See schedule() for details.
- on_error (SchedulerErrorHandlerType | None, default None): Optional per-job error handler. See schedule() for details.
- if_exists (Literal['error', 'skip', 'replace'], default 'error'): Behavior when a job with the same name already exists. See add_job for details.
- args (tuple[Any, ...] | None, default None): Positional arguments to pass to the callable when it executes.
- kwargs (Mapping[str, Any] | None, default None): Keyword arguments to pass to the callable when it executes.

Returns:

- ScheduledJob: The scheduled job.

Source code in src/hassette/scheduler/sync.py
def run_minutely(
    self,
    func: "JobCallable",
    minutes: int = 1,
    name: str = "",
    group: str | None = None,
    jitter: float | None = None,
    timeout: float | None = None,
    timeout_disabled: bool = False,
    *,
    on_error: "SchedulerErrorHandlerType | None" = None,
    if_exists: Literal["error", "skip", "replace"] = "error",
    args: tuple[Any, ...] | None = None,
    kwargs: Mapping[str, Any] | None = None,
) -> "ScheduledJob":
    """Schedule a job to run every N minutes.

    Args:
        func: The function to run.
        minutes: The minute interval (must be >= 1).
        name: Optional name for the job.
        group: Optional group name.
        jitter: Optional seconds of random offset to apply at enqueue time.
            See ``schedule()`` for details.
        timeout: Per-job timeout in seconds. See ``schedule()`` for details.
        timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
        on_error: Optional per-job error handler. See ``schedule()`` for details.
        if_exists: Behavior when a job with the same name already exists.
            See :meth:`add_job` for details.
        args: Positional arguments to pass to the callable when it executes.
        kwargs: Keyword arguments to pass to the callable when it executes.

    Returns:
        The scheduled job."""

    return self.task_bucket.run_sync(
        self._scheduler.run_minutely(
            func,
            minutes,
            name,
            group,
            jitter,
            timeout,
            timeout_disabled,
            on_error=on_error,
            if_exists=if_exists,
            args=args,
            kwargs=kwargs,
        )
    )

run_hourly(func: JobCallable, hours: int = 1, name: str = '', group: str | None = None, jitter: float | None = None, timeout: float | None = None, timeout_disabled: bool = False, *, on_error: SchedulerErrorHandlerType | None = None, if_exists: Literal['error', 'skip', 'replace'] = 'error', args: tuple[Any, ...] | None = None, kwargs: Mapping[str, Any] | None = None) -> ScheduledJob

Schedule a job to run every N hours.

Parameters:

- func (JobCallable, required): The function to run.
- hours (int, default 1): The hour interval (must be >= 1).
- name (str, default ''): Optional name for the job.
- group (str | None, default None): Optional group name.
- jitter (float | None, default None): Optional seconds of random offset to apply at enqueue time. See schedule() for details.
- timeout (float | None, default None): Per-job timeout in seconds. See schedule() for details.
- timeout_disabled (bool, default False): Disable timeout enforcement. See schedule() for details.
- on_error (SchedulerErrorHandlerType | None, default None): Optional per-job error handler. See schedule() for details.
- if_exists (Literal['error', 'skip', 'replace'], default 'error'): Behavior when a job with the same name already exists. See add_job for details.
- args (tuple[Any, ...] | None, default None): Positional arguments to pass to the callable when it executes.
- kwargs (Mapping[str, Any] | None, default None): Keyword arguments to pass to the callable when it executes.

Returns:

- ScheduledJob: The scheduled job.

Source code in src/hassette/scheduler/sync.py
def run_hourly(
    self,
    func: "JobCallable",
    hours: int = 1,
    name: str = "",
    group: str | None = None,
    jitter: float | None = None,
    timeout: float | None = None,
    timeout_disabled: bool = False,
    *,
    on_error: "SchedulerErrorHandlerType | None" = None,
    if_exists: Literal["error", "skip", "replace"] = "error",
    args: tuple[Any, ...] | None = None,
    kwargs: Mapping[str, Any] | None = None,
) -> "ScheduledJob":
    """Schedule a job to run every N hours.

    Args:
        func: The function to run.
        hours: The hour interval (must be >= 1).
        name: Optional name for the job.
        group: Optional group name.
        jitter: Optional seconds of random offset to apply at enqueue time.
            See ``schedule()`` for details.
        timeout: Per-job timeout in seconds. See ``schedule()`` for details.
        timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
        on_error: Optional per-job error handler. See ``schedule()`` for details.
        if_exists: Behavior when a job with the same name already exists.
            See :meth:`add_job` for details.
        args: Positional arguments to pass to the callable when it executes.
        kwargs: Keyword arguments to pass to the callable when it executes.

    Returns:
        The scheduled job."""

    return self.task_bucket.run_sync(
        self._scheduler.run_hourly(
            func,
            hours,
            name,
            group,
            jitter,
            timeout,
            timeout_disabled,
            on_error=on_error,
            if_exists=if_exists,
            args=args,
            kwargs=kwargs,
        )
    )
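
`run_minutely` and `run_hourly` both take a whole-number count that must be at least 1. The check might look like the sketch below; `whole_unit_interval` is a hypothetical helper, and raising `ValueError` is an assumption, since the source only states the ">= 1" constraint and not how a violation is reported.

```python
def whole_unit_interval(count: int, unit_seconds: int) -> int:
    """Validate an every-N count and return the interval in seconds."""
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(count, bool) or not isinstance(count, int) or count < 1:
        raise ValueError(f"count must be an integer >= 1, got {count!r}")
    return count * unit_seconds


every_15_minutes = whole_unit_interval(15, 60)    # 900 seconds
every_2_hours = whole_unit_interval(2, 3600)      # 7200 seconds
```

For fractional intervals such as every 90 seconds, `run_every` with float components is the better fit.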

run_daily(func: JobCallable, at: str = '00:00', name: str = '', group: str | None = None, jitter: float | None = None, timeout: float | None = None, timeout_disabled: bool = False, *, on_error: SchedulerErrorHandlerType | None = None, if_exists: Literal['error', 'skip', 'replace'] = 'error', args: tuple[Any, ...] | None = None, kwargs: Mapping[str, Any] | None = None) -> ScheduledJob

Schedule a job to run once per day at a fixed wall-clock time.

Uses a cron-based trigger internally to ensure DST-correct, wall-clock-aligned scheduling. This avoids the 24-hour drift bug of interval-based daily scheduling.

Parameters:

- func (JobCallable, required): The function to run.
- at (str, default '00:00'): Target wall-clock time in "HH:MM" format.
- name (str, default ''): Optional name for the job.
- group (str | None, default None): Optional group name.
- jitter (float | None, default None): Optional seconds of random offset to apply at enqueue time. See schedule() for details.
- timeout (float | None, default None): Per-job timeout in seconds. See schedule() for details.
- timeout_disabled (bool, default False): Disable timeout enforcement. See schedule() for details.
- on_error (SchedulerErrorHandlerType | None, default None): Optional per-job error handler. See schedule() for details.
- if_exists (Literal['error', 'skip', 'replace'], default 'error'): Behavior when a job with the same name already exists. See add_job for details.
- args (tuple[Any, ...] | None, default None): Positional arguments to pass to the callable when it executes.
- kwargs (Mapping[str, Any] | None, default None): Keyword arguments to pass to the callable when it executes.

Returns:

- ScheduledJob: The scheduled job.

Source code in src/hassette/scheduler/sync.py
def run_daily(
    self,
    func: "JobCallable",
    at: str = "00:00",
    name: str = "",
    group: str | None = None,
    jitter: float | None = None,
    timeout: float | None = None,
    timeout_disabled: bool = False,
    *,
    on_error: "SchedulerErrorHandlerType | None" = None,
    if_exists: Literal["error", "skip", "replace"] = "error",
    args: tuple[Any, ...] | None = None,
    kwargs: Mapping[str, Any] | None = None,
) -> "ScheduledJob":
    """Schedule a job to run once per day at a fixed wall-clock time.

    Uses a cron-based trigger internally to ensure DST-correct, wall-clock-aligned
    scheduling. This avoids the 24-hour drift bug of interval-based daily scheduling.

    Args:
        func: The function to run.
        at: Target wall-clock time in ``"HH:MM"`` format (default ``"00:00"``).
        name: Optional name for the job.
        group: Optional group name.
        jitter: Optional seconds of random offset to apply at enqueue time.
            See ``schedule()`` for details.
        timeout: Per-job timeout in seconds. See ``schedule()`` for details.
        timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
        on_error: Optional per-job error handler. See ``schedule()`` for details.
        if_exists: Behavior when a job with the same name already exists.
            See :meth:`add_job` for details.
        args: Positional arguments to pass to the callable when it executes.
        kwargs: Keyword arguments to pass to the callable when it executes.

    Returns:
        The scheduled job."""

    return self.task_bucket.run_sync(
        self._scheduler.run_daily(
            func,
            at,
            name,
            group,
            jitter,
            timeout,
            timeout_disabled,
            on_error=on_error,
            if_exists=if_exists,
            args=args,
            kwargs=kwargs,
        )
    )
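
Because `run_daily` is described as using a cron-based trigger internally, an "HH:MM" time corresponds to a 5-field cron expression with fixed minute and hour fields. The sketch below shows one way that mapping could be written; `daily_cron` is a hypothetical helper and the library's actual conversion is not shown in the source.

```python
def daily_cron(at: str = "00:00") -> str:
    """Map "HH:MM" to a 5-field cron expression: minute hour dom month dow."""
    hour_text, sep, minute_text = at.partition(":")
    if not sep:
        raise ValueError(f"expected 'HH:MM', got {at!r}")
    hour, minute = int(hour_text), int(minute_text)
    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        raise ValueError(f"time out of range: {at!r}")
    return f"{minute} {hour} * * *"


morning = daily_cron("07:30")   # "30 7 * * *"
midnight = daily_cron()         # "0 0 * * *"
```

A cron expression names a wall-clock time, so the job stays at 07:30 local time across DST changes, whereas a fixed 24-hour interval would shift by an hour.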

run_cron(func: JobCallable, expression: str, name: str = '', group: str | None = None, jitter: float | None = None, timeout: float | None = None, timeout_disabled: bool = False, *, on_error: SchedulerErrorHandlerType | None = None, if_exists: Literal['error', 'skip', 'replace'] = 'error', args: tuple[Any, ...] | None = None, kwargs: Mapping[str, Any] | None = None) -> ScheduledJob

Schedule a job using a cron expression.

Accepts both 5-field (standard Unix cron: minute hour dom month dow) and 6-field expressions (seconds appended as a 6th field per croniter convention: minute hour dom month dow second).

Parameters:

- func (JobCallable, required): The function to run.
- expression (str, required): A valid 5- or 6-field cron expression.
- name (str, default ''): Optional name for the job.
- group (str | None, default None): Optional group name.
- jitter (float | None, default None): Optional seconds of random offset to apply at enqueue time. See schedule() for details.
- timeout (float | None, default None): Per-job timeout in seconds. See schedule() for details.
- timeout_disabled (bool, default False): Disable timeout enforcement. See schedule() for details.
- on_error (SchedulerErrorHandlerType | None, default None): Optional per-job error handler. See schedule() for details.
- if_exists (Literal['error', 'skip', 'replace'], default 'error'): Behavior when a job with the same name already exists. See add_job for details.
- args (tuple[Any, ...] | None, default None): Positional arguments to pass to the callable when it executes.
- kwargs (Mapping[str, Any] | None, default None): Keyword arguments to pass to the callable when it executes.

Returns:

- ScheduledJob: The scheduled job.

Raises:

- ValueError: If the cron expression is syntactically invalid.

Source code in src/hassette/scheduler/sync.py
def run_cron(
    self,
    func: "JobCallable",
    expression: str,
    name: str = "",
    group: str | None = None,
    jitter: float | None = None,
    timeout: float | None = None,
    timeout_disabled: bool = False,
    *,
    on_error: "SchedulerErrorHandlerType | None" = None,
    if_exists: Literal["error", "skip", "replace"] = "error",
    args: tuple[Any, ...] | None = None,
    kwargs: Mapping[str, Any] | None = None,
) -> "ScheduledJob":
    """Schedule a job using a cron expression.

    Accepts both 5-field (standard Unix cron: ``minute hour dom month dow``)
    and 6-field expressions (seconds appended as a 6th field per croniter
    convention: ``minute hour dom month dow second``).

    Args:
        func: The function to run.
        expression: A valid 5- or 6-field cron expression.
        name: Optional name for the job.
        group: Optional group name.
        jitter: Optional seconds of random offset to apply at enqueue time.
            See ``schedule()`` for details.
        timeout: Per-job timeout in seconds. See ``schedule()`` for details.
        timeout_disabled: Disable timeout enforcement. See ``schedule()`` for details.
        on_error: Optional per-job error handler. See ``schedule()`` for details.
        if_exists: Behavior when a job with the same name already exists.
            See :meth:`add_job` for details.
        args: Positional arguments to pass to the callable when it executes.
        kwargs: Keyword arguments to pass to the callable when it executes.

    Returns:
        The scheduled job.

    Raises:
        ValueError: If the cron expression is syntactically invalid."""

    return self.task_bucket.run_sync(
        self._scheduler.run_cron(
            func,
            expression,
            name,
            group,
            jitter,
            timeout,
            timeout_disabled,
            on_error=on_error,
            if_exists=if_exists,
            args=args,
            kwargs=kwargs,
        )
    )
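
The field order matters most for 6-field expressions, where the seconds field comes last rather than first. The sketch below labels the fields of an expression according to the order stated above; `label_cron_fields` is a hypothetical helper that only counts fields and does not validate their contents the way a real cron parser would.

```python
FIVE_FIELDS = ("minute", "hour", "dom", "month", "dow")


def label_cron_fields(expression: str) -> dict:
    """Pair each field of a 5- or 6-field cron expression with its name."""
    parts = expression.split()
    if len(parts) == 5:
        names = FIVE_FIELDS
    elif len(parts) == 6:
        names = FIVE_FIELDS + ("second",)  # seconds appended as the 6th field
    else:
        raise ValueError(f"expected 5 or 6 fields, got {len(parts)}")
    return dict(zip(names, parts))


weekdays_at_seven = label_cron_fields("0 7 * * 1-5")
with_seconds = label_cron_fields("0 7 * * 1-5 30")   # 07:00:30 on weekdays
```

Expressions copied from tools that put seconds first (Quartz-style) would need their fields reordered before being passed in.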

on_error(handler: SchedulerErrorHandlerType) -> None

Register an app-level error handler for this scheduler.

The handler is called when any job on this scheduler raises an exception (including TimeoutError) and the job does not have its own per-registration error handler.

This is an app-level fallback — it is resolved at dispatch time, not at job registration time. A later call to on_error() replaces any previously registered handler.

Note: error handlers are spawned as fire-and-forget tasks. Handlers spawned near app shutdown may be cancelled before they complete. Do not rely on error handlers for delivery-critical alerting during system teardown.

Parameters:

- handler (SchedulerErrorHandlerType, required): A sync or async callable that accepts a hassette.scheduler.error_context.SchedulerErrorContext.

Source code in src/hassette/scheduler/sync.py
def on_error(self, handler: "SchedulerErrorHandlerType") -> None:
    """Register an app-level error handler for this scheduler.

    The handler is called when any job on this scheduler raises an exception
    (including ``TimeoutError``) and the job does not have its own
    per-registration error handler.

    This is an app-level fallback — it is resolved at dispatch time, not at job
    registration time. A later call to ``on_error()`` replaces any previously
    registered handler.

    Note: error handlers are spawned as fire-and-forget tasks. Handlers spawned near
    app shutdown may be cancelled before they complete. Do not rely on error handlers
    for delivery-critical alerting during system teardown.

    Args:
        handler: A sync or async callable that accepts a
            :class:`~hassette.scheduler.error_context.SchedulerErrorContext`."""

    return self._scheduler.on_error(handler)

cancel_job(job: ScheduledJob) -> None

Cancel an individual job and persist the cancellation to the database.

Idempotent: a second cancel on the same job is a silent no-op. Raises ValueError if the job belongs to a different scheduler instance. Spawns a durable mark_job_cancelled DB write (when db_id is set), dequeues the job from the service, and sets job._dequeued = True.

Must NOT call job.cancel() internally — that delegates back here and would cause infinite recursion.

Parameters:

job (ScheduledJob, required): The job to cancel.

Raises:

ValueError: If the job belongs to a different scheduler instance.

Source code in src/hassette/scheduler/sync.py
def cancel_job(self, job: "ScheduledJob") -> None:
    """Cancel an individual job and persist the cancellation to the database.

    Idempotent: a second cancel on the same job is a silent no-op. Raises
    ``ValueError`` if the job belongs to a different scheduler instance.
    Spawns a durable ``mark_job_cancelled`` DB write (when ``db_id`` is set),
    dequeues the job from the service, and sets ``job._dequeued = True``.

    Must NOT call ``job.cancel()`` internally — that delegates back here and
    would cause infinite recursion.

    Args:
        job: The job to cancel.

    Raises:
        ValueError: If the job belongs to a different scheduler instance."""

    return self._scheduler.cancel_job(job)

cancel_group(group: str) -> None

Cancel all jobs in the given group.

Delegates to cancel_job per-member, which handles the DB write, dequeue, and _dequeued flag. Dict cleanup (_jobs_by_group and _jobs_by_name) is handled by the _on_job_removed callback fired by scheduler_service.dequeue_job. No-op if the group does not exist.

Parameters:

group (str, required): The group name to cancel.

Source code in src/hassette/scheduler/sync.py
def cancel_group(self, group: str) -> None:
    """Cancel all jobs in the given group.

    Delegates to ``cancel_job`` per-member, which handles the DB write,
    dequeue, and ``_dequeued`` flag. Dict cleanup (``_jobs_by_group`` and
    ``_jobs_by_name``) is handled by the ``_on_job_removed`` callback
    fired by ``scheduler_service.dequeue_job``. No-op if the group does
    not exist.

    Args:
        group: The group name to cancel."""

    return self._scheduler.cancel_group(group)

list_jobs(group: str | None = None) -> list[ScheduledJob]

Return all jobs, or only the jobs in a given group.

Parameters:

group (str | None, default None): If provided, return only jobs in this group. If None (default), return all jobs.

Returns:

list[ScheduledJob]: List of ScheduledJob instances.

Source code in src/hassette/scheduler/sync.py
def list_jobs(self, group: str | None = None) -> list["ScheduledJob"]:
    """Return all or group-filtered jobs.

    Args:
        group: If provided, return only jobs in this group.
            If ``None`` (default), return all jobs.

    Returns:
        List of ScheduledJob instances."""

    return self._scheduler.list_jobs(group)

After

One-shot trigger that fires once after a fixed delay.

Accepts seconds, minutes, or a TimeDelta directly.

Parameters:

seconds (float, default 0): Delay in seconds.
minutes (float, default 0): Delay in minutes.
timedelta (TimeDelta | None, default None): Delay as a TimeDelta object. Mutually exclusive with seconds/minutes.

Example

After(seconds=30)  # fires 30 seconds from now
After(minutes=5)   # fires 5 minutes from now

Source code in src/hassette/scheduler/triggers.py
class After:
    """One-shot trigger that fires once after a fixed delay.

    Accepts seconds, minutes, or a TimeDelta directly.

    Args:
        seconds: Delay in seconds.
        minutes: Delay in minutes.
        timedelta: Delay as a TimeDelta object. Mutually exclusive with seconds/minutes.

    Example:
        After(seconds=30)       # fires 30 seconds from now
        After(minutes=5)        # fires 5 minutes from now
    """

    def __init__(
        self,
        seconds: float = 0,
        minutes: float = 0,
        timedelta: TimeDelta | None = None,
    ) -> None:
        if timedelta is not None:
            self._delay = timedelta
        else:
            self._delay = TimeDelta(seconds=seconds, minutes=minutes)
        if self._delay.in_seconds() <= 0:
            raise ValueError("After trigger delay must be positive")

    def first_run_time(self, current_time: ZonedDateTime) -> ZonedDateTime:
        """Return current_time plus the delay."""
        return current_time.add(seconds=self._delay.in_seconds()).round(unit="second")

    def next_run_time(self, previous_run: ZonedDateTime, current_time: ZonedDateTime) -> None:
        """One-shot trigger; always returns None."""
        return

    def trigger_label(self) -> str:
        return "after"

    def trigger_detail(self) -> str | None:
        return f"{int(self._delay.in_seconds())}s"

    def trigger_db_type(self) -> Literal["after"]:
        return "after"

    def trigger_id(self) -> str:
        return f"after:{int(self._delay.in_seconds())}"

first_run_time(current_time: ZonedDateTime) -> ZonedDateTime

Return current_time plus the delay.

Source code in src/hassette/scheduler/triggers.py
def first_run_time(self, current_time: ZonedDateTime) -> ZonedDateTime:
    """Return current_time plus the delay."""
    return current_time.add(seconds=self._delay.in_seconds()).round(unit="second")

next_run_time(previous_run: ZonedDateTime, current_time: ZonedDateTime) -> None

One-shot trigger; always returns None.

Source code in src/hassette/scheduler/triggers.py
def next_run_time(self, previous_run: ZonedDateTime, current_time: ZonedDateTime) -> None:
    """One-shot trigger; always returns None."""
    return
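
As a rough illustration of how the delay components combine, the sketch below uses stdlib datetime and timedelta as stand-ins for ZonedDateTime and TimeDelta (an assumption made so the snippet runs without hassette). It shows that seconds and minutes add up to a single delay, and that the integer truncation used for the detail and ID strings drops any fractional part.

```python
from datetime import datetime, timedelta, timezone

# Stand-ins for TimeDelta / ZonedDateTime; not hassette's own types.
delay = timedelta(seconds=30, minutes=1)  # components add up to 90 s
now = datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc)

first_run = now + delay                              # the single run time of a one-shot delay
id_string = f"after:{int(delay.total_seconds())}"    # same shape as the trigger_id string
print(first_run, id_string)

# int() truncates, so sub-second differences do not show up in the ID string.
truncated = int(timedelta(seconds=1.9).total_seconds())
print(truncated)
```

One consequence, under the same assumption, is that two delays differing only by a fraction of a second would produce the same ID string.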

Cron

Trigger based on an arbitrary cron expression.

Accepts both 5-field (standard Unix cron: minute hour dom month dow) and 6-field expressions (seconds appended as a 6th field per croniter convention: minute hour dom month dow second).
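
A quick way to keep the field order straight is to label each field by position. The label_cron_fields helper below is hypothetical and not part of hassette or croniter; it checks only the field count, not the field values, and exists purely to show that the optional seconds field comes last.

```python
FIELD_NAMES = ("minute", "hour", "dom", "month", "dow", "second")


def label_cron_fields(expression: str) -> dict[str, str]:
    """Map each field of a 5- or 6-field expression to its positional name."""
    fields = expression.split()
    if len(fields) not in (5, 6):
        raise ValueError(f"expected 5 or 6 fields, got {len(fields)}: {expression!r}")
    # zip stops at the shorter input, so a 5-field expression gets no "second" key.
    return dict(zip(FIELD_NAMES, fields))


print(label_cron_fields("0 9 * * 1-5"))
print(label_cron_fields("0 9 * * 1-5 30"))
```

Schedulers that put seconds first would read the second expression differently, so it is worth checking the order before copying an expression from another tool.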

Parameters:

expression (str, required): A valid 5- or 6-field cron expression.

Raises:

ValueError: If the expression is syntactically invalid.

Example

Cron("0 9 * * 1-5")    # weekdays at 09:00
Cron("0 9 * * 1-5 0")  # weekdays at 09:00:00 (6-field)

Source code in src/hassette/scheduler/triggers.py
class Cron:
    """Trigger based on an arbitrary cron expression.

    Accepts both 5-field (standard Unix cron: ``minute hour dom month dow``)
    and 6-field expressions (seconds appended as a 6th field per croniter
    convention: ``minute hour dom month dow second``).

    Args:
        expression: A valid 5- or 6-field cron expression.

    Raises:
        ValueError: If the expression is syntactically invalid.

    Example:
        Cron("0 9 * * 1-5")    # weekdays at 09:00
        Cron("0 9 * * 1-5 0")  # weekdays at 09:00:00 (6-field)
    """

    def __init__(self, expression: str) -> None:
        self._expression = expression
        try:
            self._cron = CronTrigger(expression)
        except ValueError as e:
            raise ValueError(f"Invalid cron expression: {expression!r}") from e

    def first_run_time(self, current_time: ZonedDateTime) -> ZonedDateTime:
        """Return the first cron-grid-aligned run time at or after current_time."""
        return self._cron.first_run_time(current_time)

    def next_run_time(self, previous_run: ZonedDateTime, current_time: ZonedDateTime) -> ZonedDateTime:
        """Return the next cron-grid-aligned run time after previous_run that is later than current_time."""
        return self._cron.next_run_time(previous_run, current_time)

    def trigger_label(self) -> str:
        return "cron"

    def trigger_detail(self) -> str | None:
        return self._expression

    def trigger_db_type(self) -> Literal["cron"]:
        return "cron"

    def trigger_id(self) -> str:
        return f"cron:{self._expression}"

first_run_time(current_time: ZonedDateTime) -> ZonedDateTime

Return the first cron-grid-aligned run time at or after current_time.

Source code in src/hassette/scheduler/triggers.py
def first_run_time(self, current_time: ZonedDateTime) -> ZonedDateTime:
    """Return the first cron-grid-aligned run time at or after current_time."""
    return self._cron.first_run_time(current_time)

next_run_time(previous_run: ZonedDateTime, current_time: ZonedDateTime) -> ZonedDateTime

Return the next cron-grid-aligned run time after previous_run that is later than current_time.

Source code in src/hassette/scheduler/triggers.py
def next_run_time(self, previous_run: ZonedDateTime, current_time: ZonedDateTime) -> ZonedDateTime:
    """Return the next cron-grid-aligned run time after previous_run that is later than current_time."""
    return self._cron.next_run_time(previous_run, current_time)

Daily

Trigger that fires once per day at a fixed wall-clock time.

Internally delegates to a 5-field cron expression to ensure DST-correct, wall-clock-aligned scheduling.

Warning: "HH:MM" is resolved against the system process timezone (from date_utils.now().tz). Docker containers commonly run TZ=UTC while the user's Home Assistant is configured for a local zone, so a Daily(at="07:00") written for local time will fire at 07:00 UTC in that environment with no warning. To avoid this, set TZ on the container to match the HA zone. A future tz= parameter is tracked in the design doc follow-up work.

Parameters:

at (str, required): Target time in "HH:MM" format (e.g. "07:00"), interpreted in the system process timezone; see the warning above.

Example

Daily(at="07:00") # fires every day at 07:00 wall-clock time

Source code in src/hassette/scheduler/triggers.py
class Daily:
    """Trigger that fires once per day at a fixed wall-clock time.

    Internally delegates to a 5-field cron expression to ensure DST-correct,
    wall-clock-aligned scheduling.

    .. warning::
        ``"HH:MM"`` is resolved against the **system process timezone** (from
        ``date_utils.now().tz``). Docker containers commonly run ``TZ=UTC``
        while the user's Home Assistant is configured for a local zone — a
        ``Daily(at="07:00")`` written for local time will fire at 07:00 UTC
        in that environment with no warning. To avoid this: set ``TZ`` on the
        container to match the HA zone. A future ``tz=`` parameter is tracked
        in the design doc follow-up work.

    Args:
        at: Target time in ``"HH:MM"`` format (e.g. ``"07:00"``),
            interpreted in the **system process timezone** — see warning above.

    Example:
        Daily(at="07:00")   # fires every day at 07:00 wall-clock time
    """

    def __init__(self, at: str) -> None:
        hour, minute = parse_hh_mm(at, "Daily")
        # 5-field standard cron: minute hour dom month dow
        self._expr = f"{minute} {hour} * * *"
        self._at_str = at
        self._cron = CronTrigger(self._expr)

    def first_run_time(self, current_time: ZonedDateTime) -> ZonedDateTime:
        """Return the next cron-grid-aligned daily run time at or after current_time."""
        return self._cron.first_run_time(current_time)

    def next_run_time(self, previous_run: ZonedDateTime, current_time: ZonedDateTime) -> ZonedDateTime:
        """Return the next daily run time after previous_run that is later than current_time."""
        return self._cron.next_run_time(previous_run, current_time)

    def trigger_label(self) -> str:
        return "daily"

    def trigger_detail(self) -> str | None:
        return self._at_str

    def trigger_db_type(self) -> Literal["cron"]:
        return "cron"

    def trigger_id(self) -> str:
        return f"cron:{self._expr}"

first_run_time(current_time: ZonedDateTime) -> ZonedDateTime

Return the next cron-grid-aligned daily run time at or after current_time.

Source code in src/hassette/scheduler/triggers.py
def first_run_time(self, current_time: ZonedDateTime) -> ZonedDateTime:
    """Return the next cron-grid-aligned daily run time at or after current_time."""
    return self._cron.first_run_time(current_time)

next_run_time(previous_run: ZonedDateTime, current_time: ZonedDateTime) -> ZonedDateTime

Return the next daily run time after previous_run that is later than current_time.

Source code in src/hassette/scheduler/triggers.py
def next_run_time(self, previous_run: ZonedDateTime, current_time: ZonedDateTime) -> ZonedDateTime:
    """Return the next daily run time after previous_run that is later than current_time."""
    return self._cron.next_run_time(previous_run, current_time)
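
The timezone pitfall described for Daily can be checked with plain stdlib arithmetic. The zones below are assumptions chosen for illustration: a container running in UTC and a user whose wall clock is at a fixed UTC-05:00 offset.

```python
from datetime import datetime, timedelta, timezone

# Assumed zones for illustration only.
container_tz = timezone.utc
home_tz = timezone(timedelta(hours=-5), "UTC-05:00")

# What "07:00" resolves to when the process timezone is UTC.
fires_at = datetime(2024, 1, 15, 7, 0, tzinfo=container_tz)

# The same instant on the user's wall clock.
local_view = fires_at.astimezone(home_tz)
print(local_view.strftime("%H:%M"))  # 02:00
```

In this setup a job meant for 07:00 local time would run five hours early, which is why matching the container's TZ to the Home Assistant zone matters.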

Every

Fixed-interval trigger with drift-resistant scheduling.

Accepts seconds, hours, minutes, or a combination. An optional start parameter anchors the interval grid; if omitted, the current_time passed to first_run_time is the anchor, so the first run falls one interval after it.

Parameters:

seconds (float, default 0): Interval component in seconds.
minutes (float, default 0): Interval component in minutes.
hours (float, default 0): Interval component in hours.
start (ZonedDateTime | None, default None): Optional ZonedDateTime anchor for the interval grid. If the anchor is in the past, missed intervals are skipped to produce a near-future run time.

Example

Every(hours=1)                          # every hour, anchored to first run
Every(seconds=30, start=my_start_time)  # every 30 s, grid anchored to my_start_time

Source code in src/hassette/scheduler/triggers.py
class Every:
    """Fixed-interval trigger with drift-resistant scheduling.

    Accepts seconds, hours, minutes, or a combination. An optional ``start``
    parameter anchors the interval grid; if omitted, the ``current_time``
    passed to ``first_run_time`` is the anchor, so the first run falls one
    interval after it.

    Args:
        seconds: Interval component in seconds.
        minutes: Interval component in minutes.
        hours: Interval component in hours.
        start: Optional ``ZonedDateTime`` anchor for the interval grid. If the
            anchor is in the past, missed intervals are skipped to produce a
            near-future run time.

    Example:
        Every(hours=1)                          # every hour, anchored to first run
        Every(seconds=30, start=my_start_time)  # every 30 s, grid anchored to my_start_time
    """

    def __init__(
        self,
        seconds: float = 0,
        minutes: float = 0,
        hours: float = 0,
        start: ZonedDateTime | None = None,
    ) -> None:
        total = TimeDelta(seconds=seconds, minutes=minutes, hours=hours)
        if total.in_seconds() <= 0:
            raise ValueError("Every trigger interval must be positive")
        if total.in_seconds() != int(total.in_seconds()):
            raise ValueError("Every trigger interval must be a whole number of seconds")
        self._interval = total
        self._start = start

    @property
    def interval_seconds(self) -> float:
        return self._interval.in_seconds()

    def first_run_time(self, current_time: ZonedDateTime) -> ZonedDateTime:
        """Return the first run time, aligned to the interval grid."""
        start = self._start if self._start is not None else current_time
        if start > current_time:
            return start.round(unit="second")
        return self.advance_past(start, current_time)

    def next_run_time(self, previous_run: ZonedDateTime, current_time: ZonedDateTime) -> ZonedDateTime:
        """Return the next interval tick after previous_run that is later than current_time."""
        return self.advance_past(previous_run, current_time)

    def advance_past(self, anchor: ZonedDateTime, current_time: ZonedDateTime) -> ZonedDateTime:
        """Advance anchor by whole intervals until the result is strictly after current_time."""
        interval_secs = self._interval.in_seconds()
        elapsed = (current_time - anchor).in_seconds()
        if elapsed > 0:
            missed = int(elapsed / interval_secs)
            anchor = anchor.add(seconds=missed * interval_secs)
        result = anchor.add(seconds=interval_secs)
        # Guard: if floating-point truncation landed result at or before current_time,
        # advance one more interval. Boundary-exact slots are treated as "past."
        if result <= current_time:
            result = result.add(seconds=interval_secs)
        return result.round(unit="second")

    def trigger_label(self) -> str:
        return "every"

    def trigger_detail(self) -> str | None:
        return f"{int(self.interval_seconds)}s"

    def trigger_db_type(self) -> Literal["interval"]:
        return "interval"

    def trigger_id(self) -> str:
        return f"every:{int(self.interval_seconds)}"

first_run_time(current_time: ZonedDateTime) -> ZonedDateTime

Return the first run time, aligned to the interval grid.

Source code in src/hassette/scheduler/triggers.py
def first_run_time(self, current_time: ZonedDateTime) -> ZonedDateTime:
    """Return the first run time, aligned to the interval grid."""
    start = self._start if self._start is not None else current_time
    if start > current_time:
        return start.round(unit="second")
    return self.advance_past(start, current_time)

next_run_time(previous_run: ZonedDateTime, current_time: ZonedDateTime) -> ZonedDateTime

Return the next interval tick after previous_run that is later than current_time.

Source code in src/hassette/scheduler/triggers.py
def next_run_time(self, previous_run: ZonedDateTime, current_time: ZonedDateTime) -> ZonedDateTime:
    """Return the next interval tick after previous_run that is later than current_time."""
    return self.advance_past(previous_run, current_time)

advance_past(anchor: ZonedDateTime, current_time: ZonedDateTime) -> ZonedDateTime

Advance anchor by whole intervals until the result is strictly after current_time.

Source code in src/hassette/scheduler/triggers.py
def advance_past(self, anchor: ZonedDateTime, current_time: ZonedDateTime) -> ZonedDateTime:
    """Advance anchor by whole intervals until the result is strictly after current_time."""
    interval_secs = self._interval.in_seconds()
    elapsed = (current_time - anchor).in_seconds()
    if elapsed > 0:
        missed = int(elapsed / interval_secs)
        anchor = anchor.add(seconds=missed * interval_secs)
    result = anchor.add(seconds=interval_secs)
    # Guard: if floating-point truncation landed result at or before current_time,
    # advance one more interval. Boundary-exact slots are treated as "past."
    if result <= current_time:
        result = result.add(seconds=interval_secs)
    return result.round(unit="second")
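
To make the skipping arithmetic concrete, here is a stand-alone re-creation that uses stdlib datetime and timedelta in place of ZonedDateTime and TimeDelta. That substitution is an assumption made for illustration. The final rounding to whole seconds is left out, and exact integer division replaces the float division, so the guard never triggers in this version and is kept only to mirror the original.

```python
from datetime import datetime, timedelta, timezone


def advance_past(anchor: datetime, now: datetime, interval: timedelta) -> datetime:
    """Re-creation of the grid arithmetic with stdlib types (rounding omitted)."""
    elapsed = now - anchor
    if elapsed > timedelta(0):
        missed = elapsed // interval  # whole intervals that have already elapsed
        anchor = anchor + missed * interval
    result = anchor + interval
    if result <= now:  # boundary-exact slots count as "past"
        result += interval
    return result


utc = timezone.utc
anchor = datetime(2024, 1, 1, 12, 0, 0, tzinfo=utc)
interval = timedelta(seconds=30)

# 95 s after the anchor: the slots at 12:00:30, 12:01:00 and 12:01:30 are skipped.
late = advance_past(anchor, anchor + timedelta(seconds=95), interval)
# Exactly on the 12:01:00 slot: the following slot is returned, not the current one.
on_slot = advance_past(anchor, anchor + timedelta(seconds=60), interval)
print(late, on_slot)
```

Because every result is the anchor plus a whole number of intervals, run times stay on the original grid no matter how late a job is dispatched.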

Once

One-shot trigger that fires at a specific wall-clock time.

Warning: "HH:MM" string inputs are resolved against the system process timezone (date_utils.now().tz). Docker containers commonly run with TZ=UTC while the user's Home Assistant is configured for a local zone, so a Once(at="07:00") written for local time will fire at 07:00 UTC in that environment with no warning. To avoid this, set TZ on the container to match the HA zone, or pass a ZonedDateTime directly with the intended timezone.

Parameters:

at (str | ZonedDateTime, required): Target time. Accepts a "HH:MM" string (interpreted as today's wall-clock time in the system process timezone; see the warning above) or a ZonedDateTime (absolute instant; timezone is explicit).
if_past (Literal['tomorrow', 'error'], default 'tomorrow'): Behavior when the computed fire time is in the past.
    - "tomorrow" (default): For "HH:MM" string inputs, defers to the next day. No effect for ZonedDateTime inputs (the absolute instant is used as-is; if it is in the past, the job fires immediately).
    - "error": Raises ValueError if the computed fire time is in the past. Applies to both "HH:MM" and ZonedDateTime inputs.

Example

Once(at="07:00")                   # fires today at 07:00 (or tomorrow if past)
Once(at="07:00", if_past="error")  # raises if 07:00 has already passed

Source code in src/hassette/scheduler/triggers.py
class Once:
    """One-shot trigger that fires at a specific wall-clock time.

    .. warning::
        ``"HH:MM"`` string inputs are resolved against the **system process
        timezone** (``date_utils.now().tz``). Docker containers commonly run
        with ``TZ=UTC`` while the user's Home Assistant is configured for a
        local zone — a ``Once(at="07:00")`` written for local time will fire
        at 07:00 UTC in that environment with no warning. To avoid this: set
        ``TZ`` on the container to match the HA zone, or pass a
        ``ZonedDateTime`` directly with the intended timezone.

    Args:
        at: Target time. Accepts a ``"HH:MM"`` string (interpreted as today's
            wall-clock time in the **system process timezone** — see warning
            above) or a ``ZonedDateTime`` (absolute instant; timezone is
            explicit).
        if_past: Behavior when the computed fire time is in the past.
            - ``"tomorrow"`` (default): For ``"HH:MM"`` string inputs, defers to the next day.
              No effect for ``ZonedDateTime`` inputs (the absolute instant is used as-is;
              if it is in the past, the job fires immediately).
            - ``"error"``: Raises ``ValueError`` if the computed fire time is in the past.
              Applies to both ``"HH:MM"`` and ``ZonedDateTime`` inputs.

    Example:
        Once(at="07:00")                      # fires today at 07:00 (or tomorrow if past)
        Once(at="07:00", if_past="error")     # raises if 07:00 has already passed
    """

    def __init__(
        self,
        at: str | ZonedDateTime,
        if_past: Literal["tomorrow", "error"] = "tomorrow",
    ) -> None:
        self._if_past = if_past
        self._at_str: str | None = None

        if isinstance(at, str):
            self._at_str = at
            hour, minute = parse_hh_mm(at, "Once")
            now = date_utils.now()
            target = ZonedDateTime(now.year, now.month, now.day, hour, minute, tz=now.tz)
            if target <= now:
                if if_past == "error":
                    raise ValueError(f"Once(at={at!r}) constructed after the target time and if_past='error'")
                # Defer to tomorrow
                LOGGER.warning(
                    "Once(at=%r) constructed after the target time — deferring to tomorrow.",
                    at,
                )
                target = target.add(days=1)
            self._fire_at = target
        else:
            now = date_utils.now()
            if at <= now:
                if if_past == "error":
                    raise ValueError(f"Once(at=<ZonedDateTime {at.format_iso()!r}>) is in the past and if_past='error'")
                LOGGER.warning(
                    "Once received ZonedDateTime in the past; firing immediately — "
                    "if_past='tomorrow' cannot defer an absolute instant. (at=%r)",
                    at.format_iso(),
                )
            self._fire_at = at

    def first_run_time(self, current_time: ZonedDateTime) -> ZonedDateTime:
        """Return the scheduled fire time."""
        return self._fire_at

    def next_run_time(self, previous_run: ZonedDateTime, current_time: ZonedDateTime) -> None:
        """One-shot trigger; always returns None."""
        return

    def trigger_label(self) -> str:
        return "once"

    def trigger_detail(self) -> str | None:
        if self._at_str is not None:
            return self._at_str
        return self._fire_at.format_iso()

    def trigger_db_type(self) -> Literal["once"]:
        return "once"

    def trigger_id(self) -> str:
        # Always include the full ISO timestamp so two Once jobs constructed on different days
        # (both at "07:00") do not share a trigger_id and do not shadow each other in the heap.
        return f"once:{self._fire_at.format_iso()}"

first_run_time(current_time: ZonedDateTime) -> ZonedDateTime

Return the scheduled fire time.

Source code in src/hassette/scheduler/triggers.py
def first_run_time(self, current_time: ZonedDateTime) -> ZonedDateTime:
    """Return the scheduled fire time."""
    return self._fire_at

next_run_time(previous_run: ZonedDateTime, current_time: ZonedDateTime) -> None

One-shot trigger; always returns None.

Source code in src/hassette/scheduler/triggers.py
def next_run_time(self, previous_run: ZonedDateTime, current_time: ZonedDateTime) -> None:
    """One-shot trigger; always returns None."""
    return
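
The "HH:MM" resolution and the two if_past modes can be sketched with stdlib types. The resolve_once function below is hypothetical: it uses a naive split on ":" in place of parse_hh_mm (whose validation is not shown here) and a fixed UTC clock in place of date_utils.now(), so it illustrates only the today-or-tomorrow decision.

```python
from datetime import datetime, timedelta, timezone


def resolve_once(at: str, now: datetime, if_past: str = "tomorrow") -> datetime:
    """Resolve an "HH:MM" string against ``now``; hypothetical helper for illustration."""
    hour, minute = (int(part) for part in at.split(":"))
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target <= now:
        if if_past == "error":
            raise ValueError(f"{at!r} has already passed")
        target += timedelta(days=1)  # same wall-clock time on the next day
    return target


now = datetime(2024, 1, 15, 8, 30, tzinfo=timezone.utc)
later_today = resolve_once("09:00", now)  # still ahead of now, so it stays on the 15th
deferred = resolve_once("07:00", now)     # already passed, so it moves to the 16th
print(later_today, deferred)
```

With if_past="error", the second call would raise instead of deferring, which is useful when a silently postponed job would be worse than a failed startup.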