Source code for aleph.toolkit.batch

from typing import AsyncIterator, List, TypeVar

T = TypeVar("T")


async def async_batch(
    async_iterable: AsyncIterator[T], n: int
) -> AsyncIterator[List[T]]:
    """Group the items of an async iterator into lists of at most ``n`` items.

    Items are consumed in order. A list is yielded as soon as it holds ``n``
    items; whatever is left once the source is exhausted is yielded as a
    final, shorter list. An empty source yields nothing.

    Args:
        async_iterable: The asynchronous source of items.
        n: Maximum number of items per batch. Must be at least 1.

    Yields:
        Lists of consecutive items. Each yielded list is a distinct object,
        so the consumer may keep or mutate it freely.

    Raises:
        ValueError: If ``n`` is less than 1. Because this is an async
            generator, the error surfaces when iteration starts, not when
            the function is called.
    """
    # With n <= 0 the size check below could never succeed, so the whole
    # stream would be silently buffered in memory and emitted as one list.
    if n < 1:
        raise ValueError(f"n must be at least 1, got {n}")

    batch: List[T] = []
    async for item in async_iterable:
        batch.append(item)
        if len(batch) >= n:
            yield batch
            # Rebind rather than clear(): the consumer still owns the list
            # that was just yielded.
            batch = []

    # Yield the last, partially filled batch.
    if batch:
        yield batch