"""Anonymous telemetry for Reflex.""" from __future__ import annotations import asyncio import dataclasses import multiprocessing import platform import warnings from contextlib import suppress from datetime import datetime, timezone import httpx import psutil from reflex import constants from reflex.config import environment from reflex.utils import console from reflex.utils.prerequisites import ensure_reflex_installation_id, get_project_hash UTC = timezone.utc POSTHOG_API_URL: str = "https://app.posthog.com/capture/" def get_os() -> str: """Get the operating system. Returns: The operating system. """ return platform.system() def get_detailed_platform_str() -> str: """Get the detailed os/platform string. Returns: The platform string """ return platform.platform() def get_python_version() -> str: """Get the Python version. Returns: The Python version. """ # Remove the "+" from the version string in case user is using a pre-release version. return platform.python_version().rstrip("+") def get_reflex_version() -> str: """Get the Reflex version. Returns: The Reflex version. """ return constants.Reflex.VERSION def get_cpu_count() -> int: """Get the number of CPUs. Returns: The number of CPUs. """ return multiprocessing.cpu_count() def get_memory() -> int: """Get the total memory in MB. Returns: The total memory in MB. """ try: return psutil.virtual_memory().total >> 20 except ValueError: # needed to pass ubuntu test return 0 def _raise_on_missing_project_hash() -> bool: """Check if an error should be raised when project hash is missing. When running reflex with --backend-only, or doing database migration operations, there is no requirement for a .web directory, so the reflex.json file may not exist, and this should not be considered an error. Returns: False when compilation should be skipped (i.e. no .web directory is required). Otherwise return True. """ return not environment.REFLEX_SKIP_COMPILE.get() def _prepare_event(event: str, **kwargs) -> dict: """Prepare the event to be sent to the PostHog server. Args: event: The event name. kwargs: Additional data to send with the event. Returns: The event data. """ from reflex.utils.prerequisites import get_cpu_info installation_id = ensure_reflex_installation_id() project_hash = get_project_hash(raise_on_fail=_raise_on_missing_project_hash()) if installation_id is None or project_hash is None: console.debug( f"Could not get installation_id or project_hash: {installation_id}, {project_hash}" ) return {} stamp = datetime.now(UTC).isoformat() cpuinfo = get_cpu_info() additional_keys = ["template", "context", "detail", "user_uuid"] additional_fields = { key: value for key in additional_keys if (value := kwargs.get(key)) is not None } return { "api_key": "phc_JoMo0fOyi0GQAooY3UyO9k0hebGkMyFJrrCw1Gt5SGb", "event": event, "properties": { "distinct_id": installation_id, "distinct_app_id": project_hash, "user_os": get_os(), "user_os_detail": get_detailed_platform_str(), "reflex_version": get_reflex_version(), "python_version": get_python_version(), "cpu_count": get_cpu_count(), "memory": get_memory(), "cpu_info": dataclasses.asdict(cpuinfo) if cpuinfo else {}, **additional_fields, }, "timestamp": stamp, } def _send_event(event_data: dict) -> bool: try: httpx.post(POSTHOG_API_URL, json=event_data) except Exception: return False else: return True def _send(event: str, telemetry_enabled: bool | None, **kwargs): from reflex.config import get_config # Get the telemetry_enabled from the config if it is not specified. if telemetry_enabled is None: telemetry_enabled = get_config().telemetry_enabled # Return if telemetry is disabled. if not telemetry_enabled: return False with suppress(Exception): event_data = _prepare_event(event, **kwargs) if not event_data: return False return _send_event(event_data) def send(event: str, telemetry_enabled: bool | None = None, **kwargs): """Send anonymous telemetry for Reflex. Args: event: The event name. telemetry_enabled: Whether to send the telemetry (If None, get from config). kwargs: Additional data to send with the event. """ async def async_send(event: str, telemetry_enabled: bool | None, **kwargs): return _send(event, telemetry_enabled, **kwargs) try: # Within an event loop context, send the event asynchronously. asyncio.create_task(async_send(event, telemetry_enabled, **kwargs)) except RuntimeError: # If there is no event loop, send the event synchronously. warnings.filterwarnings("ignore", category=RuntimeWarning) _send(event, telemetry_enabled, **kwargs) def send_error(error: Exception, context: str): """Send an error event. Args: error: The error to send. context: The context of the error (e.g. "frontend" or "backend") Returns: Whether the telemetry was sent successfully. """ return send("error", detail=type(error).__name__, context=context)