background_tasks.py 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
"""Inspired by https://quantlane.com/blog/ensure-asyncio-task-exceptions-get-logged/"""
  2. from __future__ import annotations
  3. import asyncio
  4. from typing import Awaitable, Dict, Set
  5. from . import core
# Strong references to the tasks started via create(); create() registers a done callback that removes them again.
running_tasks: Set[asyncio.Task] = set()
# The currently running lazy task for each name (maintained by create_lazy()).
lazy_tasks_running: Dict[str, asyncio.Task] = {}
# The awaitable queued for each name while a lazy task with that name is still running (maintained by create_lazy()).
lazy_tasks_waiting: Dict[str, Awaitable] = {}
  9. def create(coroutine: Awaitable, *, name: str = 'unnamed task') -> asyncio.Task:
  10. """Wraps a loop.create_task call and ensures there is an exception handler added to the task.
  11. If the task raises an exception, it is logged and handled by the global exception handlers.
  12. Also a reference to the task is kept until it is done, so that the task is not garbage collected mid-execution.
  13. See https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task.
  14. """
  15. assert core.loop is not None
  16. assert asyncio.iscoroutine(coroutine)
  17. task: asyncio.Task = core.loop.create_task(coroutine, name=name)
  18. task.add_done_callback(_handle_task_result)
  19. running_tasks.add(task)
  20. task.add_done_callback(running_tasks.discard)
  21. return task
  22. def create_lazy(coroutine: Awaitable, *, name: str) -> None:
  23. """Wraps a create call and ensures a second task with the same name is delayed until the first one is done.
  24. If a third task with the same name is created while the first one is still running, the second one is discarded.
  25. """
  26. if name in lazy_tasks_running:
  27. if name in lazy_tasks_waiting:
  28. asyncio.Task(lazy_tasks_waiting[name]).cancel()
  29. lazy_tasks_waiting[name] = coroutine
  30. return
  31. def finalize(name: str) -> None:
  32. lazy_tasks_running.pop(name)
  33. if name in lazy_tasks_waiting:
  34. create_lazy(lazy_tasks_waiting.pop(name), name=name)
  35. task = create(coroutine, name=name)
  36. lazy_tasks_running[name] = task
  37. task.add_done_callback(lambda _: finalize(name))
  38. def _handle_task_result(task: asyncio.Task) -> None:
  39. try:
  40. task.result()
  41. except asyncio.CancelledError:
  42. pass
  43. except Exception as e:
  44. core.app.handle_exception(e)