Source code for aleph.toolkit.lifecycle

import asyncio
import logging
import signal
from typing import Awaitable, Callable

LOGGER = logging.getLogger(__name__)


[docs] def install_signal_handlers( loop: asyncio.AbstractEventLoop, callback: Callable[[], object] ) -> None: """Register SIGINT and SIGTERM handlers that invoke `callback`. Re-registering replaces the previous handler. Silently no-ops on platforms that do not support `loop.add_signal_handler` (Windows). """ for sig in (signal.SIGINT, signal.SIGTERM): try: loop.add_signal_handler(sig, callback) except NotImplementedError: LOGGER.debug("Signal handler for %s not available on this platform", sig)
[docs] async def safe_async_cleanup(name: str, awaitable: Awaitable[object]) -> None: """Await a cleanup coroutine, logging but never re-raising on failure. Cleanup paths run during shutdown and must not mask the shutdown signal. """ try: await awaitable except Exception: LOGGER.exception("Error during cleanup of %s", name)