Coverage for src/ss_utils_async/__init__.py: 100%

10 statements  

« prev     ^ index     » next       coverage.py v7.10.5, created at 2025-08-25 10:12 +0800

1import asyncio 

2from collections.abc import Coroutine 

3from collections.abc import Iterable 

4from typing import Any 

5 

6 

7async def gather_w_semaphore(coroutines: Iterable[Coroutine], max_coroutines: int = 4) -> list[Any]: 

8 """ 

9 Gathers coroutines with a semaphore to limit concurrent execution, allowing arguments to be passed to coroutines. 

10 

11 Args: 

12 coroutines: An interable of coroutines to run concurrently (NOTE: i.e. `async_fn(arg, **kwargs)`, NOT the `async_fn` itself). 

13 max_coroutines: The maximum number of coroutines to run concurrently. 

14 

15 Returns: 

16 List of results. 

17 """ 

18 semaphore = asyncio.Semaphore(max_coroutines) 

19 

20 async def _wrap_coroutine(coroutine: Coroutine): 

21 async with semaphore: 

22 return await coroutine 

23 

24 return await asyncio.gather(*[_wrap_coroutine(coroutine) for coroutine in coroutines])