redir.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. """Utilities to handle redirection to browser UI."""
  2. import time
  3. import uuid
  4. import webbrowser
  5. import httpx
  6. from .. import constants
  7. from . import console
  8. def open_browser_and_wait(
  9. target_url: str, poll_url: str, interval: int = 2
  10. ) -> httpx.Response:
  11. """Open a browser window to target_url and request poll_url until it returns successfully.
  12. Args:
  13. target_url: The URL to open in the browser.
  14. poll_url: The URL to poll for success.
  15. interval: The interval in seconds to wait between polling.
  16. Returns:
  17. The response from the poll_url.
  18. """
  19. if not webbrowser.open(target_url):
  20. console.warn(
  21. f"Unable to automatically open the browser. Please navigate to {target_url} in your browser."
  22. )
  23. console.info("[b]Complete the workflow in the browser to continue.[/b]")
  24. while True:
  25. try:
  26. response = httpx.get(poll_url, follow_redirects=True)
  27. if response.is_success:
  28. break
  29. except httpx.RequestError as err:
  30. console.info(f"Will retry after error occurred while polling: {err}.")
  31. time.sleep(interval)
  32. return response
  33. def reflex_build_redirect() -> str:
  34. """Open the browser window to reflex.build and wait for the user to select a generation.
  35. Returns:
  36. The selected generation hash.
  37. """
  38. token = str(uuid.uuid4())
  39. target_url = constants.Templates.REFLEX_BUILD_URL.format(reflex_init_token=token)
  40. poll_url = constants.Templates.REFLEX_BUILD_POLL_URL.format(reflex_init_token=token)
  41. response = open_browser_and_wait(target_url, poll_url)
  42. return response.json()["generation_hash"]