1
0

background_tasks.py 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. """inspired from https://quantlane.com/blog/ensure-asyncio-task-exceptions-get-logged/"""
  2. from __future__ import annotations
  3. import asyncio
  4. import sys
  5. from typing import Awaitable, Dict, Set, TypeVar
  6. from . import globals # pylint: disable=redefined-builtin,cyclic-import
  7. T = TypeVar('T')
  8. name_supported = sys.version_info[1] >= 8
  9. running_tasks: Set[asyncio.Task] = set()
  10. lazy_tasks_running: Dict[str, asyncio.Task] = {}
  11. lazy_tasks_waiting: Dict[str, Awaitable[T]] = {}
  12. def create(coroutine: Awaitable[T], *, name: str = 'unnamed task') -> asyncio.Task[T]:
  13. """Wraps a loop.create_task call and ensures there is an exception handler added to the task.
  14. If the task raises an exception, it is logged and handled by the global exception handlers.
  15. Also a reference to the task is kept until it is done, so that the task is not garbage collected mid-execution.
  16. See https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task.
  17. """
  18. assert globals.loop is not None
  19. task: asyncio.Task = \
  20. globals.loop.create_task(coroutine, name=name) if name_supported else globals.loop.create_task(coroutine)
  21. task.add_done_callback(_handle_task_result)
  22. running_tasks.add(task)
  23. task.add_done_callback(running_tasks.discard)
  24. return task
  25. def create_lazy(coroutine: Awaitable[T], *, name: str) -> None:
  26. """Wraps a create call and ensures a second task with the same name is delayed until the first one is done.
  27. If a third task with the same name is created while the first one is still running, the second one is discarded.
  28. """
  29. if name in lazy_tasks_running:
  30. if name in lazy_tasks_waiting:
  31. asyncio.Task(lazy_tasks_waiting[name]).cancel()
  32. lazy_tasks_waiting[name] = coroutine
  33. return
  34. def finalize(name: str) -> None:
  35. lazy_tasks_running.pop(name)
  36. if name in lazy_tasks_waiting:
  37. create_lazy(lazy_tasks_waiting.pop(name), name=name)
  38. task = create(coroutine, name=name)
  39. lazy_tasks_running[name] = task
  40. task.add_done_callback(lambda _: finalize(name))
  41. def _handle_task_result(task: asyncio.Task) -> None:
  42. try:
  43. task.result()
  44. except asyncio.CancelledError:
  45. pass
  46. except Exception as e:
  47. globals.handle_exception(e)