"""Define a state var."""
from __future__ import annotations
import contextlib
import dataclasses
import dis
import inspect
import json
import random
import re
import string
import sys
from types import CodeType, FunctionType
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
List,
Literal,
Optional,
Set,
Tuple,
Type,
Union,
_GenericAlias, # type: ignore
cast,
get_args,
get_origin,
get_type_hints,
)
import pydantic
from reflex import constants
from reflex.base import Base
from reflex.utils import console, format, imports, serializers, types
# This module used to export ImportVar itself, so we still import it for export here
from reflex.utils.imports import ImportDict, ImportVar
if TYPE_CHECKING:
from reflex.state import BaseState
# Set of unique variable names.
USED_VARIABLES = set()
# Supported operators for all types.
ALL_OPS = ["==", "!=", "!==", "===", "&&", "||"]
# Delimiters used between function args or operands.
DELIMITERS = [","]
# Mapping of valid operations for different type combinations.
OPERATION_MAPPING = {
(int, int): {
"+",
"-",
"/",
"//",
"*",
"%",
"**",
">",
"<",
"<=",
">=",
"|",
"&",
},
(int, str): {"*"},
(int, list): {"*"},
(str, str): {"+", ">", "<", "<=", ">="},
(float, float): {"+", "-", "/", "//", "*", "%", "**", ">", "<", "<=", ">="},
(float, int): {"+", "-", "/", "//", "*", "%", "**", ">", "<", "<=", ">="},
(list, list): {"+", ">", "<", "<=", ">="},
}
# These names were changed in reflex 0.3.0
REPLACED_NAMES = {
"full_name": "_var_full_name",
"name": "_var_name",
"state": "_var_data.state",
"type_": "_var_type",
"is_local": "_var_is_local",
"is_string": "_var_is_string",
"set_state": "_var_set_state",
"deps": "_deps",
}
def get_unique_variable_name() -> str:
"""Get a unique variable name.
Returns:
The unique variable name.
"""
name = "".join([random.choice(string.ascii_lowercase) for _ in range(8)])
if name not in USED_VARIABLES:
USED_VARIABLES.add(name)
return name
return get_unique_variable_name()
class VarData(Base):
"""Metadata associated with a Var."""
# The name of the enclosing state.
state: str = ""
# Imports needed to render this var
imports: ImportDict = {}
# Hooks that need to be present in the component to render this var
hooks: Set[str] = set()
@classmethod
def merge(cls, *others: VarData | None) -> VarData | None:
"""Merge multiple var data objects.
Args:
*others: The var data objects to merge.
Returns:
The merged var data object.
"""
state = ""
_imports = {}
hooks = set()
for var_data in others:
if var_data is None:
continue
state = state or var_data.state
_imports = imports.merge_imports(_imports, var_data.imports)
hooks.update(var_data.hooks)
return (
cls(
state=state,
imports=_imports,
hooks=hooks,
)
or None
)
def __bool__(self) -> bool:
"""Check if the var data is non-empty.
Returns:
True if any field is set to a non-default value.
"""
return bool(self.state or self.imports or self.hooks)
def __eq__(self, other: Any) -> bool:
"""Check if two var data objects are equal.
Args:
other: The other var data object to compare.
Returns:
True if all fields are equal and collapsed imports are equal.
"""
if not isinstance(other, VarData):
return False
return (
self.state == other.state
and self.hooks == other.hooks
and imports.collapse_imports(self.imports)
== imports.collapse_imports(other.imports)
)
def __dict(self) -> dict:
"""Convert the var data to a dictionary.
Returns:
The var data dictionary.
"""
return {
"state": self.state,
"imports": {
lib: [import_var.dict() for import_var in import_vars]
for lib, import_vars in self.imports.items()
},
"hooks": list(self.hooks),
}
def _encode_var(value: Var) -> str:
"""Encode the state name into a formatted var.
Args:
value: The value to encode the state name into.
Returns:
The encoded var.
"""
if value._var_data:
return f"{value._var_data.json()}" + str(value)
return str(value)
def _decode_var(value: str) -> tuple[VarData | None, str]:
"""Decode the state name from a formatted var.
Args:
value: The value to extract the state name from.
Returns:
The extracted state name and the value without the state name.
"""
var_datas = []
if isinstance(value, str):
# Extract the state name from a formatted var
while m := re.match(r"(.*)(.*)(.*)", value):
value = m.group(1) + m.group(3)
try:
var_datas.append(VarData.parse_raw(m.group(2)))
except pydantic.ValidationError:
# If the VarData is invalid, it was probably json-encoded twice...
var_datas.append(VarData.parse_raw(json.loads(f'"{m.group(2)}"')))
if var_datas:
return VarData.merge(*var_datas), value
return None, value
def _extract_var_data(value: Iterable) -> list[VarData | None]:
"""Extract the var imports and hooks from an iterable containing a Var.
Args:
value: The iterable to extract the VarData from
Returns:
The extracted VarDatas.
"""
var_datas = []
with contextlib.suppress(TypeError):
for sub in value:
if isinstance(sub, Var):
var_datas.append(sub._var_data)
elif not isinstance(sub, str):
# Recurse into dict values.
if hasattr(sub, "values") and callable(sub.values):
var_datas.extend(_extract_var_data(sub.values()))
# Recurse into iterable values (or dict keys).
var_datas.extend(_extract_var_data(sub))
# Recurse when value is a dict itself.
values = getattr(value, "values", None)
if callable(values):
var_datas.extend(_extract_var_data(values()))
return var_datas
class Var:
"""An abstract var."""
# The name of the var.
_var_name: str
# The type of the var.
_var_type: Type
# Whether this is a local javascript variable.
_var_is_local: bool
# Whether the var is a string literal.
_var_is_string: bool
# _var_full_name should be prefixed with _var_state
_var_full_name_needs_state_prefix: bool
# Extra metadata associated with the Var
_var_data: Optional[VarData]
@classmethod
def create(
cls, value: Any, _var_is_local: bool = True, _var_is_string: bool = False
) -> Var | None:
"""Create a var from a value.
Args:
value: The value to create the var from.
_var_is_local: Whether the var is local.
_var_is_string: Whether the var is a string literal.
Returns:
The var.
Raises:
TypeError: If the value is JSON-unserializable.
"""
# Check for none values.
if value is None:
return None
# If the value is already a var, do nothing.
if isinstance(value, Var):
return value
# Try to pull the imports and hooks from contained values.
_var_data = None
if not isinstance(value, str):
_var_data = VarData.merge(*_extract_var_data(value))
# Try to serialize the value.
type_ = type(value)
name = value if type_ in types.JSONType else serializers.serialize(value)
if name is None:
raise TypeError(
f"No JSON serializer found for var {value} of type {type_}."
)
name = name if isinstance(name, str) else format.json_dumps(name)
return BaseVar(
_var_name=name,
_var_type=type_,
_var_is_local=_var_is_local,
_var_is_string=_var_is_string,
_var_data=_var_data,
)
@classmethod
def create_safe(
cls, value: Any, _var_is_local: bool = True, _var_is_string: bool = False
) -> Var:
"""Create a var from a value, asserting that it is not None.
Args:
value: The value to create the var from.
_var_is_local: Whether the var is local.
_var_is_string: Whether the var is a string literal.
Returns:
The var.
"""
var = cls.create(
value,
_var_is_local=_var_is_local,
_var_is_string=_var_is_string,
)
assert var is not None
return var
@classmethod
def __class_getitem__(cls, type_: str) -> _GenericAlias:
"""Get a typed var.
Args:
type_: The type of the var.
Returns:
The var class item.
"""
return _GenericAlias(cls, type_)
def __post_init__(self) -> None:
"""Post-initialize the var."""
# Decode any inline Var markup and apply it to the instance
_var_data, _var_name = _decode_var(self._var_name)
if _var_data:
self._var_name = _var_name
self._var_data = VarData.merge(self._var_data, _var_data)
def _replace(self, merge_var_data=None, **kwargs: Any) -> Var:
"""Make a copy of this Var with updated fields.
Args:
merge_var_data: VarData to merge into the existing VarData.
**kwargs: Var fields to update.
Returns:
A new BaseVar with the updated fields overwriting the corresponding fields in this Var.
"""
field_values = dict(
_var_name=kwargs.pop("_var_name", self._var_name),
_var_type=kwargs.pop("_var_type", self._var_type),
_var_is_local=kwargs.pop("_var_is_local", self._var_is_local),
_var_is_string=kwargs.pop("_var_is_string", self._var_is_string),
_var_full_name_needs_state_prefix=kwargs.pop(
"_var_full_name_needs_state_prefix",
self._var_full_name_needs_state_prefix,
),
_var_data=VarData.merge(
kwargs.get("_var_data", self._var_data), merge_var_data
),
)
return BaseVar(**field_values)
def _decode(self) -> Any:
"""Decode Var as a python value.
Note that Var with state set cannot be decoded python-side and will be
returned as full_name.
Returns:
The decoded value or the Var name.
"""
if self._var_is_string:
return self._var_name
try:
return json.loads(self._var_name)
except ValueError:
return self._var_name
def equals(self, other: Var) -> bool:
"""Check if two vars are equal.
Args:
other: The other var to compare.
Returns:
Whether the vars are equal.
"""
return (
self._var_name == other._var_name
and self._var_type == other._var_type
and self._var_is_local == other._var_is_local
and self._var_full_name_needs_state_prefix
== other._var_full_name_needs_state_prefix
and self._var_data == other._var_data
)
def to_string(self, json: bool = True) -> Var:
"""Convert a var to a string.
Args:
json: Whether to convert to a JSON string.
Returns:
The stringified var.
"""
fn = "JSON.stringify" if json else "String"
return self.operation(fn=fn, type_=str)
def __hash__(self) -> int:
"""Define a hash function for a var.
Returns:
The hash of the var.
"""
return hash((self._var_name, str(self._var_type)))
def __str__(self) -> str:
"""Wrap the var so it can be used in templates.
Returns:
The wrapped var, i.e. {state.var}.
"""
out = (
self._var_full_name
if self._var_is_local
else format.wrap(self._var_full_name, "{")
)
if self._var_is_string:
out = format.format_string(out)
return out
def __bool__(self) -> bool:
"""Raise exception if using Var in a boolean context.
Raises:
TypeError: when attempting to bool-ify the Var.
"""
# pydantic v2 hacks
if inspect.stack()[1].function in ("smart_deepcopy"):
return bool(self._decode())
raise TypeError(
f"Cannot convert Var {self._var_full_name!r} to bool for use with `if`, `and`, `or`, and `not`. "
"Instead use `rx.cond` and bitwise operators `&` (and), `|` (or), `~` (invert)."
)
def __iter__(self) -> Any:
"""Raise exception if using Var in an iterable context.
Raises:
TypeError: when attempting to iterate over the Var.
"""
raise TypeError(
f"Cannot iterate over Var {self._var_full_name!r}. Instead use `rx.foreach`."
)
def __format__(self, format_spec: str) -> str:
"""Format the var into a Javascript equivalent to an f-string.
Args:
format_spec: The format specifier (Ignored for now).
Returns:
The formatted var.
"""
# Encode the _var_data into the formatted output for tracking purposes.
str_self = _encode_var(self)
if self._var_is_local:
return str_self
return f"${str_self}"
def __getitem__(self, i: Any) -> Var:
"""Index into a var.
Args:
i: The index to index into.
Returns:
The indexed var.
Raises:
TypeError: If the var is not indexable.
"""
# Indexing is only supported for strings, lists, tuples, dicts, and dataframes.
if not (
types._issubclass(self._var_type, Union[List, Dict, Tuple, str])
or types.is_dataframe(self._var_type)
):
if self._var_type == Any:
raise TypeError(
"Could not index into var of type Any. (If you are trying to index into a state var, "
"add the correct type annotation to the var.)"
)
raise TypeError(
f"Var {self._var_name} of type {self._var_type} does not support indexing."
)
# The type of the indexed var.
type_ = Any
# Convert any vars to local vars.
if isinstance(i, Var):
i = i._replace(_var_is_local=True)
# Handle list/tuple/str indexing.
if types._issubclass(self._var_type, Union[List, Tuple, str]):
# List/Tuple/String indices must be ints, slices, or vars.
if (
not isinstance(i, types.get_args(Union[int, slice, Var]))
or isinstance(i, Var)
and not i._var_type == int
):
raise TypeError("Index must be an integer or an integer var.")
# Handle slices first.
if isinstance(i, slice):
# Get the start and stop indices.
start = i.start or 0
stop = i.stop or "undefined"
# Use the slice function.
return self._replace(
_var_name=f"{self._var_name}.slice({start}, {stop})",
_var_is_string=False,
)
# Get the type of the indexed var.
type_ = (
types.get_args(self._var_type)[0]
if types.is_generic_alias(self._var_type)
else Any
)
# Use `at` to support negative indices.
return self._replace(
_var_name=f"{self._var_name}.at({i})",
_var_type=type_,
_var_is_string=False,
)
# Dictionary / dataframe indexing.
# Tuples are currently not supported as indexes.
if (
(
types._issubclass(self._var_type, Dict)
or types.is_dataframe(self._var_type)
)
and not isinstance(i, types.get_args(Union[int, str, float, Var]))
) or (
isinstance(i, Var)
and not types._issubclass(
i._var_type, types.get_args(Union[int, str, float])
)
):
raise TypeError(
"Index must be one of the following types: int, str, int or str Var"
)
# Get the type of the indexed var.
if isinstance(i, str):
i = format.wrap(i, '"')
type_ = (
types.get_args(self._var_type)[1]
if types.is_generic_alias(self._var_type)
else Any
)
# Use normal indexing here.
return self._replace(
_var_name=f"{self._var_name}[{i}]",
_var_type=type_,
_var_is_string=False,
)
def __getattr__(self, name: str) -> Var:
"""Get a var attribute.
Args:
name: The name of the attribute.
Returns:
The var attribute.
Raises:
AttributeError: If the var is wrongly annotated or can't find attribute.
TypeError: If an annotation to the var isn't provided.
"""
# Check if the attribute is one of the class fields.
if not name.startswith("_"):
if self._var_type == Any:
raise TypeError(
f"You must provide an annotation for the state var `{self._var_full_name}`. Annotation cannot be `{self._var_type}`"
) from None
is_optional = types.is_optional(self._var_type)
type_ = types.get_attribute_access_type(self._var_type, name)
if type_ is not None:
return self._replace(
_var_name=f"{self._var_name}{'?' if is_optional else ''}.{name}",
_var_type=type_,
_var_is_string=False,
)
if name in REPLACED_NAMES:
raise AttributeError(
f"Field {name!r} was renamed to {REPLACED_NAMES[name]!r}"
)
raise AttributeError(
f"The State var `{self._var_full_name}` has no attribute '{name}' or may have been annotated "
f"wrongly."
)
if name.startswith("_var"):
print(name)
raise AttributeError(
f"The State var has no attribute '{name}' or may have been annotated wrongly.",
)
def operation(
self,
op: str = "",
other: Var | None = None,
type_: Type | None = None,
flip: bool = False,
fn: str | None = None,
invoke_fn: bool = False,
) -> Var:
"""Perform an operation on a var.
Args:
op: The operation to perform.
other: The other var to perform the operation on.
type_: The type of the operation result.
flip: Whether to flip the order of the operation.
fn: A function to apply to the operation.
invoke_fn: Whether to invoke the function.
Returns:
The operation result.
Raises:
TypeError: If the operation between two operands is invalid.
ValueError: If flip is set to true and value of operand is not provided
"""
if isinstance(other, str):
other = Var.create(json.dumps(other))
else:
other = Var.create(other)
type_ = type_ or self._var_type
if other is None and flip:
raise ValueError(
"flip_operands cannot be set to True if the value of 'other' operand is not provided"
)
left_operand, right_operand = (other, self) if flip else (self, other)
if other is not None:
# check if the operation between operands is valid.
if op and not self.is_valid_operation(
types.get_base_class(left_operand._var_type), # type: ignore
types.get_base_class(right_operand._var_type), # type: ignore
op,
):
raise TypeError(
f"Unsupported Operand type(s) for {op}: `{left_operand._var_full_name}` of type {left_operand._var_type.__name__} and `{right_operand._var_full_name}` of type {right_operand._var_type.__name__}" # type: ignore
)
# apply function to operands
if fn is not None:
if invoke_fn:
# invoke the function on left operand.
operation_name = f"{left_operand._var_full_name}.{fn}({right_operand._var_full_name})" # type: ignore
else:
# pass the operands as arguments to the function.
operation_name = f"{left_operand._var_full_name} {op} {right_operand._var_full_name}" # type: ignore
operation_name = f"{fn}({operation_name})"
else:
# apply operator to operands (left operand right_operand)
operation_name = f"{left_operand._var_full_name} {op} {right_operand._var_full_name}" # type: ignore
operation_name = format.wrap(operation_name, "(")
else:
# apply operator to left operand ( left_operand)
operation_name = f"{op}{self._var_full_name}"
# apply function to operands
if fn is not None:
operation_name = (
f"{fn}({operation_name})"
if not invoke_fn
else f"{self._var_full_name}.{fn}()"
)
return self._replace(
_var_name=operation_name,
_var_type=type_,
_var_is_string=False,
_var_full_name_needs_state_prefix=False,
merge_var_data=other._var_data if other is not None else None,
)
@staticmethod
def is_valid_operation(
operand1_type: Type, operand2_type: Type, operator: str
) -> bool:
"""Check if an operation between two operands is valid.
Args:
operand1_type: Type of the operand
operand2_type: Type of the second operand
operator: The operator.
Returns:
Whether operation is valid or not
"""
if operator in ALL_OPS or operator in DELIMITERS:
return True
# bools are subclasses of ints
pair = tuple(
sorted(
[
int if operand1_type == bool else operand1_type,
int if operand2_type == bool else operand2_type,
],
key=lambda x: x.__name__,
)
)
return pair in OPERATION_MAPPING and operator in OPERATION_MAPPING[pair]
def compare(self, op: str, other: Var) -> Var:
"""Compare two vars with inequalities.
Args:
op: The comparison operator.
other: The other var to compare with.
Returns:
The comparison result.
"""
return self.operation(op, other, bool)
def __invert__(self) -> Var:
"""Invert a var.
Returns:
The inverted var.
"""
return self.operation("!", type_=bool)
def __neg__(self) -> Var:
"""Negate a var.
Returns:
The negated var.
"""
return self.operation(fn="-")
def __abs__(self) -> Var:
"""Get the absolute value of a var.
Returns:
A var with the absolute value.
"""
return self.operation(fn="Math.abs")
def length(self) -> Var:
"""Get the length of a list var.
Returns:
A var with the absolute value.
Raises:
TypeError: If the var is not a list.
"""
if not types._issubclass(self._var_type, List):
raise TypeError(f"Cannot get length of non-list var {self}.")
return self._replace(
_var_name=f"{self._var_name}.length",
_var_type=int,
_var_is_string=False,
)
def __eq__(self, other: Var) -> Var:
"""Perform an equality comparison.
Args:
other: The other var to compare with.
Returns:
A var representing the equality comparison.
"""
return self.compare("===", other)
def __ne__(self, other: Var) -> Var:
"""Perform an inequality comparison.
Args:
other: The other var to compare with.
Returns:
A var representing the inequality comparison.
"""
return self.compare("!==", other)
def __gt__(self, other: Var) -> Var:
"""Perform a greater than comparison.
Args:
other: The other var to compare with.
Returns:
A var representing the greater than comparison.
"""
return self.compare(">", other)
def __ge__(self, other: Var) -> Var:
"""Perform a greater than or equal to comparison.
Args:
other: The other var to compare with.
Returns:
A var representing the greater than or equal to comparison.
"""
return self.compare(">=", other)
def __lt__(self, other: Var) -> Var:
"""Perform a less than comparison.
Args:
other: The other var to compare with.
Returns:
A var representing the less than comparison.
"""
return self.compare("<", other)
def __le__(self, other: Var) -> Var:
"""Perform a less than or equal to comparison.
Args:
other: The other var to compare with.
Returns:
A var representing the less than or equal to comparison.
"""
return self.compare("<=", other)
def __add__(self, other: Var, flip=False) -> Var:
"""Add two vars.
Args:
other: The other var to add.
flip: Whether to flip operands.
Returns:
A var representing the sum.
"""
other_type = other._var_type if isinstance(other, Var) else type(other)
# For list-list addition, javascript concatenates the content of the lists instead of
# merging the list, and for that reason we use the spread operator available through spreadArraysOrObjects
# utility function
if (
types.get_base_class(self._var_type) == list
and types.get_base_class(other_type) == list
):
return self.operation(
",", other, fn="spreadArraysOrObjects", flip=flip
)._replace(
merge_var_data=VarData(
imports={
f"/{constants.Dirs.STATE_PATH}": [
ImportVar(tag="spreadArraysOrObjects")
]
},
),
)
return self.operation("+", other, flip=flip)
def __radd__(self, other: Var) -> Var:
"""Add two vars.
Args:
other: The other var to add.
Returns:
A var representing the sum.
"""
return self.__add__(other=other, flip=True)
def __sub__(self, other: Var) -> Var:
"""Subtract two vars.
Args:
other: The other var to subtract.
Returns:
A var representing the difference.
"""
return self.operation("-", other)
def __rsub__(self, other: Var) -> Var:
"""Subtract two vars.
Args:
other: The other var to subtract.
Returns:
A var representing the difference.
"""
return self.operation("-", other, flip=True)
def __mul__(self, other: Var, flip=True) -> Var:
"""Multiply two vars.
Args:
other: The other var to multiply.
flip: Whether to flip operands
Returns:
A var representing the product.
"""
other_type = other._var_type if isinstance(other, Var) else type(other)
# For str-int multiplication, we use the repeat function.
# i.e "hello" * 2 is equivalent to "hello".repeat(2) in js.
if (types.get_base_class(self._var_type), types.get_base_class(other_type)) in [
(int, str),
(str, int),
]:
return self.operation(other=other, fn="repeat", invoke_fn=True)
# For list-int multiplication, we use the Array function.
# i.e ["hello"] * 2 is equivalent to Array(2).fill().map(() => ["hello"]).flat() in js.
if (types.get_base_class(self._var_type), types.get_base_class(other_type)) in [
(int, list),
(list, int),
]:
other_name = other._var_full_name if isinstance(other, Var) else other
name = f"Array({other_name}).fill().map(() => {self._var_full_name}).flat()"
return self._replace(
_var_name=name,
_var_type=str,
_var_is_string=False,
_var_full_name_needs_state_prefix=False,
)
return self.operation("*", other)
def __rmul__(self, other: Var) -> Var:
"""Multiply two vars.
Args:
other: The other var to multiply.
Returns:
A var representing the product.
"""
return self.__mul__(other=other, flip=True)
def __pow__(self, other: Var) -> Var:
"""Raise a var to a power.
Args:
other: The power to raise to.
Returns:
A var representing the power.
"""
return self.operation(",", other, fn="Math.pow")
def __rpow__(self, other: Var) -> Var:
"""Raise a var to a power.
Args:
other: The power to raise to.
Returns:
A var representing the power.
"""
return self.operation(",", other, flip=True, fn="Math.pow")
def __truediv__(self, other: Var) -> Var:
"""Divide two vars.
Args:
other: The other var to divide.
Returns:
A var representing the quotient.
"""
return self.operation("/", other)
def __rtruediv__(self, other: Var) -> Var:
"""Divide two vars.
Args:
other: The other var to divide.
Returns:
A var representing the quotient.
"""
return self.operation("/", other, flip=True)
def __floordiv__(self, other: Var) -> Var:
"""Divide two vars.
Args:
other: The other var to divide.
Returns:
A var representing the quotient.
"""
return self.operation("/", other, fn="Math.floor")
def __mod__(self, other: Var) -> Var:
"""Get the remainder of two vars.
Args:
other: The other var to divide.
Returns:
A var representing the remainder.
"""
return self.operation("%", other)
def __rmod__(self, other: Var) -> Var:
"""Get the remainder of two vars.
Args:
other: The other var to divide.
Returns:
A var representing the remainder.
"""
return self.operation("%", other, flip=True)
def __and__(self, other: Var) -> Var:
"""Perform a logical and.
Args:
other: The other var to perform the logical AND with.
Returns:
A var representing the logical AND.
Note:
This method provides behavior specific to JavaScript, where it returns the JavaScript
equivalent code (using the '&&' operator) of a logical AND operation.
In JavaScript, the
logical OR operator '&&' is used for Boolean logic, and this method emulates that behavior
by returning the equivalent code as a Var instance.
In Python, logical AND 'and' operates differently, evaluating expressions immediately, making
it challenging to override the behavior entirely.
Therefore, this method leverages the
bitwise AND '__and__' operator for custom JavaScript-like behavior.
Example:
>>> var1 = Var.create(True)
>>> var2 = Var.create(False)
>>> js_code = var1 & var2
>>> print(js_code._var_full_name)
'(true && false)'
"""
return self.operation("&&", other, type_=bool)
def __rand__(self, other: Var) -> Var:
"""Perform a logical and.
Args:
other: The other var to perform the logical AND with.
Returns:
A var representing the logical AND.
Note:
This method provides behavior specific to JavaScript, where it returns the JavaScript
equivalent code (using the '&&' operator) of a logical AND operation.
In JavaScript, the
logical OR operator '&&' is used for Boolean logic, and this method emulates that behavior
by returning the equivalent code as a Var instance.
In Python, logical AND 'and' operates differently, evaluating expressions immediately, making
it challenging to override the behavior entirely.
Therefore, this method leverages the
bitwise AND '__rand__' operator for custom JavaScript-like behavior.
Example:
>>> var1 = Var.create(True)
>>> var2 = Var.create(False)
>>> js_code = var1 & var2
>>> print(js_code._var_full_name)
'(false && true)'
"""
return self.operation("&&", other, type_=bool, flip=True)
def __or__(self, other: Var) -> Var:
"""Perform a logical or.
Args:
other: The other var to perform the logical or with.
Returns:
A var representing the logical or.
Note:
This method provides behavior specific to JavaScript, where it returns the JavaScript
equivalent code (using the '||' operator) of a logical OR operation. In JavaScript, the
logical OR operator '||' is used for Boolean logic, and this method emulates that behavior
by returning the equivalent code as a Var instance.
In Python, logical OR 'or' operates differently, evaluating expressions immediately, making
it challenging to override the behavior entirely. Therefore, this method leverages the
bitwise OR '__or__' operator for custom JavaScript-like behavior.
Example:
>>> var1 = Var.create(True)
>>> var2 = Var.create(False)
>>> js_code = var1 | var2
>>> print(js_code._var_full_name)
'(true || false)'
"""
return self.operation("||", other, type_=bool)
def __ror__(self, other: Var) -> Var:
"""Perform a logical or.
Args:
other: The other var to perform the logical or with.
Returns:
A var representing the logical or.
Note:
This method provides behavior specific to JavaScript, where it returns the JavaScript
equivalent code (using the '||' operator) of a logical OR operation. In JavaScript, the
logical OR operator '||' is used for Boolean logic, and this method emulates that behavior
by returning the equivalent code as a Var instance.
In Python, logical OR 'or' operates differently, evaluating expressions immediately, making
it challenging to override the behavior entirely. Therefore, this method leverages the
bitwise OR '__or__' operator for custom JavaScript-like behavior.
Example:
>>> var1 = Var.create(True)
>>> var2 = Var.create(False)
>>> js_code = var1 | var2
>>> print(js_code)
'false || true'
"""
return self.operation("||", other, type_=bool, flip=True)
def __contains__(self, _: Any) -> Var:
"""Override the 'in' operator to alert the user that it is not supported.
Raises:
TypeError: the operation is not supported
"""
raise TypeError(
"'in' operator not supported for Var types, use Var.contains() instead."
)
def contains(self, other: Any) -> Var:
"""Check if a var contains the object `other`.
Args:
other: The object to check.
Raises:
TypeError: If the var is not a valid type: dict, list, tuple or str.
Returns:
A var representing the contain check.
"""
if not (types._issubclass(self._var_type, Union[dict, list, tuple, str])):
raise TypeError(
f"Var {self._var_full_name} of type {self._var_type} does not support contains check."
)
method = (
"hasOwnProperty"
if types.get_base_class(self._var_type) == dict
else "includes"
)
if isinstance(other, str):
other = Var.create(json.dumps(other), _var_is_string=True)
elif not isinstance(other, Var):
other = Var.create(other)
if types._issubclass(self._var_type, Dict):
return self._replace(
_var_name=f"{self._var_name}.{method}({other._var_full_name})",
_var_type=bool,
_var_is_string=False,
merge_var_data=other._var_data,
)
else: # str, list, tuple
# For strings, the left operand must be a string.
if types._issubclass(self._var_type, str) and not types._issubclass(
other._var_type, str
):
raise TypeError(
f"'in ' requires string as left operand, not {other._var_type}"
)
return self._replace(
_var_name=f"{self._var_name}.includes({other._var_full_name})",
_var_type=bool,
_var_is_string=False,
merge_var_data=other._var_data,
)
def reverse(self) -> Var:
"""Reverse a list var.
Raises:
TypeError: If the var is not a list.
Returns:
A var with the reversed list.
"""
if not types._issubclass(self._var_type, list):
raise TypeError(f"Cannot reverse non-list var {self._var_full_name}.")
return self._replace(
_var_name=f"[...{self._var_full_name}].reverse()",
_var_is_string=False,
_var_full_name_needs_state_prefix=False,
)
def lower(self) -> Var:
"""Convert a string var to lowercase.
Returns:
A var with the lowercase string.
Raises:
TypeError: If the var is not a string.
"""
if not types._issubclass(self._var_type, str):
raise TypeError(
f"Cannot convert non-string var {self._var_full_name} to lowercase."
)
return self._replace(
_var_name=f"{self._var_name}.toLowerCase()",
_var_is_string=False,
_var_type=str,
)
def upper(self) -> Var:
"""Convert a string var to uppercase.
Returns:
A var with the uppercase string.
Raises:
TypeError: If the var is not a string.
"""
if not types._issubclass(self._var_type, str):
raise TypeError(
f"Cannot convert non-string var {self._var_full_name} to uppercase."
)
return self._replace(
_var_name=f"{self._var_name}.toUpperCase()",
_var_is_string=False,
_var_type=str,
)
def strip(self, other: str | Var[str] = " ") -> Var:
"""Strip a string var.
Args:
other: The string to strip the var with.
Returns:
A var with the stripped string.
Raises:
TypeError: If the var is not a string.
"""
if not types._issubclass(self._var_type, str):
raise TypeError(f"Cannot strip non-string var {self._var_full_name}.")
other = Var.create_safe(json.dumps(other)) if isinstance(other, str) else other
return self._replace(
_var_name=f"{self._var_name}.replace(/^${other._var_full_name}|${other._var_full_name}$/g, '')",
_var_is_string=False,
merge_var_data=other._var_data,
)
def split(self, other: str | Var[str] = " ") -> Var:
"""Split a string var into a list.
Args:
other: The string to split the var with.
Returns:
A var with the list.
Raises:
TypeError: If the var is not a string.
"""
if not types._issubclass(self._var_type, str):
raise TypeError(f"Cannot split non-string var {self._var_full_name}.")
other = Var.create_safe(json.dumps(other)) if isinstance(other, str) else other
return self._replace(
_var_name=f"{self._var_name}.split({other._var_full_name})",
_var_is_string=False,
_var_type=list[str],
merge_var_data=other._var_data,
)
def join(self, other: str | Var[str] | None = None) -> Var:
"""Join a list var into a string.
Args:
other: The string to join the list with.
Returns:
A var with the string.
Raises:
TypeError: If the var is not a list.
"""
if not types._issubclass(self._var_type, list):
raise TypeError(f"Cannot join non-list var {self._var_full_name}.")
if other is None:
other = Var.create_safe('""')
if isinstance(other, str):
other = Var.create_safe(json.dumps(other))
else:
other = Var.create_safe(other)
return self._replace(
_var_name=f"{self._var_name}.join({other._var_full_name})",
_var_is_string=False,
_var_type=str,
merge_var_data=other._var_data,
)
def foreach(self, fn: Callable) -> Var:
"""Return a list of components. after doing a foreach on this var.
Args:
fn: The function to call on each component.
Returns:
A var representing foreach operation.
Raises:
TypeError: If the var is not a list.
"""
inner_types = get_args(self._var_type)
if not inner_types:
raise TypeError(
f"Cannot foreach over non-sequence var {self._var_full_name} of type {self._var_type}."
)
arg = BaseVar(
_var_name=get_unique_variable_name(),
_var_type=inner_types[0],
)
index = BaseVar(
_var_name=get_unique_variable_name(),
_var_type=int,
)
fn_signature = inspect.signature(fn)
fn_args = (arg, index)
fn_ret = fn(*fn_args[: len(fn_signature.parameters)])
return self._replace(
_var_name=f"{self._var_full_name}.map(({arg._var_name}, {index._var_name}) => {fn_ret})",
_var_is_string=False,
)
@classmethod
def range(
cls,
v1: Var | int = 0,
v2: Var | int | None = None,
step: Var | int | None = None,
) -> Var:
"""Return an iterator over indices from v1 to v2 (or 0 to v1).
Args:
v1: The start of the range or end of range if v2 is not given.
v2: The end of the range.
step: The number of numbers between each item.
Returns:
A var representing range operation.
Raises:
TypeError: If the var is not an int.
"""
if not isinstance(v1, Var):
v1 = Var.create_safe(v1)
if v1._var_type != int:
raise TypeError(f"Cannot get range on non-int var {v1._var_full_name}.")
if not isinstance(v2, Var):
v2 = Var.create(v2)
if v2 is None:
v2 = Var.create_safe("undefined")
elif v2._var_type != int:
raise TypeError(f"Cannot get range on non-int var {v2._var_full_name}.")
if not isinstance(step, Var):
step = Var.create(step)
if step is None:
step = Var.create_safe(1)
elif step._var_type != int:
raise TypeError(f"Cannot get range on non-int var {step._var_full_name}.")
return BaseVar(
_var_name=f"Array.from(range({v1._var_full_name}, {v2._var_full_name}, {step._var_name}))",
_var_type=list[int],
_var_is_local=False,
_var_data=VarData.merge(
v1._var_data,
v2._var_data,
step._var_data,
VarData(
imports={
"/utils/helpers/range.js": [
ImportVar(tag="range", is_default=True),
],
},
),
),
)
def to(self, type_: Type) -> Var:
"""Convert the type of the var.
Args:
type_: The type to convert to.
Returns:
The converted var.
"""
return self._replace(_var_type=type_)
def as_ref(self) -> Var:
"""Convert the var to a ref.
Returns:
The var as a ref.
"""
return self._replace(
_var_name=f"refs['{self._var_full_name}']",
_var_is_local=True,
_var_is_string=False,
_var_full_name_needs_state_prefix=False,
merge_var_data=VarData(
imports={
f"/{constants.Dirs.STATE_PATH}": [imports.ImportVar(tag="refs")],
},
),
)
@property
def _var_full_name(self) -> str:
"""Get the full name of the var.
Returns:
The full name of the var.
"""
if not self._var_full_name_needs_state_prefix:
return self._var_name
return (
self._var_name
if self._var_data is None or self._var_data.state == ""
else ".".join(
[format.format_state_name(self._var_data.state), self._var_name]
)
)
def _var_set_state(self, state: Type[BaseState] | str) -> Any:
"""Set the state of the var.
Args:
state: The state to set or the full name of the state.
Returns:
The var with the set state.
"""
state_name = state if isinstance(state, str) else state.get_full_name()
new_var_data = VarData(
state=state_name,
hooks={
"const {0} = useContext(StateContexts.{0})".format(
format.format_state_name(state_name)
)
},
imports={
f"/{constants.Dirs.CONTEXTS_PATH}": [ImportVar(tag="StateContexts")],
"react": [ImportVar(tag="useContext")],
},
)
self._var_data = VarData.merge(self._var_data, new_var_data)
self._var_full_name_needs_state_prefix = True
return self
@property
def _var_state(self) -> str:
"""Compat method for getting the state.
Returns:
The state name associated with the var.
"""
return self._var_data.state if self._var_data else ""
# Allow automatic serialization of Var within JSON structures
serializers.serializer(_encode_var)
@dataclasses.dataclass(
eq=False,
**{"slots": True} if sys.version_info >= (3, 10) else {},
)
class BaseVar(Var):
"""A base (non-computed) var of the app state."""
# The name of the var.
_var_name: str = dataclasses.field()
# The type of the var.
_var_type: Type = dataclasses.field(default=Any)
# Whether this is a local javascript variable.
_var_is_local: bool = dataclasses.field(default=False)
# Whether the var is a string literal.
_var_is_string: bool = dataclasses.field(default=False)
# _var_full_name should be prefixed with _var_state
_var_full_name_needs_state_prefix: bool = dataclasses.field(default=False)
# Extra metadata associated with the Var
_var_data: Optional[VarData] = dataclasses.field(default=None)
def __hash__(self) -> int:
"""Define a hash function for a var.
Returns:
The hash of the var.
"""
return hash((self._var_name, str(self._var_type)))
def get_default_value(self) -> Any:
"""Get the default value of the var.
Returns:
The default value of the var.
Raises:
ImportError: If the var is a dataframe and pandas is not installed.
"""
if types.is_optional(self._var_type):
return None
type_ = (
get_origin(self._var_type)
if types.is_generic_alias(self._var_type)
else self._var_type
)
if type_ is Literal:
args = get_args(self._var_type)
return args[0] if args else None
if issubclass(type_, str):
return ""
if issubclass(type_, types.get_args(Union[int, float])):
return 0
if issubclass(type_, bool):
return False
if issubclass(type_, list):
return []
if issubclass(type_, dict):
return {}
if issubclass(type_, tuple):
return ()
if types.is_dataframe(type_):
try:
import pandas as pd
return pd.DataFrame()
except ImportError as e:
raise ImportError(
"Please install pandas to use dataframes in your app."
) from e
return set() if issubclass(type_, set) else None
def get_setter_name(self, include_state: bool = True) -> str:
"""Get the name of the var's generated setter function.
Args:
include_state: Whether to include the state name in the setter name.
Returns:
The name of the setter function.
"""
setter = constants.SETTER_PREFIX + self._var_name
if self._var_data is None:
return setter
if not include_state or self._var_data.state == "":
return setter
return ".".join((self._var_data.state, setter))
def get_setter(self) -> Callable[[BaseState, Any], None]:
"""Get the var's setter function.
Returns:
A function that that creates a setter for the var.
"""
def setter(state: BaseState, value: Any):
"""Get the setter for the var.
Args:
state: The state within which we add the setter function.
value: The value to set.
"""
if self._var_type in [int, float]:
try:
value = self._var_type(value)
setattr(state, self._var_name, value)
except ValueError:
console.warn(
f"{self._var_name}: Failed conversion of {value} to '{self._var_type.__name__}'. Value not set.",
)
else:
setattr(state, self._var_name, value)
setter.__qualname__ = self.get_setter_name()
return setter
# Marker for a Var that was not passed
UnspecifiedVar = BaseVar(_var_name="")
@dataclasses.dataclass(init=False, eq=False)
class ComputedVar(Var, property):
"""A field with computed getters."""
# Whether to track dependencies and cache computed values
_cache: bool = dataclasses.field(default=False)
def __init__(
self,
fget: Callable[[BaseState], Any],
fset: Callable[[BaseState, Any], None] | None = None,
fdel: Callable[[BaseState], Any] | None = None,
doc: str | None = None,
**kwargs,
):
"""Initialize a ComputedVar.
Args:
fget: The getter function.
fset: The setter function.
fdel: The deleter function.
doc: The docstring.
**kwargs: additional attributes to set on the instance
"""
property.__init__(self, fget, fset, fdel, doc)
kwargs["_var_name"] = kwargs.pop("_var_name", fget.__name__)
kwargs["_var_type"] = kwargs.pop("_var_type", self._determine_var_type())
BaseVar.__init__(self, **kwargs) # type: ignore
@property
def _cache_attr(self) -> str:
"""Get the attribute used to cache the value on the instance.
Returns:
An attribute name.
"""
return f"__cached_{self._var_name}"
def __get__(self, instance, owner):
"""Get the ComputedVar value.
If the value is already cached on the instance, return the cached value.
Args:
instance: the instance of the class accessing this computed var.
owner: the class that this descriptor is attached to.
Returns:
The value of the var for the given instance.
"""
if instance is None or not self._cache:
return super().__get__(instance, owner)
# handle caching
if not hasattr(instance, self._cache_attr):
setattr(instance, self._cache_attr, super().__get__(instance, owner))
return getattr(instance, self._cache_attr)
def _deps(
self,
objclass: Type,
obj: FunctionType | CodeType | None = None,
self_name: Optional[str] = None,
) -> set[str]:
"""Determine var dependencies of this ComputedVar.
Save references to attributes accessed on "self". Recursively called
when the function makes a method call on "self" or define comprehensions
or nested functions that may reference "self".
Args:
objclass: the class obj this ComputedVar is attached to.
obj: the object to disassemble (defaults to the fget function).
self_name: if specified, look for this name in LOAD_FAST and LOAD_DEREF instructions.
Returns:
A set of variable names accessed by the given obj.
"""
d = set()
if obj is None:
if self.fget is not None:
obj = cast(FunctionType, self.fget)
else:
return set()
with contextlib.suppress(AttributeError):
# unbox functools.partial
obj = cast(FunctionType, obj.func) # type: ignore
with contextlib.suppress(AttributeError):
# unbox EventHandler
obj = cast(FunctionType, obj.fn) # type: ignore
if self_name is None and isinstance(obj, FunctionType):
try:
# the first argument to the function is the name of "self" arg
self_name = obj.__code__.co_varnames[0]
except (AttributeError, IndexError):
self_name = None
if self_name is None:
# cannot reference attributes on self if method takes no args
return set()
self_is_top_of_stack = False
for instruction in dis.get_instructions(obj):
if (
instruction.opname in ("LOAD_FAST", "LOAD_DEREF")
and instruction.argval == self_name
):
# bytecode loaded the class instance to the top of stack, next load instruction
# is referencing an attribute on self
self_is_top_of_stack = True
continue
if self_is_top_of_stack and instruction.opname in (
"LOAD_ATTR",
"LOAD_METHOD",
):
try:
ref_obj = getattr(objclass, instruction.argval)
except Exception:
ref_obj = None
if callable(ref_obj):
# recurse into callable attributes
d.update(
self._deps(
objclass=objclass,
obj=ref_obj,
)
)
else:
# normal attribute access
d.add(instruction.argval)
elif instruction.opname == "LOAD_CONST" and isinstance(
instruction.argval, CodeType
):
# recurse into nested functions / comprehensions, which can reference
# instance attributes from the outer scope
d.update(
self._deps(
objclass=objclass,
obj=instruction.argval,
self_name=self_name,
)
)
self_is_top_of_stack = False
return d
def mark_dirty(self, instance) -> None:
"""Mark this ComputedVar as dirty.
Args:
instance: the state instance that needs to recompute the value.
"""
with contextlib.suppress(AttributeError):
delattr(instance, self._cache_attr)
def _determine_var_type(self) -> Type:
"""Get the type of the var.
Returns:
The type of the var.
"""
hints = get_type_hints(self.fget)
if "return" in hints:
return hints["return"]
return Any
def cached_var(fget: Callable[[Any], Any]) -> ComputedVar:
"""A field with computed getter that tracks other state dependencies.
The cached_var will only be recalculated when other state vars that it
depends on are modified.
Args:
fget: the function that calculates the variable value.
Returns:
ComputedVar that is recomputed when dependencies change.
"""
cvar = ComputedVar(fget=fget)
cvar._cache = True
return cvar
class CallableVar(BaseVar):
"""Decorate a Var-returning function to act as both a Var and a function.
This is used as a compatibility shim for replacing Var objects in the
API with functions that return a family of Var.
"""
def __init__(self, fn: Callable[..., BaseVar]):
"""Initialize a CallableVar.
Args:
fn: The function to decorate (must return Var)
"""
self.fn = fn
default_var = fn()
super().__init__(**dataclasses.asdict(default_var))
def __call__(self, *args, **kwargs) -> BaseVar:
"""Call the decorated function.
Args:
*args: The args to pass to the function.
**kwargs: The kwargs to pass to the function.
Returns:
The Var returned from calling the function.
"""
return self.fn(*args, **kwargs)