registry.py 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. """Utilities for working with registries."""
  2. import httpx
  3. from reflex.utils import console
  4. def latency(registry: str) -> int:
  5. """Get the latency of a registry.
  6. Args:
  7. registry (str): The URL of the registry.
  8. Returns:
  9. int: The latency of the registry in microseconds.
  10. """
  11. try:
  12. return httpx.get(registry).elapsed.microseconds
  13. except httpx.HTTPError:
  14. console.info(f"Failed to connect to {registry}.")
  15. return 10_000_000
  16. def average_latency(registry, attempts: int = 3) -> int:
  17. """Get the average latency of a registry.
  18. Args:
  19. registry (str): The URL of the registry.
  20. attempts (int): The number of attempts to make. Defaults to 10.
  21. Returns:
  22. int: The average latency of the registry in microseconds.
  23. """
  24. return sum(latency(registry) for _ in range(attempts)) // attempts
  25. def _get_best_registry() -> str:
  26. """Get the best registry based on latency.
  27. Returns:
  28. str: The best registry.
  29. """
  30. registries = [
  31. "https://registry.npmjs.org",
  32. "https://r.cnpmjs.org",
  33. ]
  34. return min(registries, key=average_latency)