background_tasks.py 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. '''inspired from https://quantlane.com/blog/ensure-asyncio-task-exceptions-get-logged/'''
  2. import asyncio
  3. import sys
  4. from typing import Awaitable, Dict, Set, TypeVar
  5. from . import globals
  6. T = TypeVar('T')
  7. name_supported = sys.version_info[1] >= 8
  8. running_tasks: Set[asyncio.Task] = set()
  9. lazy_tasks_running: Dict[str, asyncio.Task] = {}
  10. lazy_tasks_waiting: Dict[str, Awaitable[T]] = {}
  11. def create(coroutine: Awaitable[T], *, name: str = 'unnamed task') -> 'asyncio.Task[T]':
  12. '''Wraps a loop.create_task call and ensures there is an exception handler added to the task.
  13. If the task raises an exception, it is logged and handled by the global exception handlers.
  14. Also a reference to the task is kept until it is done, so that the task is not garbage collected mid-execution.
  15. See https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task.
  16. '''
  17. task = globals.loop.create_task(coroutine, name=name) if name_supported else globals.loop.create_task(coroutine)
  18. task.add_done_callback(_handle_task_result)
  19. running_tasks.add(task)
  20. task.add_done_callback(running_tasks.discard)
  21. return task
  22. def create_lazy(coroutine: Awaitable[T], *, name: str) -> 'asyncio.Task[T]':
  23. '''Wraps a create call and ensures a second task with the same name is delayed until the first one is done.
  24. If a third task with the same name is created while the first one is still running, the second one is discarded.
  25. '''
  26. if name in lazy_tasks_running:
  27. lazy_tasks_waiting[name] = coroutine
  28. return
  29. def finalize(name: str) -> None:
  30. lazy_tasks_running.pop(name)
  31. if name in lazy_tasks_waiting:
  32. create_lazy(lazy_tasks_waiting.pop(name), name=name)
  33. task = create(coroutine, name=name)
  34. lazy_tasks_running[name] = task
  35. task.add_done_callback(lambda _: finalize(name))
  36. def _handle_task_result(task: asyncio.Task) -> None:
  37. try:
  38. task.result()
  39. except asyncio.CancelledError:
  40. pass
  41. except Exception as e:
  42. globals.handle_exception(e)