Coverage for src/ss_utils_async/__init__.py: 100%
10 statements
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-25 10:12 +0800
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-25 10:12 +0800
1import asyncio
2from collections.abc import Coroutine
3from collections.abc import Iterable
4from typing import Any
7async def gather_w_semaphore(coroutines: Iterable[Coroutine], max_coroutines: int = 4) -> list[Any]:
8 """
9 Gathers coroutines with a semaphore to limit concurrent execution, allowing arguments to be passed to coroutines.
11 Args:
12 coroutines: An interable of coroutines to run concurrently (NOTE: i.e. `async_fn(arg, **kwargs)`, NOT the `async_fn` itself).
13 max_coroutines: The maximum number of coroutines to run concurrently.
15 Returns:
16 List of results.
17 """
18 semaphore = asyncio.Semaphore(max_coroutines)
20 async def _wrap_coroutine(coroutine: Coroutine):
21 async with semaphore:
22 return await coroutine
24 return await asyncio.gather(*[_wrap_coroutine(coroutine) for coroutine in coroutines])