auto_context.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. from __future__ import annotations
  2. import asyncio
  3. from typing import TYPE_CHECKING, Any, Coroutine, Generator, List
  4. from . import globals
  5. from .task_logger import create_task
  6. if TYPE_CHECKING:
  7. import justpy as jp
  8. def get_task_id() -> int:
  9. return id(asyncio.current_task()) if globals.loop and globals.loop.is_running() else 0
  10. def get_view_stack() -> List['jp.HTMLBaseComponent']:
  11. task_id = get_task_id()
  12. if task_id not in globals.view_stacks:
  13. globals.view_stacks[task_id] = []
  14. return globals.view_stacks[task_id]
  15. def prune_view_stack() -> None:
  16. task_id = get_task_id()
  17. if not globals.view_stacks[task_id]:
  18. del globals.view_stacks[task_id]
  19. class Context:
  20. def __init__(self, view: 'jp.HTMLBaseComponent') -> None:
  21. self.view = view
  22. def __enter__(self):
  23. self.child_count = len(self.view)
  24. get_view_stack().append(self.view)
  25. return self
  26. def __exit__(self, type, value, traceback):
  27. get_view_stack().pop()
  28. prune_view_stack()
  29. self.lazy_update()
  30. def lazy_update(self) -> None:
  31. if len(self.view) != self.child_count:
  32. self.child_count = len(self.view)
  33. create_task(self.view.update())
  34. def watch_asyncs(self, coro: Coroutine) -> AutoUpdaterForAsyncs:
  35. return AutoUpdaterForAsyncs(coro, self)
  36. class AutoUpdaterForAsyncs:
  37. def __init__(self, coro: Coroutine, context: Context) -> None:
  38. self.coro = coro
  39. self.context = context
  40. self.context.lazy_update()
  41. def __await__(self) -> Generator[Any, None, Any]:
  42. coro_iter = self.coro.__await__()
  43. iter_send, iter_throw = coro_iter.send, coro_iter.throw
  44. send, message = iter_send, None
  45. while True:
  46. try:
  47. signal = send(message)
  48. self.context.lazy_update()
  49. except StopIteration as err:
  50. return err.value
  51. else:
  52. send = iter_send
  53. try:
  54. message = yield signal
  55. except BaseException as err:
  56. send, message = iter_throw, err
  57. class ContextMixin:
  58. """
  59. Mixin providing a context manager for additional components.
  60. copied from nicegui.elements.group.Group
  61. """
  62. def __enter__(self):
  63. self._child_count_on_enter = len(self.view)
  64. get_view_stack().append(self.view)
  65. return self
  66. def __exit__(self, *_):
  67. get_view_stack().pop()
  68. if self._child_count_on_enter != len(self.view):
  69. self.update()