Source code for aleph.services.storage.engine
import abc
from typing import AsyncIterable, Optional
[docs]
class StorageEngine(abc.ABC):
    """Abstract interface for an asynchronous store addressed by file name.

    Every operation is declared as an abstract coroutine, so a concrete
    engine has to override all five methods before it can be instantiated.
    """

    @abc.abstractmethod
    async def read(self, filename: str) -> Optional[bytes]:
        """Return the bytes stored under ``filename``.

        The return type allows ``None``; presumably that signals a missing
        file -- confirm against the concrete engines.
        """

    @abc.abstractmethod
    async def read_iterator(
        self,
        filename: str,
        chunk_size: int = 1024 * 1024,
    ) -> Optional[AsyncIterable[bytes]]:
        """Return an async iterable of byte chunks for ``filename``.

        ``chunk_size`` defaults to 1024 * 1024; presumably it is the size in
        bytes of each yielded chunk -- confirm against the concrete engines.
        The return type allows ``None``, as for :meth:`read`.
        """

    @abc.abstractmethod
    async def write(self, filename: str, content: bytes):
        """Store ``content`` under ``filename``."""

    @abc.abstractmethod
    async def delete(self, filename: str):
        """Remove the entry named ``filename``."""

    @abc.abstractmethod
    async def exists(self, filename: str) -> bool:
        """Report whether an entry named ``filename`` is present."""