Skip to content

Task Bucket

TaskBucket

Bases: Resource

Track and clean up a set of tasks for a service/app.

Source code in src/hassette/task_bucket/task_bucket.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
class TaskBucket(Resource):
    """Track and clean up a set of tasks for a service/app."""

    _tasks: "set[asyncio.Task[Any]]"

    _exception_recorders: "list[ExceptionRecorderT]"
    """List of recorders called for each non-CancelledError task exception."""

    def __init__(self, hassette: "Hassette", *, parent: "Resource | None" = None) -> None:
        super().__init__(hassette, parent=parent)
        self._tasks: set[asyncio.Task[Any]] = set()
        self._exception_recorders = []
        self.mark_ready(reason="TaskBucket initialized")

    @property
    def config_cancel_timeout(self) -> int | float:
        """Return the task cancellation timeout from the config."""
        return self.hassette.config.lifecycle.task_cancellation_timeout_seconds

    @property
    def config_log_level(self) -> LOG_LEVEL_TYPE:
        """Return the log level from the config."""
        return self.hassette.config.logging.task_bucket

    def __bool__(self) -> bool:
        # truthiness should not trigger __len__
        return True

    def add(self, task: asyncio.Task[Any]) -> None:
        """Add a task to the bucket and attach exception logging."""
        self._tasks.add(task)

        def _done(t: asyncio.Task[Any]) -> None:
            try:
                exc = t.exception()
            except asyncio.CancelledError:  # noqa: ASYNC103 — cancelled task is expected, not an error
                return  # noqa: ASYNC104
            except Exception:
                return
            if exc:
                self.logger.error("[%s] task %s crashed", self.unique_name, t.get_name(), exc_info=exc)
                for recorder in list(self._exception_recorders):
                    try:
                        recorder(t, exc)
                    except Exception:
                        self.logger.exception(
                            "[%s] exception recorder failed for task %s",
                            self.unique_name,
                            t.get_name(),
                        )

        task.add_done_callback(lambda t: self._tasks.discard(t))
        task.add_done_callback(_done)

    def install_exception_recorder(self, recorder: "ExceptionRecorderT") -> None:
        """Install a callback that is called for each non-CancelledError task exception.

        Called from the task's done callback, after the error is logged.
        The recorder receives the completed task and the exception.

        Intended for test infrastructure (e.g., AppTestHarness drain) that needs to
        collect task exceptions regardless of whether the task completed during a
        ``asyncio.wait`` call or between iterations.

        Multiple recorders may be installed; all are called in installation order
        (FIFO) when an exception occurs.

        Args:
            recorder: Callable ``(task, exc) -> None`` invoked on each non-cancellation
                exception.
        """
        self._exception_recorders.append(recorder)

    def uninstall_exception_recorder(self, recorder: "ExceptionRecorderT") -> None:
        """Remove a previously installed exception recorder.

        Safe to call even if the recorder was never installed — it is a no-op in
        that case. Removes the first occurrence; assumes each installed recorder
        is a distinct callable.

        Args:
            recorder: The recorder callable to remove.
        """
        with contextlib.suppress(ValueError):
            self._exception_recorders.remove(recorder)

    def spawn(self, coro: CoroLikeT[T], *, name: str | None = None) -> asyncio.Task[T]:
        """Convenience: create and track a new task."""
        if name is None:
            name = getattr(coro, "__qualname__", None) or repr(coro)
        self.logger.debug("Spawning task %s in bucket %s", name, self.unique_name)
        current_thread = threading.get_ident()

        if current_thread == self.hassette._loop_thread_id:
            # Fast path: already on loop thread
            with ctx.use_task_bucket(self):
                return asyncio.create_task(coro, name=name)
        else:
            # Dev-mode tracking: log cross-thread spawn
            if self.hassette.config.dev_mode:
                self.logger.debug(
                    "Cross-thread spawn: %s from thread %s (loop thread %s)",
                    name,
                    current_thread,
                    self.hassette._loop_thread_id,
                )
            # Cross-thread: create the task on the real loop thread and wait for the handle
            result: Future[asyncio.Task[T]] = Future()

            def _create() -> None:
                try:
                    with ctx.use_task_bucket(self):
                        task = asyncio.create_task(coro, name=name)
                    result.set_result(task)
                except Exception as e:
                    result.set_exception(e)

            self.hassette.loop.call_soon_threadsafe(_create)
            return result.result()  # block this worker thread briefly to hand back the Task

    def run_in_thread(self, fn: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> "CoroutineType[Any, Any, R]":
        """Run a synchronous function in a separate thread.

        This is a thin wrapper around `asyncio.to_thread`, but ensures that the current TaskBucket context
        is preserved in the new thread.

        Args:
            fn: The synchronous function to run.
            *args: Positional arguments to pass to the function.
            **kwargs: Keyword arguments to pass to the function.

        Returns:
            A coroutine that resolves to the return value of *fn*.
        """
        current_bucket = ctx.CURRENT_BUCKET.get()

        def _call() -> R:
            if current_bucket is not None:
                with ctx.use_task_bucket(current_bucket):
                    return fn(*args, **kwargs)
            else:
                return fn(*args, **kwargs)

        return asyncio.to_thread(_call)

    def post_to_loop(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        """Schedule a callable on the event loop from any thread."""
        self.hassette.loop.call_soon_threadsafe(fn, *args, **kwargs)

    @overload
    def make_async_adapter(self, fn: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]: ...
    @overload
    def make_async_adapter(self, fn: Callable[P, R]) -> Callable[P, Awaitable[R]]: ...

    def make_async_adapter(self, fn: Callable[P, R] | Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]:
        """
        Normalize a callable (sync or async) into an async callable with the same signature.

        - If `fn` is async: await it.
        - If `fn` is sync: run it in Hassette's thread pool executor via TaskBucket.run_in_thread.
        """
        if is_async_callable(fn):

            @functools.wraps(cast("Callable[..., object]", fn))
            async def _async_fn(*args: P.args, **kwargs: P.kwargs) -> R:
                return await cast("Callable[P, Awaitable[R]]", fn)(*args, **kwargs)

            return _async_fn

        @functools.wraps(cast("Callable[..., object]", fn))
        async def _sync_fn(*args: P.args, **kwargs: P.kwargs) -> R:
            try:
                return await self.run_in_thread(cast("Callable[P, R]", fn), *args, **kwargs)
            except TimeoutError:
                raise
            except Exception:
                # optional: you can re-raise without cancelling; no task to cancel anymore
                self.logger.exception("Error in sync function '%s'", getattr(fn, "__name__", repr(fn)))
                raise

        return _sync_fn

    def run_sync(self, fn: Coroutine[Any, Any, R], timeout_seconds: int | float | None = None) -> R:
        """Run an async function in a synchronous context.

        Args:
            fn: The async function to run.
            timeout_seconds: The timeout for the function call. None uses the config value.

        Returns:
            The result of the function call.
        """

        timeout_seconds = timeout_seconds or self.hassette.config.lifecycle.run_sync_timeout_seconds

        # If we're already in an event loop, don't allow blocking calls.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # not in a loop -> safe to block
        else:
            fn.close()  # close the coroutine to avoid warnings
            raise RuntimeError("This sync method was called from within an event loop. Use the async method instead.")

        try:
            fut = asyncio.run_coroutine_threadsafe(fn, self.hassette.loop)
            return fut.result(timeout=timeout_seconds)
        except CfTimeoutError:
            self.logger.exception("Sync function '%s' timed out", fn.__name__)
            raise
        except Exception:
            self.logger.exception("Failed to run sync function '%s'", fn.__name__)
            raise
        finally:
            if not fut.done():
                fut.cancel()

    async def run_on_loop_thread(self, fn: typing.Callable[..., R], *args: Any, **kwargs: Any) -> R:
        """Run a synchronous function on the main event loop thread.

        This is useful for ensuring that loop-affine code runs in the correct context.
        """
        fut = self.hassette.loop.create_future()

        def _call():
            try:
                fut.set_result(fn(*args, **kwargs))
            except Exception as e:
                fut.set_exception(e)

        self.hassette.loop.call_soon_threadsafe(_call)
        return await fut

    def create_task_on_loop(self, coro: Coroutine[Any, Any, Any], *, name: str | None = None) -> asyncio.Task[Any]:
        """Create a task on the main event loop thread, in this bucket's context."""
        with ctx.use_task_bucket(self):
            return self.hassette.loop.create_task(coro, name=name)

    def pending_tasks(self) -> list[asyncio.Task[Any]]:
        """Return a snapshot list of non-completed tasks in this bucket.

        This is the recommended public accessor for drain helpers and test
        infrastructure. Returns a fresh list so callers can safely iterate after
        mutations to the internal set without risking a ``RuntimeError``.

        Returns:
            A list of tasks that are currently running (not yet done).
        """
        return [t for t in list(self._tasks) if not t.done()]

    def cancel_all_sync(self) -> None:
        """Cancel all tracked tasks without awaiting completion (fire-and-forget)."""
        current = asyncio.current_task()
        tasks = [t for t in list(self._tasks) if not t.done() and t is not current]
        for t in tasks:
            t.cancel()

    async def cancel_all(self) -> None:
        """Cancel all tracked tasks, wait for them to finish, and log stragglers."""
        # snapshot to avoid mutation during iteration
        current = asyncio.current_task()
        tasks = [t for t in list(self._tasks) if not t.done() and t is not current]

        if not tasks:
            self.logger.debug("No tasks to cancel in bucket %s", self.unique_name)
            return

        self.logger.debug("Cancelling %d tasks in bucket %s", len(tasks), self.unique_name)
        for t in tasks:
            t.cancel()

        done, pending = await asyncio.wait(tasks, timeout=self.config_cancel_timeout)
        self.logger.debug("%d tasks done, %d still pending in bucket %s", len(done), len(pending), self.unique_name)

        for t in done:
            if t.cancelled():
                continue
            exc = t.exception()
            if exc:
                self.logger.warning("[%s] task %s errored during shutdown: %r", self.unique_name, t.get_name(), exc)

        for t in pending:
            self.logger.warning(
                "[%s] task %s refused to die within %.1fs", self.unique_name, t.get_name(), self.config_cancel_timeout
            )

    def __len__(self) -> int:
        return len(self._tasks)

config_cancel_timeout: int | float property

Return the task cancellation timeout from the config.

config_log_level: LOG_LEVEL_TYPE property

Return the log level from the config.

add(task: asyncio.Task[Any]) -> None

Add a task to the bucket and attach exception logging.

Source code in src/hassette/task_bucket/task_bucket.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def add(self, task: asyncio.Task[Any]) -> None:
    """Add a task to the bucket and attach exception logging."""
    self._tasks.add(task)

    def _done(t: asyncio.Task[Any]) -> None:
        try:
            exc = t.exception()
        except asyncio.CancelledError:  # noqa: ASYNC103 — cancelled task is expected, not an error
            return  # noqa: ASYNC104
        except Exception:
            return
        if exc:
            self.logger.error("[%s] task %s crashed", self.unique_name, t.get_name(), exc_info=exc)
            for recorder in list(self._exception_recorders):
                try:
                    recorder(t, exc)
                except Exception:
                    self.logger.exception(
                        "[%s] exception recorder failed for task %s",
                        self.unique_name,
                        t.get_name(),
                    )

    task.add_done_callback(lambda t: self._tasks.discard(t))
    task.add_done_callback(_done)

install_exception_recorder(recorder: ExceptionRecorderT) -> None

Install a callback that is called for each non-CancelledError task exception.

Called from the task's done callback, after the error is logged. The recorder receives the completed task and the exception.

Intended for test infrastructure (e.g., AppTestHarness drain) that needs to collect task exceptions regardless of whether the task completed during a asyncio.wait call or between iterations.

Multiple recorders may be installed; all are called in installation order (FIFO) when an exception occurs.

Parameters:

Name Type Description Default
recorder ExceptionRecorderT

Callable (task, exc) -> None invoked on each non-cancellation exception.

required
Source code in src/hassette/task_bucket/task_bucket.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def install_exception_recorder(self, recorder: "ExceptionRecorderT") -> None:
    """Install a callback that is called for each non-CancelledError task exception.

    Called from the task's done callback, after the error is logged.
    The recorder receives the completed task and the exception.

    Intended for test infrastructure (e.g., AppTestHarness drain) that needs to
    collect task exceptions regardless of whether the task completed during a
    ``asyncio.wait`` call or between iterations.

    Multiple recorders may be installed; all are called in installation order
    (FIFO) when an exception occurs.

    Args:
        recorder: Callable ``(task, exc) -> None`` invoked on each non-cancellation
            exception.
    """
    self._exception_recorders.append(recorder)

uninstall_exception_recorder(recorder: ExceptionRecorderT) -> None

Remove a previously installed exception recorder.

Safe to call even if the recorder was never installed — it is a no-op in that case. Removes the first occurrence; assumes each installed recorder is a distinct callable.

Parameters:

Name Type Description Default
recorder ExceptionRecorderT

The recorder callable to remove.

required
Source code in src/hassette/task_bucket/task_bucket.py
106
107
108
109
110
111
112
113
114
115
116
117
def uninstall_exception_recorder(self, recorder: "ExceptionRecorderT") -> None:
    """Remove a previously installed exception recorder.

    Safe to call even if the recorder was never installed — it is a no-op in
    that case. Removes the first occurrence; assumes each installed recorder
    is a distinct callable.

    Args:
        recorder: The recorder callable to remove.
    """
    with contextlib.suppress(ValueError):
        self._exception_recorders.remove(recorder)

spawn(coro: CoroLikeT[T], *, name: str | None = None) -> asyncio.Task[T]

Convenience: create and track a new task.

Source code in src/hassette/task_bucket/task_bucket.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def spawn(self, coro: CoroLikeT[T], *, name: str | None = None) -> asyncio.Task[T]:
    """Convenience: create and track a new task."""
    if name is None:
        name = getattr(coro, "__qualname__", None) or repr(coro)
    self.logger.debug("Spawning task %s in bucket %s", name, self.unique_name)
    current_thread = threading.get_ident()

    if current_thread == self.hassette._loop_thread_id:
        # Fast path: already on loop thread
        with ctx.use_task_bucket(self):
            return asyncio.create_task(coro, name=name)
    else:
        # Dev-mode tracking: log cross-thread spawn
        if self.hassette.config.dev_mode:
            self.logger.debug(
                "Cross-thread spawn: %s from thread %s (loop thread %s)",
                name,
                current_thread,
                self.hassette._loop_thread_id,
            )
        # Cross-thread: create the task on the real loop thread and wait for the handle
        result: Future[asyncio.Task[T]] = Future()

        def _create() -> None:
            try:
                with ctx.use_task_bucket(self):
                    task = asyncio.create_task(coro, name=name)
                result.set_result(task)
            except Exception as e:
                result.set_exception(e)

        self.hassette.loop.call_soon_threadsafe(_create)
        return result.result()  # block this worker thread briefly to hand back the Task

run_in_thread(fn: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> CoroutineType[Any, Any, R]

Run a synchronous function in a separate thread.

This is a thin wrapper around asyncio.to_thread, but ensures that the current TaskBucket context is preserved in the new thread.

Parameters:

Name Type Description Default
fn Callable[P, R]

The synchronous function to run.

required
*args args

Positional arguments to pass to the function.

()
**kwargs kwargs

Keyword arguments to pass to the function.

{}

Returns:

Type Description
CoroutineType[Any, Any, R]

A coroutine that resolves to the return value of fn.

Source code in src/hassette/task_bucket/task_bucket.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def run_in_thread(self, fn: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> "CoroutineType[Any, Any, R]":
    """Run a synchronous function in a separate thread.

    This is a thin wrapper around `asyncio.to_thread`, but ensures that the current TaskBucket context
    is preserved in the new thread.

    Args:
        fn: The synchronous function to run.
        *args: Positional arguments to pass to the function.
        **kwargs: Keyword arguments to pass to the function.

    Returns:
        A coroutine that resolves to the return value of *fn*.
    """
    current_bucket = ctx.CURRENT_BUCKET.get()

    def _call() -> R:
        if current_bucket is not None:
            with ctx.use_task_bucket(current_bucket):
                return fn(*args, **kwargs)
        else:
            return fn(*args, **kwargs)

    return asyncio.to_thread(_call)

post_to_loop(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> None

Schedule a callable on the event loop from any thread.

Source code in src/hassette/task_bucket/task_bucket.py
178
179
180
def post_to_loop(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
    """Schedule a callable on the event loop from any thread."""
    self.hassette.loop.call_soon_threadsafe(fn, *args, **kwargs)

make_async_adapter(fn: Callable[P, R] | Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]

make_async_adapter(
    fn: Callable[P, Awaitable[R]],
) -> Callable[P, Awaitable[R]]
make_async_adapter(
    fn: Callable[P, R],
) -> Callable[P, Awaitable[R]]

Normalize a callable (sync or async) into an async callable with the same signature.

  • If fn is async: await it.
  • If fn is sync: run it in Hassette's thread pool executor via TaskBucket.run_in_thread.
Source code in src/hassette/task_bucket/task_bucket.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
def make_async_adapter(self, fn: Callable[P, R] | Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]:
    """
    Normalize a callable (sync or async) into an async callable with the same signature.

    - If `fn` is async: await it.
    - If `fn` is sync: run it in Hassette's thread pool executor via TaskBucket.run_in_thread.
    """
    if is_async_callable(fn):

        @functools.wraps(cast("Callable[..., object]", fn))
        async def _async_fn(*args: P.args, **kwargs: P.kwargs) -> R:
            return await cast("Callable[P, Awaitable[R]]", fn)(*args, **kwargs)

        return _async_fn

    @functools.wraps(cast("Callable[..., object]", fn))
    async def _sync_fn(*args: P.args, **kwargs: P.kwargs) -> R:
        try:
            return await self.run_in_thread(cast("Callable[P, R]", fn), *args, **kwargs)
        except TimeoutError:
            raise
        except Exception:
            # optional: you can re-raise without cancelling; no task to cancel anymore
            self.logger.exception("Error in sync function '%s'", getattr(fn, "__name__", repr(fn)))
            raise

    return _sync_fn

run_sync(fn: Coroutine[Any, Any, R], timeout_seconds: int | float | None = None) -> R

Run an async function in a synchronous context.

Parameters:

Name Type Description Default
fn Coroutine[Any, Any, R]

The async function to run.

required
timeout_seconds int | float | None

The timeout for the function call. None uses the config value.

None

Returns:

Type Description
R

The result of the function call.

Source code in src/hassette/task_bucket/task_bucket.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
def run_sync(self, fn: Coroutine[Any, Any, R], timeout_seconds: int | float | None = None) -> R:
    """Run an async function in a synchronous context.

    Args:
        fn: The async function to run.
        timeout_seconds: The timeout for the function call. None uses the config value.

    Returns:
        The result of the function call.
    """

    timeout_seconds = timeout_seconds or self.hassette.config.lifecycle.run_sync_timeout_seconds

    # If we're already in an event loop, don't allow blocking calls.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        pass  # not in a loop -> safe to block
    else:
        fn.close()  # close the coroutine to avoid warnings
        raise RuntimeError("This sync method was called from within an event loop. Use the async method instead.")

    try:
        fut = asyncio.run_coroutine_threadsafe(fn, self.hassette.loop)
        return fut.result(timeout=timeout_seconds)
    except CfTimeoutError:
        self.logger.exception("Sync function '%s' timed out", fn.__name__)
        raise
    except Exception:
        self.logger.exception("Failed to run sync function '%s'", fn.__name__)
        raise
    finally:
        if not fut.done():
            fut.cancel()

run_on_loop_thread(fn: typing.Callable[..., R], *args: Any, **kwargs: Any) -> R async

Run a synchronous function on the main event loop thread.

This is useful for ensuring that loop-affine code runs in the correct context.

Source code in src/hassette/task_bucket/task_bucket.py
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
async def run_on_loop_thread(self, fn: typing.Callable[..., R], *args: Any, **kwargs: Any) -> R:
    """Run a synchronous function on the main event loop thread.

    This is useful for ensuring that loop-affine code runs in the correct context.
    """
    fut = self.hassette.loop.create_future()

    def _call():
        try:
            fut.set_result(fn(*args, **kwargs))
        except Exception as e:
            fut.set_exception(e)

    self.hassette.loop.call_soon_threadsafe(_call)
    return await fut

create_task_on_loop(coro: Coroutine[Any, Any, Any], *, name: str | None = None) -> asyncio.Task[Any]

Create a task on the main event loop thread, in this bucket's context.

Source code in src/hassette/task_bucket/task_bucket.py
266
267
268
269
def create_task_on_loop(self, coro: Coroutine[Any, Any, Any], *, name: str | None = None) -> asyncio.Task[Any]:
    """Create a task on the main event loop thread, in this bucket's context."""
    with ctx.use_task_bucket(self):
        return self.hassette.loop.create_task(coro, name=name)

pending_tasks() -> list[asyncio.Task[Any]]

Return a snapshot list of non-completed tasks in this bucket.

This is the recommended public accessor for drain helpers and test infrastructure. Returns a fresh list so callers can safely iterate after mutations to the internal set without risking a RuntimeError.

Returns:

Type Description
list[Task[Any]]

A list of tasks that are currently running (not yet done).

Source code in src/hassette/task_bucket/task_bucket.py
271
272
273
274
275
276
277
278
279
280
281
def pending_tasks(self) -> list[asyncio.Task[Any]]:
    """Return a snapshot list of non-completed tasks in this bucket.

    This is the recommended public accessor for drain helpers and test
    infrastructure. Returns a fresh list so callers can safely iterate after
    mutations to the internal set without risking a ``RuntimeError``.

    Returns:
        A list of tasks that are currently running (not yet done).
    """
    return [t for t in list(self._tasks) if not t.done()]

cancel_all_sync() -> None

Cancel all tracked tasks without awaiting completion (fire-and-forget).

Source code in src/hassette/task_bucket/task_bucket.py
283
284
285
286
287
288
def cancel_all_sync(self) -> None:
    """Cancel all tracked tasks without awaiting completion (fire-and-forget)."""
    current = asyncio.current_task()
    tasks = [t for t in list(self._tasks) if not t.done() and t is not current]
    for t in tasks:
        t.cancel()

cancel_all() -> None async

Cancel all tracked tasks, wait for them to finish, and log stragglers.

Source code in src/hassette/task_bucket/task_bucket.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
async def cancel_all(self) -> None:
    """Cancel all tracked tasks, wait for them to finish, and log stragglers."""
    # snapshot to avoid mutation during iteration
    current = asyncio.current_task()
    tasks = [t for t in list(self._tasks) if not t.done() and t is not current]

    if not tasks:
        self.logger.debug("No tasks to cancel in bucket %s", self.unique_name)
        return

    self.logger.debug("Cancelling %d tasks in bucket %s", len(tasks), self.unique_name)
    for t in tasks:
        t.cancel()

    done, pending = await asyncio.wait(tasks, timeout=self.config_cancel_timeout)
    self.logger.debug("%d tasks done, %d still pending in bucket %s", len(done), len(pending), self.unique_name)

    for t in done:
        if t.cancelled():
            continue
        exc = t.exception()
        if exc:
            self.logger.warning("[%s] task %s errored during shutdown: %r", self.unique_name, t.get_name(), exc)

    for t in pending:
        self.logger.warning(
            "[%s] task %s refused to die within %.1fs", self.unique_name, t.get_name(), self.config_cancel_timeout
        )