helpers.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. import asyncio
  2. import functools
  3. import inspect
  4. import socket
  5. import sys
  6. import threading
  7. import time
  8. import webbrowser
  9. from contextlib import nullcontext
  10. from typing import TYPE_CHECKING, Any, Awaitable, Callable, Optional, Tuple, Union
  11. from starlette.middleware import Middleware
  12. from starlette.middleware.sessions import SessionMiddleware
  13. from nicegui.storage import RequestTrackingMiddleware
  14. from . import background_tasks, globals
  15. if TYPE_CHECKING:
  16. from .client import Client
  17. KWONLY_SLOTS = {'kw_only': True, 'slots': True} if sys.version_info >= (3, 10) else {}
  18. def is_coroutine(object: Any) -> bool:
  19. while isinstance(object, functools.partial):
  20. object = object.func
  21. return asyncio.iscoroutinefunction(object)
  22. def safe_invoke(func: Union[Callable[..., Any], Awaitable], client: Optional['Client'] = None) -> None:
  23. try:
  24. if isinstance(func, Awaitable):
  25. async def func_with_client():
  26. with client or nullcontext():
  27. await func
  28. background_tasks.create(func_with_client())
  29. else:
  30. with client or nullcontext():
  31. result = func(client) if len(inspect.signature(func).parameters) == 1 and client is not None else func()
  32. if isinstance(result, Awaitable):
  33. async def result_with_client():
  34. with client or nullcontext():
  35. await result
  36. background_tasks.create(result_with_client())
  37. except Exception as e:
  38. globals.handle_exception(e)
  39. def is_port_open(host: str, port: int) -> bool:
  40. """Check if the port is open by checking if a TCP connection can be established."""
  41. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  42. try:
  43. sock.connect((host, port))
  44. except (ConnectionRefusedError, TimeoutError):
  45. return False
  46. except Exception:
  47. return False
  48. else:
  49. return True
  50. finally:
  51. sock.close()
  52. def schedule_browser(host: str, port: int) -> Tuple[threading.Thread, threading.Event]:
  53. """Wait non-blockingly for the port to be open, then start a webbrowser.
  54. This function launches a thread in order to be non-blocking.
  55. This thread then uses `is_port_open` to check when the port opens.
  56. When connectivity is confirmed, the webbrowser is launched using `webbrowser.open`.
  57. The thread is created as a daemon thread, in order to not interfere with Ctrl+C.
  58. If you need to stop this thread, you can do this by setting the Event, that gets returned.
  59. The thread will stop with the next loop without opening the browser.
  60. :return: A tuple consisting of the actual thread object and an event for stopping the thread.
  61. """
  62. cancel = threading.Event()
  63. def in_thread(host: str, port: int) -> None:
  64. while not is_port_open(host, port):
  65. if cancel.is_set():
  66. return
  67. time.sleep(0.1)
  68. webbrowser.open(f'http://{host}:{port}/')
  69. host = host if host != '0.0.0.0' else '127.0.0.1'
  70. thread = threading.Thread(target=in_thread, args=(host, port), daemon=True)
  71. thread.start()
  72. return thread, cancel
  73. def set_storage_secret(storage_secret: Optional[str] = None) -> None:
  74. """Set storage_secret for ui.run() and run_with."""
  75. if any(m.cls == SessionMiddleware for m in globals.app.user_middleware):
  76. # NOTE not using "add_middleware" because it would be the wrong order
  77. globals.app.user_middleware.append(Middleware(RequestTrackingMiddleware))
  78. elif storage_secret is not None:
  79. globals.app.add_middleware(RequestTrackingMiddleware)
  80. globals.app.add_middleware(SessionMiddleware, secret_key=storage_secret)