1
0

helpers.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. import asyncio
  2. import functools
  3. import hashlib
  4. import os
  5. import socket
  6. import threading
  7. import time
  8. import webbrowser
  9. from pathlib import Path
  10. from typing import Any, Optional, Set, Tuple, Union
  11. from .logging import log
  12. _shown_warnings: Set[str] = set()
  13. def warn_once(message: str, *, stack_info: bool = False) -> None:
  14. """Print a warning message only once."""
  15. if message not in _shown_warnings:
  16. log.warning(message, stack_info=stack_info)
  17. _shown_warnings.add(message)
  18. def is_pytest() -> bool:
  19. """Check if the code is running in pytest."""
  20. return 'PYTEST_CURRENT_TEST' in os.environ
  21. def is_coroutine_function(obj: Any) -> bool:
  22. """Check if the object is a coroutine function.
  23. This function is needed because functools.partial is not a coroutine function, but its func attribute is.
  24. Note: It will return false for coroutine objects.
  25. """
  26. while isinstance(obj, functools.partial):
  27. obj = obj.func
  28. return asyncio.iscoroutinefunction(obj)
  29. def is_file(path: Optional[Union[str, Path]]) -> bool:
  30. """Check if the path is a file that exists."""
  31. if not path:
  32. return False
  33. if isinstance(path, str) and path.strip().startswith('data:'):
  34. return False # NOTE: avoid passing data URLs to Path
  35. try:
  36. return Path(path).is_file()
  37. except OSError:
  38. return False
  39. def hash_file_path(path: Path) -> str:
  40. """Hash the given path."""
  41. return hashlib.sha256(path.as_posix().encode()).hexdigest()[:32]
  42. def is_port_open(host: str, port: int) -> bool:
  43. """Check if the port is open by checking if a TCP connection can be established."""
  44. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  45. try:
  46. sock.connect((host, port))
  47. except (ConnectionRefusedError, TimeoutError):
  48. return False
  49. except Exception:
  50. return False
  51. else:
  52. return True
  53. finally:
  54. sock.close()
  55. def schedule_browser(host: str, port: int) -> Tuple[threading.Thread, threading.Event]:
  56. """Wait non-blockingly for the port to be open, then start a webbrowser.
  57. This function launches a thread in order to be non-blocking.
  58. This thread then uses `is_port_open` to check when the port opens.
  59. When connectivity is confirmed, the webbrowser is launched using `webbrowser.open`.
  60. The thread is created as a daemon thread, in order to not interfere with Ctrl+C.
  61. If you need to stop this thread, you can do this by setting the Event, that gets returned.
  62. The thread will stop with the next loop without opening the browser.
  63. :return: A tuple consisting of the actual thread object and an event for stopping the thread.
  64. """
  65. cancel = threading.Event()
  66. def in_thread(host: str, port: int) -> None:
  67. while not is_port_open(host, port):
  68. if cancel.is_set():
  69. return
  70. time.sleep(0.1)
  71. webbrowser.open(f'http://{host}:{port}/')
  72. host = host if host != '0.0.0.0' else '127.0.0.1'
  73. thread = threading.Thread(target=in_thread, args=(host, port), daemon=True)
  74. thread.start()
  75. return thread, cancel
  76. def kebab_to_camel_case(string: str) -> str:
  77. """Convert a kebab-case string to camelCase."""
  78. return ''.join(word.capitalize() if i else word for i, word in enumerate(string.split('-')))