123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- import asyncio
- import functools
- import hashlib
- import os
- import socket
- import threading
- import time
- import webbrowser
- from pathlib import Path
- from typing import Any, Optional, Set, Tuple, Union
- from .logging import log
- _shown_warnings: Set[str] = set()
- def warn_once(message: str, *, stack_info: bool = False) -> None:
- """Print a warning message only once."""
- if message not in _shown_warnings:
- log.warning(message, stack_info=stack_info)
- _shown_warnings.add(message)
- def is_pytest() -> bool:
- """Check if the code is running in pytest."""
- return 'PYTEST_CURRENT_TEST' in os.environ
- def is_coroutine_function(obj: Any) -> bool:
- """Check if the object is a coroutine function.
- This function is needed because functools.partial is not a coroutine function, but its func attribute is.
- Note: It will return false for coroutine objects.
- """
- while isinstance(obj, functools.partial):
- obj = obj.func
- return asyncio.iscoroutinefunction(obj)
- def is_file(path: Optional[Union[str, Path]]) -> bool:
- """Check if the path is a file that exists."""
- if not path:
- return False
- if isinstance(path, str) and path.strip().startswith('data:'):
- return False # NOTE: avoid passing data URLs to Path
- try:
- return Path(path).is_file()
- except OSError:
- return False
- def hash_file_path(path: Path) -> str:
- """Hash the given path."""
- return hashlib.sha256(path.as_posix().encode()).hexdigest()[:32]
- def is_port_open(host: str, port: int) -> bool:
- """Check if the port is open by checking if a TCP connection can be established."""
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- try:
- sock.connect((host, port))
- except (ConnectionRefusedError, TimeoutError):
- return False
- except Exception:
- return False
- else:
- return True
- finally:
- sock.close()
- def schedule_browser(host: str, port: int) -> Tuple[threading.Thread, threading.Event]:
- """Wait non-blockingly for the port to be open, then start a webbrowser.
- This function launches a thread in order to be non-blocking.
- This thread then uses `is_port_open` to check when the port opens.
- When connectivity is confirmed, the webbrowser is launched using `webbrowser.open`.
- The thread is created as a daemon thread, in order to not interfere with Ctrl+C.
- If you need to stop this thread, you can do this by setting the Event, that gets returned.
- The thread will stop with the next loop without opening the browser.
- :return: A tuple consisting of the actual thread object and an event for stopping the thread.
- """
- cancel = threading.Event()
- def in_thread(host: str, port: int) -> None:
- while not is_port_open(host, port):
- if cancel.is_set():
- return
- time.sleep(0.1)
- webbrowser.open(f'http://{host}:{port}/')
- host = host if host != '0.0.0.0' else '127.0.0.1'
- thread = threading.Thread(target=in_thread, args=(host, port), daemon=True)
- thread.start()
- return thread, cancel
- def kebab_to_camel_case(string: str) -> str:
- """Convert a kebab-case string to camelCase."""
- return ''.join(word.capitalize() if i else word for i, word in enumerate(string.split('-')))
|