asyncio.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. from __future__ import annotations
  2. import asyncio
  3. from asyncio import AbstractEventLoop, runners
  4. from typing import Optional, Callable, AsyncIterator, Iterator
  5. from ..errors import NestAsyncioError
  6. try:
  7. import nest_asyncio
  8. has_nest_asyncio = True
  9. except ImportError:
  10. has_nest_asyncio = False
  11. try:
  12. import uvloop
  13. has_uvloop = True
  14. except ImportError:
  15. has_uvloop = False
  16. def get_running_loop(check_nested: bool) -> Optional[AbstractEventLoop]:
  17. try:
  18. loop = asyncio.get_running_loop()
  19. # Do not patch uvloop loop because its incompatible.
  20. if has_uvloop:
  21. if isinstance(loop, uvloop.Loop):
  22. return loop
  23. if not hasattr(loop.__class__, "_nest_patched"):
  24. if has_nest_asyncio:
  25. nest_asyncio.apply(loop)
  26. elif check_nested:
  27. raise NestAsyncioError('Install "nest_asyncio" package | pip install -U nest_asyncio')
  28. return loop
  29. except RuntimeError:
  30. pass
  31. # Fix for RuntimeError: async generator ignored GeneratorExit
  32. async def await_callback(callback: Callable):
  33. return await callback()
  34. async def async_generator_to_list(generator: AsyncIterator) -> list:
  35. return [item async for item in generator]
  36. def to_sync_generator(generator: AsyncIterator, stream: bool = True) -> Iterator:
  37. if not stream:
  38. yield from asyncio.run(async_generator_to_list(generator))
  39. return
  40. loop = get_running_loop(check_nested=False)
  41. new_loop = False
  42. if loop is None:
  43. loop = asyncio.new_event_loop()
  44. asyncio.set_event_loop(loop)
  45. new_loop = True
  46. gen = generator.__aiter__()
  47. try:
  48. while True:
  49. yield loop.run_until_complete(await_callback(gen.__anext__))
  50. except StopAsyncIteration:
  51. pass
  52. finally:
  53. if new_loop:
  54. try:
  55. runners._cancel_all_tasks(loop)
  56. loop.run_until_complete(loop.shutdown_asyncgens())
  57. if hasattr(loop, "shutdown_default_executor"):
  58. loop.run_until_complete(loop.shutdown_default_executor())
  59. finally:
  60. asyncio.set_event_loop(None)
  61. loop.close()
  62. # Helper function to convert a synchronous iterator to an async iterator
  63. async def to_async_iterator(iterator: Iterator) -> AsyncIterator:
  64. try:
  65. async for item in iterator:
  66. yield item
  67. except TypeError:
  68. yield await iterator