1
0

awaitable_response.py 1.1 KB

123456789101112131415161718192021222324252627282930313233
  1. from __future__ import annotations
  2. from typing import Callable
  3. from . import background_tasks
  4. class AwaitableResponse:
  5. def __init__(self, fire_and_forget: Callable, wait_for_result: Callable) -> None:
  6. """Awaitable Response
  7. This class can be used to run one of two different callables, depending on whether the response is awaited or not.
  8. :param fire_and_forget: The callable to run if the response is not awaited.
  9. :param wait_for_result: The callable to run if the response is awaited.
  10. """
  11. self.wait_for_result = wait_for_result
  12. self.fire_and_forget_task = background_tasks.create(self._start(fire_and_forget), name='fire and forget')
  13. async def _start(self, command: Callable) -> None:
  14. command()
  15. def __await__(self):
  16. self.fire_and_forget_task.cancel()
  17. return self.wait_for_result().__await__()
  18. @staticmethod
  19. def none() -> AwaitableResponse:
  20. """Return an AwaitableResponse that does nothing."""
  21. async def do_nothing():
  22. return None
  23. return AwaitableResponse(lambda: None, do_nothing)