Source code for aleph.services.storage.engine

import abc
from typing import AsyncIterable, Optional


class StorageEngine(abc.ABC):
    """Abstract interface for a storage backend addressed by filename.

    The class declares five abstract coroutine methods (``read``,
    ``read_iterator``, ``write``, ``delete`` and ``exists``). A subclass
    must override all of them before it can be instantiated. No behavior
    is implemented here: every method body is intentionally empty.
    """

    @abc.abstractmethod
    async def read(self, filename: str) -> Optional[bytes]:
        """Read the entire content stored under ``filename``.

        Args:
            filename: Name identifying the stored content.

        Returns:
            The content as ``bytes``, or ``None``.
            NOTE(review): ``None`` presumably signals that the file does
            not exist -- confirm against the concrete engines.
        """
        ...

    @abc.abstractmethod
    async def read_iterator(
        self, filename: str, chunk_size: int = 1024 * 1024
    ) -> Optional[AsyncIterable[bytes]]:
        """Read the content stored under ``filename`` as a stream of chunks.

        This is itself a coroutine, so it must be awaited to obtain the
        asynchronous iterable.

        Args:
            filename: Name identifying the stored content.
            chunk_size: Requested chunk size; defaults to ``1024 * 1024``.
                NOTE(review): presumably a size in bytes (1 MiB) used as
                the maximum length of each yielded chunk -- confirm
                against the concrete engines.

        Returns:
            An asynchronous iterable yielding ``bytes``, or ``None``.
            NOTE(review): ``None`` presumably signals that the file does
            not exist -- confirm against the concrete engines.
        """
        ...

    @abc.abstractmethod
    async def write(self, filename: str, content: bytes) -> None:
        """Store ``content`` under ``filename``.

        Whether an existing entry is replaced is not specified by this
        interface; it is left to the implementation.

        Args:
            filename: Name under which the content is stored.
            content: Raw bytes to store.
        """
        ...

    @abc.abstractmethod
    async def delete(self, filename: str) -> None:
        """Delete the content stored under ``filename``.

        How a missing ``filename`` is handled is not specified by this
        interface; it is left to the implementation.

        Args:
            filename: Name identifying the content to delete.
        """
        ...

    @abc.abstractmethod
    async def exists(self, filename: str) -> bool:
        """Report whether content is stored under ``filename``.

        Args:
            filename: Name to look up.

        Returns:
            A ``bool`` answering the existence check.
        """
        ...