"""The Reflex config.""" from __future__ import annotations import dataclasses import enum import importlib import inspect import os import sys import urllib.parse from pathlib import Path from typing import Any, Dict, List, Optional, Set from typing_extensions import Annotated, get_type_hints from reflex.utils.exceptions import ConfigError, EnvironmentVarValueError from reflex.utils.types import GenericType, is_union, value_inside_optional try: import pydantic.v1 as pydantic except ModuleNotFoundError: import pydantic from reflex_cli.constants.hosting import Hosting from reflex import constants from reflex.base import Base from reflex.utils import console class DBConfig(Base): """Database config.""" engine: str username: Optional[str] = "" password: Optional[str] = "" host: Optional[str] = "" port: Optional[int] = None database: str @classmethod def postgresql( cls, database: str, username: str, password: str | None = None, host: str | None = None, port: int | None = 5432, ) -> DBConfig: """Create an instance with postgresql engine. Args: database: Database name. username: Database username. password: Database password. host: Database host. port: Database port. Returns: DBConfig instance. """ return cls( engine="postgresql", username=username, password=password, host=host, port=port, database=database, ) @classmethod def postgresql_psycopg2( cls, database: str, username: str, password: str | None = None, host: str | None = None, port: int | None = 5432, ) -> DBConfig: """Create an instance with postgresql+psycopg2 engine. Args: database: Database name. username: Database username. password: Database password. host: Database host. port: Database port. Returns: DBConfig instance. """ return cls( engine="postgresql+psycopg2", username=username, password=password, host=host, port=port, database=database, ) @classmethod def sqlite( cls, database: str, ) -> DBConfig: """Create an instance with sqlite engine. Args: database: Database name. Returns: DBConfig instance. """ return cls( engine="sqlite", database=database, ) def get_url(self) -> str: """Get database URL. Returns: The database URL. """ host = ( f"{self.host}:{self.port}" if self.host and self.port else self.host or "" ) username = urllib.parse.quote_plus(self.username) if self.username else "" password = urllib.parse.quote_plus(self.password) if self.password else "" if username: path = f"{username}:{password}@{host}" if password else f"{username}@{host}" else: path = f"{host}" return f"{self.engine}://{path}/{self.database}" def get_default_value_for_field(field: dataclasses.Field) -> Any: """Get the default value for a field. Args: field: The field. Returns: The default value. Raises: ValueError: If no default value is found. """ if field.default != dataclasses.MISSING: return field.default elif field.default_factory != dataclasses.MISSING: return field.default_factory() else: raise ValueError( f"Missing value for environment variable {field.name} and no default value found" ) # TODO: Change all interpret_.* signatures to value: str, field: dataclasses.Field once we migrate rx.Config to dataclasses def interpret_boolean_env(value: str, field_name: str) -> bool: """Interpret a boolean environment variable value. Args: value: The environment variable value. field_name: The field name. Returns: The interpreted value. Raises: EnvironmentVarValueError: If the value is invalid. """ true_values = ["true", "1", "yes", "y"] false_values = ["false", "0", "no", "n"] if value.lower() in true_values: return True elif value.lower() in false_values: return False raise EnvironmentVarValueError(f"Invalid boolean value: {value} for {field_name}") def interpret_int_env(value: str, field_name: str) -> int: """Interpret an integer environment variable value. Args: value: The environment variable value. field_name: The field name. Returns: The interpreted value. Raises: EnvironmentVarValueError: If the value is invalid. """ try: return int(value) except ValueError as ve: raise EnvironmentVarValueError( f"Invalid integer value: {value} for {field_name}" ) from ve def interpret_existing_path_env(value: str, field_name: str) -> ExistingPath: """Interpret a path environment variable value as an existing path. Args: value: The environment variable value. field_name: The field name. Returns: The interpreted value. Raises: EnvironmentVarValueError: If the path does not exist. """ path = Path(value) if not path.exists(): raise EnvironmentVarValueError(f"Path does not exist: {path} for {field_name}") return path def interpret_path_env(value: str, field_name: str) -> Path: """Interpret a path environment variable value. Args: value: The environment variable value. field_name: The field name. Returns: The interpreted value. """ return Path(value) def interpret_enum_env(value: str, field_type: GenericType, field_name: str) -> Any: """Interpret an enum environment variable value. Args: value: The environment variable value. field_type: The field type. field_name: The field name. Returns: The interpreted value. Raises: EnvironmentVarValueError: If the value is invalid. """ try: return field_type(value) except ValueError as ve: raise EnvironmentVarValueError( f"Invalid enum value: {value} for {field_name}" ) from ve def interpret_env_var_value( value: str, field_type: GenericType, field_name: str ) -> Any: """Interpret an environment variable value based on the field type. Args: value: The environment variable value. field_type: The field type. field_name: The field name. Returns: The interpreted value. Raises: ValueError: If the value is invalid. """ field_type = value_inside_optional(field_type) if is_union(field_type): raise ValueError( f"Union types are not supported for environment variables: {field_name}." ) if field_type is bool: return interpret_boolean_env(value, field_name) elif field_type is str: return value elif field_type is int: return interpret_int_env(value, field_name) elif field_type is Path: return interpret_path_env(value, field_name) elif field_type is ExistingPath: return interpret_existing_path_env(value, field_name) elif inspect.isclass(field_type) and issubclass(field_type, enum.Enum): return interpret_enum_env(value, field_type, field_name) else: raise ValueError( f"Invalid type for environment variable {field_name}: {field_type}. This is probably an issue in Reflex." ) class PathExistsFlag: """Flag to indicate that a path must exist.""" ExistingPath = Annotated[Path, PathExistsFlag] @dataclasses.dataclass(init=False) class EnvironmentVariables: """Environment variables class to instantiate environment variables.""" # Whether to use npm over bun to install frontend packages. REFLEX_USE_NPM: bool = False # The npm registry to use. NPM_CONFIG_REGISTRY: Optional[str] = None # Whether to use Granian for the backend. Otherwise, use Uvicorn. REFLEX_USE_GRANIAN: bool = False # The username to use for authentication on python package repository. Username and password must both be provided. TWINE_USERNAME: Optional[str] = None # The password to use for authentication on python package repository. Username and password must both be provided. TWINE_PASSWORD: Optional[str] = None # Whether to use the system installed bun. If set to false, bun will be bundled with the app. REFLEX_USE_SYSTEM_BUN: bool = False # Whether to use the system installed node and npm. If set to false, node and npm will be bundled with the app. REFLEX_USE_SYSTEM_NODE: bool = False # The working directory for the next.js commands. REFLEX_WEB_WORKDIR: Path = Path(constants.Dirs.WEB) # Path to the alembic config file ALEMBIC_CONFIG: ExistingPath = Path(constants.ALEMBIC_CONFIG) # Disable SSL verification for HTTPX requests. SSL_NO_VERIFY: bool = False # The directory to store uploaded files. REFLEX_UPLOADED_FILES_DIR: Path = Path(constants.Dirs.UPLOADED_FILES) # Whether to use seperate processes to compile the frontend and how many. If not set, defaults to thread executor. REFLEX_COMPILE_PROCESSES: Optional[int] = None # Whether to use seperate threads to compile the frontend and how many. Defaults to `min(32, os.cpu_count() + 4)`. REFLEX_COMPILE_THREADS: Optional[int] = None # The directory to store reflex dependencies. REFLEX_DIR: Path = Path(constants.Reflex.DIR) # Whether to print the SQL queries if the log level is INFO or lower. SQLALCHEMY_ECHO: bool = False # Whether to ignore the redis config error. Some redis servers only allow out-of-band configuration. REFLEX_IGNORE_REDIS_CONFIG_ERROR: bool = False # Whether to skip purging the web directory in dev mode. REFLEX_PERSIST_WEB_DIR: bool = False # The reflex.build frontend host. REFLEX_BUILD_FRONTEND: str = constants.Templates.REFLEX_BUILD_FRONTEND # The reflex.build backend host. REFLEX_BUILD_BACKEND: str = constants.Templates.REFLEX_BUILD_BACKEND def __init__(self): """Initialize the environment variables.""" type_hints = get_type_hints(type(self)) for field in dataclasses.fields(self): raw_value = os.getenv(field.name, None) field.type = type_hints.get(field.name) or field.type value = ( interpret_env_var_value(raw_value, field.type, field.name) if raw_value is not None else get_default_value_for_field(field) ) setattr(self, field.name, value) environment = EnvironmentVariables() class Config(Base): """The config defines runtime settings for the app. By default, the config is defined in an `rxconfig.py` file in the root of the app. ```python # rxconfig.py import reflex as rx config = rx.Config( app_name="myapp", api_url="http://localhost:8000", ) ``` Every config value can be overridden by an environment variable with the same name in uppercase. For example, `db_url` can be overridden by setting the `DB_URL` environment variable. See the [configuration](https://reflex.dev/docs/getting-started/configuration/) docs for more info. """ class Config: """Pydantic config for the config.""" validate_assignment = True # The name of the app (should match the name of the app directory). app_name: str # The log level to use. loglevel: constants.LogLevel = constants.LogLevel.DEFAULT # The port to run the frontend on. NOTE: When running in dev mode, the next available port will be used if this is taken. frontend_port: int = constants.DefaultPorts.FRONTEND_PORT # The path to run the frontend on. For example, "/app" will run the frontend on http://localhost:3000/app frontend_path: str = "" # The port to run the backend on. NOTE: When running in dev mode, the next available port will be used if this is taken. backend_port: int = constants.DefaultPorts.BACKEND_PORT # The backend url the frontend will connect to. This must be updated if the backend is hosted elsewhere, or in production. api_url: str = f"http://localhost:{backend_port}" # The url the frontend will be hosted on. deploy_url: Optional[str] = f"http://localhost:{frontend_port}" # The url the backend will be hosted on. backend_host: str = "0.0.0.0" # The database url used by rx.Model. db_url: Optional[str] = "sqlite:///reflex.db" # The redis url redis_url: Optional[str] = None # Telemetry opt-in. telemetry_enabled: bool = True # The bun path bun_path: ExistingPath = constants.Bun.DEFAULT_PATH # List of origins that are allowed to connect to the backend API. cors_allowed_origins: List[str] = ["*"] # Tailwind config. tailwind: Optional[Dict[str, Any]] = {"plugins": ["@tailwindcss/typography"]} # Timeout when launching the gunicorn server. TODO(rename this to backend_timeout?) timeout: int = 120 # Whether to enable or disable nextJS gzip compression. next_compression: bool = True # Whether to use React strict mode in nextJS react_strict_mode: bool = True # Additional frontend packages to install. frontend_packages: List[str] = [] # The hosting service backend URL. cp_backend_url: str = Hosting.CP_BACKEND_URL # The hosting service frontend URL. cp_web_url: str = Hosting.CP_WEB_URL # The worker class used in production mode gunicorn_worker_class: str = "uvicorn.workers.UvicornH11Worker" # Number of gunicorn workers from user gunicorn_workers: Optional[int] = None # Number of requests before a worker is restarted gunicorn_max_requests: int = 100 # Variance limit for max requests; gunicorn only gunicorn_max_requests_jitter: int = 25 # Indicate which type of state manager to use state_manager_mode: constants.StateManagerMode = constants.StateManagerMode.DISK # Maximum expiration lock time for redis state manager redis_lock_expiration: int = constants.Expiration.LOCK # Token expiration time for redis state manager redis_token_expiration: int = constants.Expiration.TOKEN # Attributes that were explicitly set by the user. _non_default_attributes: Set[str] = pydantic.PrivateAttr(set()) # Path to file containing key-values pairs to override in the environment; Dotenv format. env_file: Optional[str] = None def __init__(self, *args, **kwargs): """Initialize the config values. Args: *args: The args to pass to the Pydantic init method. **kwargs: The kwargs to pass to the Pydantic init method. Raises: ConfigError: If some values in the config are invalid. """ super().__init__(*args, **kwargs) # Update the config from environment variables. env_kwargs = self.update_from_env() for key, env_value in env_kwargs.items(): setattr(self, key, env_value) # Update default URLs if ports were set kwargs.update(env_kwargs) self._non_default_attributes.update(kwargs) self._replace_defaults(**kwargs) if ( self.state_manager_mode == constants.StateManagerMode.REDIS and not self.redis_url ): raise ConfigError( "REDIS_URL is required when using the redis state manager." ) @property def module(self) -> str: """Get the module name of the app. Returns: The module name. """ return ".".join([self.app_name, self.app_name]) def update_from_env(self) -> dict[str, Any]: """Update the config values based on set environment variables. If there is a set env_file, it is loaded first. Returns: The updated config values. """ if self.env_file: try: from dotenv import load_dotenv # type: ignore # load env file if exists load_dotenv(self.env_file, override=True) except ImportError: console.error( """The `python-dotenv` package is required to load environment variables from a file. Run `pip install "python-dotenv>=1.0.1"`.""" ) updated_values = {} # Iterate over the fields. for key, field in self.__fields__.items(): # The env var name is the key in uppercase. env_var = os.environ.get(key.upper()) # If the env var is set, override the config value. if env_var is not None: if key.upper() != "DB_URL": console.info( f"Overriding config value {key} with env var {key.upper()}={env_var}", dedupe=True, ) # Interpret the value. value = interpret_env_var_value(env_var, field.outer_type_, field.name) # Set the value. updated_values[key] = value return updated_values def get_event_namespace(self) -> str: """Get the path that the backend Websocket server lists on. Returns: The namespace for websocket. """ event_url = constants.Endpoint.EVENT.get_url() return urllib.parse.urlsplit(event_url).path def _replace_defaults(self, **kwargs): """Replace formatted defaults when the caller provides updates. Args: **kwargs: The kwargs passed to the config or from the env. """ if "api_url" not in self._non_default_attributes and "backend_port" in kwargs: self.api_url = f"http://localhost:{kwargs['backend_port']}" if ( "deploy_url" not in self._non_default_attributes and "frontend_port" in kwargs ): self.deploy_url = f"http://localhost:{kwargs['frontend_port']}" if "api_url" not in self._non_default_attributes: # If running in Github Codespaces, override API_URL codespace_name = os.getenv("CODESPACE_NAME") GITHUB_CODESPACES_PORT_FORWARDING_DOMAIN = os.getenv( "GITHUB_CODESPACES_PORT_FORWARDING_DOMAIN" ) # If running on Replit.com interactively, override API_URL to ensure we maintain the backend_port replit_dev_domain = os.getenv("REPLIT_DEV_DOMAIN") backend_port = kwargs.get("backend_port", self.backend_port) if codespace_name and GITHUB_CODESPACES_PORT_FORWARDING_DOMAIN: self.api_url = ( f"https://{codespace_name}-{kwargs.get('backend_port', self.backend_port)}" f".{GITHUB_CODESPACES_PORT_FORWARDING_DOMAIN}" ) elif replit_dev_domain and backend_port: self.api_url = f"https://{replit_dev_domain}:{backend_port}" def _set_persistent(self, **kwargs): """Set values in this config and in the environment so they persist into subprocess. Args: **kwargs: The kwargs passed to the config. """ for key, value in kwargs.items(): if value is not None: os.environ[key.upper()] = str(value) setattr(self, key, value) self._non_default_attributes.update(kwargs) self._replace_defaults(**kwargs) def get_config(reload: bool = False) -> Config: """Get the app config. Args: reload: Re-import the rxconfig module from disk Returns: The app config. """ sys.path.insert(0, os.getcwd()) # only import the module if it exists. If a module spec exists then # the module exists. spec = importlib.util.find_spec(constants.Config.MODULE) # type: ignore if not spec: # we need this condition to ensure that a ModuleNotFound error is not thrown when # running unit/integration tests. return Config(app_name="") rxconfig = importlib.import_module(constants.Config.MODULE) if reload: importlib.reload(rxconfig) return rxconfig.config