registry.py 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. """Utilities for working with registries."""
  2. import httpx
  3. from reflex.config import environment
  4. from reflex.utils import console, net
  5. from reflex.utils.decorator import once
  6. def latency(registry: str) -> int:
  7. """Get the latency of a registry.
  8. Args:
  9. registry (str): The URL of the registry.
  10. Returns:
  11. int: The latency of the registry in microseconds.
  12. """
  13. try:
  14. time_to_respond = net.get(registry, timeout=2).elapsed.microseconds
  15. except httpx.HTTPError:
  16. console.info(f"Failed to connect to {registry}.")
  17. return 10_000_000
  18. else:
  19. console.debug(f"Latency of {registry}: {time_to_respond}")
  20. return time_to_respond
  21. def average_latency(registry: str, attempts: int = 3) -> int:
  22. """Get the average latency of a registry.
  23. Args:
  24. registry: The URL of the registry.
  25. attempts: The number of attempts to make. Defaults to 10.
  26. Returns:
  27. The average latency of the registry in microseconds.
  28. """
  29. registry_latency = sum(latency(registry) for _ in range(attempts)) // attempts
  30. console.debug(f"Average latency of {registry}: {registry_latency}")
  31. return registry_latency
  32. def _get_best_registry() -> str:
  33. """Get the best registry based on latency.
  34. Returns:
  35. The best registry.
  36. """
  37. console.debug("Getting best registry...")
  38. registries = [
  39. ("https://registry.npmjs.org", 1),
  40. ("https://registry.npmmirror.com", 2),
  41. ]
  42. best_registry = min(registries, key=lambda x: average_latency(x[0]) * x[1])[0]
  43. console.debug(f"Best registry: {best_registry}")
  44. return best_registry
  45. @once
  46. def get_npm_registry() -> str:
  47. """Get npm registry. If environment variable is set, use it first.
  48. Returns:
  49. The npm registry.
  50. """
  51. return environment.NPM_CONFIG_REGISTRY.get() or _get_best_registry()