registry.py 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. """Utilities for working with registries."""
  2. import httpx
  3. from reflex.config import environment
  4. from reflex.utils import console, net
  5. def latency(registry: str) -> int:
  6. """Get the latency of a registry.
  7. Args:
  8. registry (str): The URL of the registry.
  9. Returns:
  10. int: The latency of the registry in microseconds.
  11. """
  12. try:
  13. return net.get(registry).elapsed.microseconds
  14. except httpx.HTTPError:
  15. console.info(f"Failed to connect to {registry}.")
  16. return 10_000_000
  17. def average_latency(registry: str, attempts: int = 3) -> int:
  18. """Get the average latency of a registry.
  19. Args:
  20. registry: The URL of the registry.
  21. attempts: The number of attempts to make. Defaults to 10.
  22. Returns:
  23. The average latency of the registry in microseconds.
  24. """
  25. return sum(latency(registry) for _ in range(attempts)) // attempts
  26. def get_best_registry() -> str:
  27. """Get the best registry based on latency.
  28. Returns:
  29. str: The best registry.
  30. """
  31. registries = [
  32. "https://registry.npmjs.org",
  33. "https://r.cnpmjs.org",
  34. ]
  35. return min(registries, key=average_latency)
  36. def _get_npm_registry() -> str:
  37. """Get npm registry. If environment variable is set, use it first.
  38. Returns:
  39. str:
  40. """
  41. return environment.NPM_CONFIG_REGISTRY.get() or get_best_registry()