12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878 |
- """Define a state var."""
- from __future__ import annotations
- import contextlib
- import dataclasses
- import dis
- import inspect
- import json
- import random
- import re
- import string
- import sys
- from types import CodeType, FunctionType
- from typing import (
- TYPE_CHECKING,
- Any,
- Callable,
- Dict,
- Iterable,
- List,
- Literal,
- Optional,
- Set,
- Tuple,
- Type,
- Union,
- _GenericAlias, # type: ignore
- cast,
- get_args,
- get_origin,
- get_type_hints,
- )
- import pydantic
- from reflex import constants
- from reflex.base import Base
- from reflex.utils import console, format, imports, serializers, types
- # This module used to export ImportVar itself, so we still import it for export here
- from reflex.utils.imports import ImportDict, ImportVar
- if TYPE_CHECKING:
- from reflex.state import BaseState
# Registry of variable names that have already been handed out.
USED_VARIABLES = set()

# Operators accepted for every combination of operand types.
ALL_OPS = ["==", "!=", "!==", "===", "&&", "||"]

# Separators placed between function arguments or operands.
DELIMITERS = [","]

# Building blocks shared by several entries of the operator table below.
_ARITHMETIC_OPS = {"+", "-", "/", "//", "*", "%", "**"}
_ORDERING_OPS = {">", "<", "<=", ">="}

# Operators accepted for a given pair of operand types.
OPERATION_MAPPING = {
    (int, int): _ARITHMETIC_OPS | _ORDERING_OPS | {"|", "&"},
    (int, str): {"*"},
    (int, list): {"*"},
    (str, str): {"+"} | _ORDERING_OPS,
    (float, float): _ARITHMETIC_OPS | _ORDERING_OPS,
    (float, int): _ARITHMETIC_OPS | _ORDERING_OPS,
    (list, list): {"+"} | _ORDERING_OPS,
}

# Attribute names renamed in reflex 0.3.0, mapped from old name to new name.
REPLACED_NAMES = dict(
    full_name="_var_full_name",
    name="_var_name",
    state="_var_data.state",
    type_="_var_type",
    is_local="_var_is_local",
    is_string="_var_is_string",
    set_state="_var_set_state",
    deps="_deps",
)
def get_unique_variable_name() -> str:
    """Generate a variable name that has not been handed out before.

    Candidates are eight random lowercase ASCII letters; a candidate already
    present in USED_VARIABLES is discarded and a new one is drawn.

    Returns:
        The unique variable name, which is also recorded in USED_VARIABLES.
    """
    while True:
        candidate = "".join(
            random.choice(string.ascii_lowercase) for _ in range(8)
        )
        if candidate in USED_VARIABLES:
            continue
        USED_VARIABLES.add(candidate)
        return candidate
class VarData(Base):
    """Metadata associated with a Var."""

    # Name of the state that encloses the var.
    state: str = ""

    # Imports required to render the var.
    imports: ImportDict = {}

    # Hooks the component must include to render the var.
    hooks: Set[str] = set()

    @classmethod
    def merge(cls, *others: VarData | None) -> VarData | None:
        """Combine several var data objects into one.

        None entries are skipped. The first non-empty state wins, imports are
        combined through `imports.merge_imports` and hooks are unioned.

        Args:
            *others: The var data objects to merge.

        Returns:
            The merged var data object, or None when the result is empty.
        """
        merged_state = ""
        merged_imports = {}
        merged_hooks = set()
        for entry in others:
            if entry is None:
                continue
            if not merged_state:
                merged_state = entry.state
            merged_imports = imports.merge_imports(merged_imports, entry.imports)
            merged_hooks.update(entry.hooks)

        combined = cls(
            state=merged_state,
            imports=merged_imports,
            hooks=merged_hooks,
        )
        # An instance with no data is falsy (see __bool__) and becomes None.
        return combined if combined else None

    def __bool__(self) -> bool:
        """Report whether any metadata is present.

        Returns:
            True if state, imports or hooks holds a non-empty value.
        """
        return any((self.state, self.imports, self.hooks))

    def __eq__(self, other: Any) -> bool:
        """Compare this var data with another object.

        Args:
            other: The object to compare against.

        Returns:
            True if other is a VarData with the same state and hooks, and the
            collapsed imports of both are equal.
        """
        if not isinstance(other, VarData):
            return False
        if self.state != other.state:
            return False
        if self.hooks != other.hooks:
            return False
        mine = imports.collapse_imports(self.imports)
        theirs = imports.collapse_imports(other.imports)
        return mine == theirs

    # NOTE(review): the leading double underscore makes this name-mangled to
    # `_VarData__dict`, so it does not override `Base.dict` - confirm intended.
    def __dict(self) -> dict:
        """Build a plain dictionary representation of the var data.

        Returns:
            A dict with the state, the imports (each import var converted with
            its own `dict()` method) and the hooks as a list.
        """
        imports_as_dicts = {}
        for lib, import_vars in self.imports.items():
            imports_as_dicts[lib] = [import_var.dict() for import_var in import_vars]
        return {
            "state": self.state,
            "imports": imports_as_dicts,
            "hooks": list(self.hooks),
        }
- def _encode_var(value: Var) -> str:
- """Encode the state name into a formatted var.
- Args:
- value: The value to encode the state name into.
- Returns:
- The encoded var.
- """
- if value._var_data:
- return f"<reflex.Var>{value._var_data.json()}</reflex.Var>" + str(value)
- return str(value)
- def _decode_var(value: str) -> tuple[VarData | None, str]:
- """Decode the state name from a formatted var.
- Args:
- value: The value to extract the state name from.
- Returns:
- The extracted state name and the value without the state name.
- """
- var_datas = []
- if isinstance(value, str):
- # Extract the state name from a formatted var
- while m := re.match(r"(.*)<reflex.Var>(.*)</reflex.Var>(.*)", value):
- value = m.group(1) + m.group(3)
- try:
- var_datas.append(VarData.parse_raw(m.group(2)))
- except pydantic.ValidationError:
- # If the VarData is invalid, it was probably json-encoded twice...
- var_datas.append(VarData.parse_raw(json.loads(f'"{m.group(2)}"')))
- if var_datas:
- return VarData.merge(*var_datas), value
- return None, value
- def _extract_var_data(value: Iterable) -> list[VarData | None]:
- """Extract the var imports and hooks from an iterable containing a Var.
- Args:
- value: The iterable to extract the VarData from
- Returns:
- The extracted VarDatas.
- """
- var_datas = []
- with contextlib.suppress(TypeError):
- for sub in value:
- if isinstance(sub, Var):
- var_datas.append(sub._var_data)
- elif not isinstance(sub, str):
- # Recurse into dict values.
- if hasattr(sub, "values") and callable(sub.values):
- var_datas.extend(_extract_var_data(sub.values()))
- # Recurse into iterable values (or dict keys).
- var_datas.extend(_extract_var_data(sub))
- # Recurse when value is a dict itself.
- values = getattr(value, "values", None)
- if callable(values):
- var_datas.extend(_extract_var_data(values()))
- return var_datas
- class Var:
- """An abstract var."""
- # The name of the var.
- _var_name: str
- # The type of the var.
- _var_type: Type
- # Whether this is a local javascript variable.
- _var_is_local: bool
- # Whether the var is a string literal.
- _var_is_string: bool
- # _var_full_name should be prefixed with _var_state
- _var_full_name_needs_state_prefix: bool
- # Extra metadata associated with the Var
- _var_data: Optional[VarData]
- @classmethod
- def create(
- cls, value: Any, _var_is_local: bool = True, _var_is_string: bool = False
- ) -> Var | None:
- """Create a var from a value.
- Args:
- value: The value to create the var from.
- _var_is_local: Whether the var is local.
- _var_is_string: Whether the var is a string literal.
- Returns:
- The var.
- Raises:
- TypeError: If the value is JSON-unserializable.
- """
- # Check for none values.
- if value is None:
- return None
- # If the value is already a var, do nothing.
- if isinstance(value, Var):
- return value
- # Try to pull the imports and hooks from contained values.
- _var_data = None
- if not isinstance(value, str):
- _var_data = VarData.merge(*_extract_var_data(value))
- # Try to serialize the value.
- type_ = type(value)
- name = value if type_ in types.JSONType else serializers.serialize(value)
- if name is None:
- raise TypeError(
- f"No JSON serializer found for var {value} of type {type_}."
- )
- name = name if isinstance(name, str) else format.json_dumps(name)
- return BaseVar(
- _var_name=name,
- _var_type=type_,
- _var_is_local=_var_is_local,
- _var_is_string=_var_is_string,
- _var_data=_var_data,
- )
- @classmethod
- def create_safe(
- cls, value: Any, _var_is_local: bool = True, _var_is_string: bool = False
- ) -> Var:
- """Create a var from a value, asserting that it is not None.
- Args:
- value: The value to create the var from.
- _var_is_local: Whether the var is local.
- _var_is_string: Whether the var is a string literal.
- Returns:
- The var.
- """
- var = cls.create(
- value,
- _var_is_local=_var_is_local,
- _var_is_string=_var_is_string,
- )
- assert var is not None
- return var
- @classmethod
- def __class_getitem__(cls, type_: str) -> _GenericAlias:
- """Get a typed var.
- Args:
- type_: The type of the var.
- Returns:
- The var class item.
- """
- return _GenericAlias(cls, type_)
def __post_init__(self) -> None:
    """Move inline Var markup out of the name and into the var data."""
    decoded_data, plain_name = _decode_var(self._var_name)
    if not decoded_data:
        return
    # Keep the cleaned name and fold the decoded metadata into the instance.
    self._var_name = plain_name
    self._var_data = VarData.merge(self._var_data, decoded_data)
def _replace(self, merge_var_data=None, **kwargs: Any) -> Var:
    """Copy this Var, overriding selected fields.

    Args:
        merge_var_data: VarData to merge into the existing VarData.
        **kwargs: Var fields to update.

    Returns:
        A new BaseVar whose fields come from kwargs where given and from this
        Var otherwise.
    """
    overridable = (
        "_var_name",
        "_var_type",
        "_var_is_local",
        "_var_is_string",
        "_var_full_name_needs_state_prefix",
    )
    field_values = {
        field: kwargs.pop(field, getattr(self, field)) for field in overridable
    }
    # The var data is merged with merge_var_data rather than replaced outright.
    field_values["_var_data"] = VarData.merge(
        kwargs.get("_var_data", self._var_data), merge_var_data
    )
    return BaseVar(**field_values)
- def _decode(self) -> Any:
- """Decode Var as a python value.
- Note that Var with state set cannot be decoded python-side and will be
- returned as full_name.
- Returns:
- The decoded value or the Var name.
- """
- if self._var_is_string:
- return self._var_name
- try:
- return json.loads(self._var_name)
- except ValueError:
- return self._var_name
- def equals(self, other: Var) -> bool:
- """Check if two vars are equal.
- Args:
- other: The other var to compare.
- Returns:
- Whether the vars are equal.
- """
- return (
- self._var_name == other._var_name
- and self._var_type == other._var_type
- and self._var_is_local == other._var_is_local
- and self._var_full_name_needs_state_prefix
- == other._var_full_name_needs_state_prefix
- and self._var_data == other._var_data
- )
def to_string(self, json: bool = True) -> Var:
    """Wrap the var in a javascript string conversion.

    Args:
        json: Whether to use `JSON.stringify` (True) or `String` (False).

    Returns:
        The stringified var, typed as str.
    """
    if json:
        converter = "JSON.stringify"
    else:
        converter = "String"
    return self.operation(fn=converter, type_=str)
- def __hash__(self) -> int:
- """Define a hash function for a var.
- Returns:
- The hash of the var.
- """
- return hash((self._var_name, str(self._var_type)))
- def __str__(self) -> str:
- """Wrap the var so it can be used in templates.
- Returns:
- The wrapped var, i.e. {state.var}.
- """
- out = (
- self._var_full_name
- if self._var_is_local
- else format.wrap(self._var_full_name, "{")
- )
- if self._var_is_string:
- out = format.format_string(out)
- return out
- def __bool__(self) -> bool:
- """Raise exception if using Var in a boolean context.
- Raises:
- TypeError: when attempting to bool-ify the Var.
- """
- # pydantic v2 hacks
- if inspect.stack()[1].function in ("smart_deepcopy"):
- return bool(self._decode())
- raise TypeError(
- f"Cannot convert Var {self._var_full_name!r} to bool for use with `if`, `and`, `or`, and `not`. "
- "Instead use `rx.cond` and bitwise operators `&` (and), `|` (or), `~` (invert)."
- )
- def __iter__(self) -> Any:
- """Raise exception if using Var in an iterable context.
- Raises:
- TypeError: when attempting to iterate over the Var.
- """
- raise TypeError(
- f"Cannot iterate over Var {self._var_full_name!r}. Instead use `rx.foreach`."
- )
def __format__(self, format_spec: str) -> str:
    """Render the var for interpolation inside a python f-string.

    Args:
        format_spec: The format specifier (Ignored for now).

    Returns:
        The encoded var, including its var data markup so the metadata can be
        recovered later; non-local vars are prefixed with `$`.
    """
    encoded = _encode_var(self)
    return encoded if self._var_is_local else f"${encoded}"
def __getitem__(self, i: Any) -> Var:
    """Index into a var.

    Strings, lists and tuples accept an int, a slice or an int Var and render
    as `.at(i)` / `.slice(start, stop)`. Dicts and dataframes accept an int,
    str, float or a Var of one of those types and render as `[i]`.

    Args:
        i: The index to index into.

    Returns:
        The indexed var.

    Raises:
        TypeError: If the var is not indexable or the index type is invalid.
    """
    # Indexing is only supported for strings, lists, tuples, dicts, and dataframes.
    if not (
        types._issubclass(self._var_type, Union[List, Dict, Tuple, str])
        or types.is_dataframe(self._var_type)
    ):
        if self._var_type == Any:
            raise TypeError(
                "Could not index into var of type Any. (If you are trying to index into a state var, "
                "add the correct type annotation to the var.)"
            )
        raise TypeError(
            f"Var {self._var_name} of type {self._var_type} does not support indexing."
        )

    # Convert any vars to local vars.
    if isinstance(i, Var):
        i = i._replace(_var_is_local=True)

    # Handle list/tuple/str indexing.
    if types._issubclass(self._var_type, Union[List, Tuple, str]):
        # List/Tuple/String indices must be ints, slices, or vars.
        if (
            not isinstance(i, types.get_args(Union[int, slice, Var]))
            or isinstance(i, Var)
            and not i._var_type == int
        ):
            raise TypeError("Index must be an integer or an integer var.")

        # Handle slices first.
        if isinstance(i, slice):
            # A missing start is the beginning of the sequence.
            start = i.start or 0
            # Only a *missing* stop means "to the end". An explicit stop of 0
            # must be kept, otherwise `var[:0]` would render as a full copy
            # instead of an empty slice.
            stop = "undefined" if i.stop is None else i.stop

            # Use the slice function.
            return self._replace(
                _var_name=f"{self._var_name}.slice({start}, {stop})",
                _var_is_string=False,
            )

        # Get the type of the indexed var.
        type_ = (
            types.get_args(self._var_type)[0]
            if types.is_generic_alias(self._var_type)
            else Any
        )

        # Use `at` to support negative indices.
        return self._replace(
            _var_name=f"{self._var_name}.at({i})",
            _var_type=type_,
            _var_is_string=False,
        )

    # Dictionary / dataframe indexing.
    # Tuples are currently not supported as indexes.
    if (
        (
            types._issubclass(self._var_type, Dict)
            or types.is_dataframe(self._var_type)
        )
        and not isinstance(i, types.get_args(Union[int, str, float, Var]))
    ) or (
        isinstance(i, Var)
        and not types._issubclass(
            i._var_type, types.get_args(Union[int, str, float])
        )
    ):
        raise TypeError(
            "Index must be one of the following types: int, str, int or str Var"
        )

    # String keys are quoted so they render as javascript string literals.
    if isinstance(i, str):
        i = format.wrap(i, '"')

    # Get the type of the indexed var.
    type_ = (
        types.get_args(self._var_type)[1]
        if types.is_generic_alias(self._var_type)
        else Any
    )

    # Use normal indexing here.
    return self._replace(
        _var_name=f"{self._var_name}[{i}]",
        _var_type=type_,
        _var_is_string=False,
    )
- def __getattr__(self, name: str) -> Var:
- """Get a var attribute.
- Args:
- name: The name of the attribute.
- Returns:
- The var attribute.
- Raises:
- AttributeError: If the var is wrongly annotated or can't find attribute.
- TypeError: If an annotation to the var isn't provided.
- """
- # Check if the attribute is one of the class fields.
- if not name.startswith("_"):
- if self._var_type == Any:
- raise TypeError(
- f"You must provide an annotation for the state var `{self._var_full_name}`. Annotation cannot be `{self._var_type}`"
- ) from None
- is_optional = types.is_optional(self._var_type)
- type_ = types.get_attribute_access_type(self._var_type, name)
- if type_ is not None:
- return self._replace(
- _var_name=f"{self._var_name}{'?' if is_optional else ''}.{name}",
- _var_type=type_,
- _var_is_string=False,
- )
- if name in REPLACED_NAMES:
- raise AttributeError(
- f"Field {name!r} was renamed to {REPLACED_NAMES[name]!r}"
- )
- raise AttributeError(
- f"The State var `{self._var_full_name}` has no attribute '{name}' or may have been annotated "
- f"wrongly."
- )
- if name.startswith("_var"):
- print(name)
- raise AttributeError(
- f"The State var has no attribute '{name}' or may have been annotated wrongly.",
- )
def operation(
    self,
    op: str = "",
    other: Var | None = None,
    type_: Type | None = None,
    flip: bool = False,
    fn: str | None = None,
    invoke_fn: bool = False,
) -> Var:
    """Build the var that represents an operation on this var.

    Args:
        op: The operation to perform.
        other: The other var to perform the operation on.
        type_: The type of the operation result.
        flip: Whether to flip the order of the operation.
        fn: A function to apply to the operation.
        invoke_fn: Whether to invoke the function.

    Returns:
        The operation result.

    Raises:
        TypeError: If the operation between two operands is invalid.
        ValueError: If flip is set to true and value of operand is not provided
    """
    # Plain strings are JSON-encoded first so they render as string literals.
    other = Var.create(json.dumps(other) if isinstance(other, str) else other)

    # Without an explicit result type the result keeps this var's type.
    type_ = type_ or self._var_type

    if other is None:
        if flip:
            raise ValueError(
                "flip_operands cannot be set to True if the value of 'other' operand is not provided"
            )
        # Unary form: <operator> operand, optionally passed to / invoked as fn.
        if fn is None:
            operation_name = f"{op}{self._var_full_name}"
        elif invoke_fn:
            operation_name = f"{self._var_full_name}.{fn}()"
        else:
            operation_name = f"{fn}({op}{self._var_full_name})"
        other_var_data = None
    else:
        left_operand, right_operand = (other, self) if flip else (self, other)

        # Check if the operation between operands is valid.
        if op and not self.is_valid_operation(
            types.get_base_class(left_operand._var_type),  # type: ignore
            types.get_base_class(right_operand._var_type),  # type: ignore
            op,
        ):
            raise TypeError(
                f"Unsupported Operand type(s) for {op}: `{left_operand._var_full_name}` of type {left_operand._var_type.__name__} and `{right_operand._var_full_name}` of type {right_operand._var_type.__name__}"  # type: ignore
            )

        left_name = left_operand._var_full_name
        right_name = right_operand._var_full_name
        if fn is None:
            # Infix form: (left <operator> right)
            operation_name = format.wrap(f"{left_name} {op} {right_name}", "(")
        elif invoke_fn:
            # Method form: left.fn(right)
            operation_name = f"{left_name}.{fn}({right_name})"
        else:
            # Call form: fn(left <operator> right)
            operation_name = f"{fn}({left_name} {op} {right_name})"
        other_var_data = other._var_data

    return self._replace(
        _var_name=operation_name,
        _var_type=type_,
        _var_is_string=False,
        _var_full_name_needs_state_prefix=False,
        merge_var_data=other_var_data,
    )
@staticmethod
def is_valid_operation(
    operand1_type: Type, operand2_type: Type, operator: str
) -> bool:
    """Decide whether an operator may be applied to two operand types.

    Args:
        operand1_type: Type of the operand
        operand2_type: Type of the second operand
        operator: The operator.

    Returns:
        Whether operation is valid or not
    """
    # Universal operators and delimiters are accepted for any types.
    if operator in ALL_OPS or operator in DELIMITERS:
        return True

    # bools are subclasses of ints, so they share the int rules.
    normalized = [
        int if operand_type == bool else operand_type
        for operand_type in (operand1_type, operand2_type)
    ]
    # The lookup key is the pair of types ordered by type name.
    normalized.sort(key=lambda operand_type: operand_type.__name__)

    allowed = OPERATION_MAPPING.get(tuple(normalized))
    return allowed is not None and operator in allowed
def compare(self, op: str, other: Var) -> Var:
    """Build a boolean var from a comparison with another var.

    Args:
        op: The comparison operator.
        other: The other var to compare with.

    Returns:
        The comparison result, typed as bool.
    """
    return self.operation(op=op, other=other, type_=bool)
def __invert__(self) -> Var:
    """Apply the javascript `!` operator to the var.

    Returns:
        The inverted var, typed as bool.
    """
    return self.operation(op="!", type_=bool)
def __neg__(self) -> Var:
    """Negate the var by passing it to `-` as a function.

    Returns:
        The negated var.
    """
    negate = "-"
    return self.operation(fn=negate)
def __abs__(self) -> Var:
    """Wrap the var in `Math.abs`.

    Returns:
        A var with the absolute value.
    """
    absolute = "Math.abs"
    return self.operation(fn=absolute)
def length(self) -> Var:
    """Get the length of a list var.

    Returns:
        An int var that reads the `.length` property of this var.

    Raises:
        TypeError: If the var is not a list.
    """
    if types._issubclass(self._var_type, List):
        return self._replace(
            _var_name=f"{self._var_name}.length",
            _var_type=int,
            _var_is_string=False,
        )
    raise TypeError(f"Cannot get length of non-list var {self}.")
def __eq__(self, other: Var) -> Var:
    """Build a strict equality (`===`) comparison.

    Args:
        other: The other var to compare with.

    Returns:
        A var representing the equality comparison.
    """
    return self.compare(op="===", other=other)
def __ne__(self, other: Var) -> Var:
    """Build a strict inequality (`!==`) comparison.

    Args:
        other: The other var to compare with.

    Returns:
        A var representing the inequality comparison.
    """
    return self.compare(op="!==", other=other)
def __gt__(self, other: Var) -> Var:
    """Build a `>` comparison.

    Args:
        other: The other var to compare with.

    Returns:
        A var representing the greater than comparison.
    """
    return self.compare(op=">", other=other)
def __ge__(self, other: Var) -> Var:
    """Build a `>=` comparison.

    Args:
        other: The other var to compare with.

    Returns:
        A var representing the greater than or equal to comparison.
    """
    return self.compare(op=">=", other=other)
def __lt__(self, other: Var) -> Var:
    """Build a `<` comparison.

    Args:
        other: The other var to compare with.

    Returns:
        A var representing the less than comparison.
    """
    return self.compare(op="<", other=other)
def __le__(self, other: Var) -> Var:
    """Build a `<=` comparison.

    Args:
        other: The other var to compare with.

    Returns:
        A var representing the less than or equal to comparison.
    """
    return self.compare(op="<=", other=other)
def __add__(self, other: Var, flip=False) -> Var:
    """Add two vars.

    Args:
        other: The other var to add.
        flip: Whether to flip operands.

    Returns:
        A var representing the sum.
    """
    other_type = other._var_type if isinstance(other, Var) else type(other)
    both_lists = (
        types.get_base_class(self._var_type) == list
        and types.get_base_class(other_type) == list
    )
    if not both_lists:
        return self.operation("+", other, flip=flip)

    # Two lists are combined through the spreadArraysOrObjects utility
    # function, which uses the spread operator, rather than javascript `+`.
    spread = self.operation(",", other, fn="spreadArraysOrObjects", flip=flip)
    # The utility has to be imported wherever the resulting var is rendered.
    helper_import = VarData(
        imports={
            f"/{constants.Dirs.STATE_PATH}": [
                ImportVar(tag="spreadArraysOrObjects")
            ]
        },
    )
    return spread._replace(merge_var_data=helper_import)
def __radd__(self, other: Var) -> Var:
    """Add two vars with this var as the right-hand operand.

    Args:
        other: The other var to add.

    Returns:
        A var representing the sum.
    """
    swap_operands = True
    return self.__add__(other=other, flip=swap_operands)
def __sub__(self, other: Var) -> Var:
    """Subtract another var from this var.

    Args:
        other: The other var to subtract.

    Returns:
        A var representing the difference.
    """
    return self.operation(op="-", other=other)
def __rsub__(self, other: Var) -> Var:
    """Subtract this var from another value.

    Args:
        other: The value this var is subtracted from.

    Returns:
        A var representing the difference.
    """
    return self.operation(op="-", other=other, flip=True)
def __mul__(self, other: Var, flip=True) -> Var:
    """Multiply two vars.

    Numeric operands render as a javascript `*`. A str/int pair renders as
    `<str>.repeat(<int>)` and a list/int pair as
    `Array(<int>).fill().map(() => <list>).flat()`, whichever side each
    operand is on.

    Args:
        other: The other var to multiply.
        flip: Whether to flip operands. Accepted for compatibility; the
            rendered operand order is derived from the operand types because
            the javascript forms for repetition are not symmetric.

    Returns:
        A var representing the product.
    """
    other_type = other._var_type if isinstance(other, Var) else type(other)
    self_base = types.get_base_class(self._var_type)
    other_base = types.get_base_class(other_type)
    type_pair = (self_base, other_base)

    # For str-int multiplication, we use the repeat function.
    # i.e "hello" * 2 is equivalent to "hello".repeat(2) in js.
    if type_pair in [(int, str), (str, int)]:
        # `repeat` must be invoked on the string, so the operands are swapped
        # when this var is the integer.
        return self.operation(
            other=other, fn="repeat", invoke_fn=True, flip=self_base == int
        )

    # For list-int multiplication, we use the Array function.
    # i.e ["hello"] * 2 is equivalent to Array(2).fill().map(() => ["hello"]).flat() in js.
    if type_pair in [(int, list), (list, int)]:
        other_var = other if isinstance(other, Var) else Var.create_safe(other)
        # Work out which operand is the list and which is the repeat count.
        if self_base == list:
            list_var, count_var = self, other_var
        else:
            list_var, count_var = other_var, self
        name = (
            f"Array({count_var._var_full_name}).fill()"
            f".map(() => {list_var._var_full_name}).flat()"
        )
        return self._replace(
            _var_name=name,
            # Repeating a list yields a list of the same type.
            _var_type=list_var._var_type,
            _var_is_string=False,
            _var_full_name_needs_state_prefix=False,
            # Keep the imports/hooks of the other operand, as operation() does.
            merge_var_data=other_var._var_data,
        )

    return self.operation("*", other)
def __rmul__(self, other: Var) -> Var:
    """Multiply two vars with this var as the right-hand operand.

    Args:
        other: The other var to multiply.

    Returns:
        A var representing the product.
    """
    swap_operands = True
    return self.__mul__(other=other, flip=swap_operands)
def __pow__(self, other: Var) -> Var:
    """Raise this var to a power using `Math.pow`.

    Args:
        other: The power to raise to.

    Returns:
        A var representing the power.
    """
    return self.operation(op=",", other=other, fn="Math.pow")
def __rpow__(self, other: Var) -> Var:
    """Raise another value to the power of this var using `Math.pow`.

    Args:
        other: The base that is raised to this var.

    Returns:
        A var representing the power.
    """
    return self.operation(op=",", other=other, flip=True, fn="Math.pow")
def __truediv__(self, other: Var) -> Var:
    """Divide this var by another var.

    Args:
        other: The other var to divide.

    Returns:
        A var representing the quotient.
    """
    return self.operation(op="/", other=other)
def __rtruediv__(self, other: Var) -> Var:
    """Divide another value by this var.

    Args:
        other: The value that is divided by this var.

    Returns:
        A var representing the quotient.
    """
    return self.operation(op="/", other=other, flip=True)
def __floordiv__(self, other: Var) -> Var:
    """Divide this var by another var and round down with `Math.floor`.

    Args:
        other: The other var to divide.

    Returns:
        A var representing the quotient.
    """
    return self.operation(op="/", other=other, fn="Math.floor")
def __mod__(self, other: Var) -> Var:
    """Get the remainder of dividing this var by another var.

    Args:
        other: The other var to divide.

    Returns:
        A var representing the remainder.
    """
    return self.operation(op="%", other=other)
def __rmod__(self, other: Var) -> Var:
    """Get the remainder of dividing another value by this var.

    Args:
        other: The value that is divided by this var.

    Returns:
        A var representing the remainder.
    """
    return self.operation(op="%", other=other, flip=True)
def __and__(self, other: Var) -> Var:
    """Perform a logical AND.

    Args:
        other: The other var to perform the logical AND with.

    Returns:
        A var representing the logical AND.

    Note:
        Python's ``and`` keyword evaluates its operands immediately and
        cannot be overridden, so the bitwise AND hook ``__and__`` (the ``&``
        operator) is used instead to produce the JavaScript logical AND
        (``&&``) expression as a Var.

    Example:
        >>> var1 = Var.create(True)
        >>> var2 = Var.create(False)
        >>> js_code = var1 & var2
        >>> print(js_code._var_full_name)
        '(true && false)'
    """
    conjunction = self.operation("&&", other, type_=bool)
    return conjunction
def __rand__(self, other: Var) -> Var:
    """Perform a logical AND when this var is the right-hand operand.

    Args:
        other: The left-hand operand of ``&``.

    Returns:
        A var representing the logical AND.

    Note:
        Python's ``and`` keyword evaluates its operands immediately and
        cannot be overridden, so the reflected bitwise AND hook ``__rand__``
        is used instead to produce the JavaScript logical AND (``&&``)
        expression as a Var. The operation is requested with ``flip=True``.
    """
    conjunction = self.operation("&&", other, type_=bool, flip=True)
    return conjunction
def __or__(self, other: Var) -> Var:
    """Perform a logical OR.

    Args:
        other: The other var to perform the logical OR with.

    Returns:
        A var representing the logical OR.

    Note:
        Python's ``or`` keyword evaluates its operands immediately and
        cannot be overridden, so the bitwise OR hook ``__or__`` (the ``|``
        operator) is used instead to produce the JavaScript logical OR
        (``||``) expression as a Var.

    Example:
        >>> var1 = Var.create(True)
        >>> var2 = Var.create(False)
        >>> js_code = var1 | var2
        >>> print(js_code._var_full_name)
        '(true || false)'
    """
    disjunction = self.operation("||", other, type_=bool)
    return disjunction
def __ror__(self, other: Var) -> Var:
    """Perform a logical OR when this var is the right-hand operand.

    Args:
        other: The left-hand operand of ``|``.

    Returns:
        A var representing the logical OR.

    Note:
        Python's ``or`` keyword evaluates its operands immediately and
        cannot be overridden, so the reflected bitwise OR hook ``__ror__``
        is used instead to produce the JavaScript logical OR (``||``)
        expression as a Var. The operation is requested with ``flip=True``.
    """
    disjunction = self.operation("||", other, type_=bool, flip=True)
    return disjunction
def __contains__(self, _: Any) -> Var:
    """Reject the ``in`` operator, which is not supported on Var objects.

    Args:
        _: The candidate member (ignored).

    Raises:
        TypeError: Always; callers should use ``Var.contains()`` instead.
    """
    message = "'in' operator not supported for Var types, use Var.contains() instead."
    raise TypeError(message)
def contains(self, other: Any) -> Var:
    """Check if a var contains the object `other`.

    Args:
        other: The object to check. Plain strings are JSON-encoded into a
            string Var; any other non-Var value is wrapped with Var.create.

    Raises:
        TypeError: If the var is not a valid type (dict, list, tuple or str),
            or if a string var is checked against a non-string `other`.

    Returns:
        A bool var representing the contain check.
    """
    container_type = self._var_type
    if not types._issubclass(container_type, Union[dict, list, tuple, str]):
        raise TypeError(
            f"Var {self._var_full_name} of type {self._var_type} does not support contains check."
        )

    # Dicts are probed by key, everything else by membership.
    if types.get_base_class(container_type) == dict:
        method = "hasOwnProperty"
    else:
        method = "includes"

    # Normalize the needle into a Var.
    if isinstance(other, str):
        other = Var.create(json.dumps(other), _var_is_string=True)
    elif not isinstance(other, Var):
        other = Var.create(other)

    if types._issubclass(container_type, Dict):
        js_expr = f"{self._var_name}.{method}({other._var_full_name})"
    else:  # str, list, tuple
        # For strings, the left operand must be a string.
        needle_is_str = types._issubclass(other._var_type, str) if types._issubclass(
            container_type, str
        ) else True
        if not needle_is_str:
            raise TypeError(
                f"'in <string>' requires string as left operand, not {other._var_type}"
            )
        js_expr = f"{self._var_name}.includes({other._var_full_name})"

    return self._replace(
        _var_name=js_expr,
        _var_type=bool,
        _var_is_string=False,
        merge_var_data=other._var_data,
    )
def reverse(self) -> Var:
    """Reverse a list var.

    The emitted expression spreads the list into a new array before calling
    ``reverse()`` on it.

    Raises:
        TypeError: If the var is not a list.

    Returns:
        A var with the reversed list.
    """
    is_list = types._issubclass(self._var_type, list)
    if not is_list:
        raise TypeError(f"Cannot reverse non-list var {self._var_full_name}.")

    reversed_expr = f"[...{self._var_full_name}].reverse()"
    return self._replace(
        _var_name=reversed_expr,
        _var_is_string=False,
        _var_full_name_needs_state_prefix=False,
    )
def lower(self) -> Var:
    """Convert a string var to lowercase.

    Returns:
        A str var whose expression calls ``toLowerCase()``.

    Raises:
        TypeError: If the var is not a string.
    """
    is_string = types._issubclass(self._var_type, str)
    if not is_string:
        raise TypeError(
            f"Cannot convert non-string var {self._var_full_name} to lowercase."
        )

    lowered_expr = f"{self._var_name}.toLowerCase()"
    return self._replace(
        _var_name=lowered_expr,
        _var_is_string=False,
        _var_type=str,
    )
def upper(self) -> Var:
    """Convert a string var to uppercase.

    Returns:
        A str var whose expression calls ``toUpperCase()``.

    Raises:
        TypeError: If the var is not a string.
    """
    is_string = types._issubclass(self._var_type, str)
    if not is_string:
        raise TypeError(
            f"Cannot convert non-string var {self._var_full_name} to uppercase."
        )

    uppered_expr = f"{self._var_name}.toUpperCase()"
    return self._replace(
        _var_name=uppered_expr,
        _var_is_string=False,
        _var_type=str,
    )
def strip(self, other: str | Var[str] = " ") -> Var:
    """Strip a string var.

    Args:
        other: The string to strip the var with. A plain ``str`` is
            JSON-encoded and wrapped in a Var; a Var is used as given.

    Returns:
        A var with the stripped string.

    Raises:
        TypeError: If the var is not a string.
    """
    if not types._issubclass(self._var_type, str):
        raise TypeError(f"Cannot strip non-string var {self._var_full_name}.")

    if isinstance(other, str):
        other = Var.create_safe(json.dumps(other))

    # NOTE(review): in a Python f-string the `$` before each brace is a literal
    # character, so it appears verbatim in the emitted expression — confirm
    # the resulting regex is what is intended.
    stripped_expr = f"{self._var_name}.replace(/^${other._var_full_name}|${other._var_full_name}$/g, '')"
    return self._replace(
        _var_name=stripped_expr,
        _var_is_string=False,
        merge_var_data=other._var_data,
    )
def split(self, other: str | Var[str] = " ") -> Var:
    """Split a string var into a list.

    Args:
        other: The string to split the var with. A plain ``str`` is
            JSON-encoded and wrapped in a Var; a Var is used as given.

    Returns:
        A ``list[str]`` var whose expression calls ``split(...)``.

    Raises:
        TypeError: If the var is not a string.
    """
    if not types._issubclass(self._var_type, str):
        raise TypeError(f"Cannot split non-string var {self._var_full_name}.")

    if isinstance(other, str):
        other = Var.create_safe(json.dumps(other))

    split_expr = f"{self._var_name}.split({other._var_full_name})"
    return self._replace(
        _var_name=split_expr,
        _var_is_string=False,
        _var_type=list[str],
        merge_var_data=other._var_data,
    )
def join(self, other: str | Var[str] | None = None) -> Var:
    """Join a list var into a string.

    Args:
        other: The string to join the list with. ``None`` means an empty
            separator; a plain ``str`` is JSON-encoded before wrapping.

    Returns:
        A str var whose expression calls ``join(...)``.

    Raises:
        TypeError: If the var is not a list.
    """
    if not types._issubclass(self._var_type, list):
        raise TypeError(f"Cannot join non-list var {self._var_full_name}.")

    if other is None:
        other = Var.create_safe('""')
    # Plain strings become JSON literals; everything else is wrapped as given.
    other = Var.create_safe(json.dumps(other) if isinstance(other, str) else other)

    joined_expr = f"{self._var_name}.join({other._var_full_name})"
    return self._replace(
        _var_name=joined_expr,
        _var_is_string=False,
        _var_type=str,
        merge_var_data=other._var_data,
    )
def foreach(self, fn: Callable) -> Var:
    """Map a render function over this var, producing a ``.map(...)`` expression.

    Two placeholder vars with unique names are created (item and index);
    ``fn`` receives as many of them as it declares parameters.

    Args:
        fn: The function to call on each component.

    Returns:
        A var representing foreach operation.

    Raises:
        TypeError: If the var's type carries no type arguments (non-sequence).
    """
    inner_types = get_args(self._var_type)
    if not inner_types:
        raise TypeError(
            f"Cannot foreach over non-sequence var {self._var_full_name} of type {self._var_type}."
        )

    # Placeholders standing in for the JS callback's (item, index) arguments.
    arg = BaseVar(_var_name=get_unique_variable_name(), _var_type=inner_types[0])
    index = BaseVar(_var_name=get_unique_variable_name(), _var_type=int)

    # Only pass as many placeholders as the callback accepts.
    accepted = len(inspect.signature(fn).parameters)
    fn_ret = fn(*(arg, index)[:accepted])

    mapped_expr = f"{self._var_full_name}.map(({arg._var_name}, {index._var_name}) => {fn_ret})"
    return self._replace(
        _var_name=mapped_expr,
        _var_is_string=False,
    )
@classmethod
def range(
    cls,
    v1: Var | int = 0,
    v2: Var | int | None = None,
    step: Var | int | None = None,
) -> Var:
    """Return an iterator over indices from v1 to v2 (or 0 to v1).

    Args:
        v1: The start of the range or end of range if v2 is not given.
        v2: The end of the range; rendered as ``undefined`` when omitted.
        step: The number of numbers between each item; defaults to 1.

    Returns:
        A ``list[int]`` var representing range operation.

    Raises:
        TypeError: If any provided bound or the step is not an int var.
    """
    if not isinstance(v1, Var):
        v1 = Var.create_safe(v1)
    if v1._var_type != int:
        raise TypeError(f"Cannot get range on non-int var {v1._var_full_name}.")

    if not isinstance(v2, Var):
        v2 = Var.create(v2)
    if v2 is None:
        v2 = Var.create_safe("undefined")
    elif v2._var_type != int:
        raise TypeError(f"Cannot get range on non-int var {v2._var_full_name}.")

    if not isinstance(step, Var):
        step = Var.create(step)
    if step is None:
        step = Var.create_safe(1)
    elif step._var_type != int:
        raise TypeError(f"Cannot get range on non-int var {step._var_full_name}.")

    return BaseVar(
        # All three operands use the full name so that a state-backed step
        # keeps its state prefix, consistent with v1 and v2.
        _var_name=f"Array.from(range({v1._var_full_name}, {v2._var_full_name}, {step._var_full_name}))",
        _var_type=list[int],
        _var_is_local=False,
        _var_data=VarData.merge(
            v1._var_data,
            v2._var_data,
            step._var_data,
            VarData(
                imports={
                    "/utils/helpers/range.js": [
                        ImportVar(tag="range", is_default=True),
                    ],
                },
            ),
        ),
    )
def to(self, type_: Type) -> Var:
    """Convert the type of the var.

    Only the recorded ``_var_type`` is replaced; no other field is passed
    to ``_replace``.

    Args:
        type_: The type to convert to.

    Returns:
        The converted var.
    """
    retyped = self._replace(_var_type=type_)
    return retyped
def as_ref(self) -> Var:
    """Convert the var to a ref.

    The resulting var looks itself up in ``refs`` by its full name and
    merges in the import of ``refs`` from the state module.

    Returns:
        The var as a ref.
    """
    refs_import = VarData(
        imports={
            f"/{constants.Dirs.STATE_PATH}": [imports.ImportVar(tag="refs")],
        },
    )
    ref_expr = f"refs['{self._var_full_name}']"
    return self._replace(
        _var_name=ref_expr,
        _var_is_local=True,
        _var_is_string=False,
        _var_full_name_needs_state_prefix=False,
        merge_var_data=refs_import,
    )
@property
def _var_full_name(self) -> str:
    """Get the full name of the var.

    The state name is prepended (dot-separated) only when the var is flagged
    as needing the prefix and it carries a non-empty state in its var data.

    Returns:
        The full name of the var.
    """
    if not self._var_full_name_needs_state_prefix:
        return self._var_name

    data = self._var_data
    if data is None or data.state == "":
        return self._var_name

    state_prefix = format.format_state_name(data.state)
    return ".".join([state_prefix, self._var_name])
def _var_set_state(self, state: Type[BaseState] | str) -> Any:
    """Set the state of the var.

    Mutates this var in place: merges state-related var data (state name,
    a ``useContext`` hook and the imports it needs) and flags the full name
    as requiring the state prefix.

    Args:
        state: The state to set or the full name of the state.

    Returns:
        The var with the set state (the same instance).
    """
    if isinstance(state, str):
        state_name = state
    else:
        state_name = state.get_full_name()

    context_hook = "const {0} = useContext(StateContexts.{0})".format(
        format.format_state_name(state_name)
    )
    state_var_data = VarData(
        state=state_name,
        hooks={context_hook},
        imports={
            f"/{constants.Dirs.CONTEXTS_PATH}": [ImportVar(tag="StateContexts")],
            "react": [ImportVar(tag="useContext")],
        },
    )

    self._var_data = VarData.merge(self._var_data, state_var_data)
    self._var_full_name_needs_state_prefix = True
    return self
@property
def _var_state(self) -> str:
    """Compat method for getting the state.

    Returns:
        The state name associated with the var, or an empty string when the
        var carries no var data.
    """
    if self._var_data:
        return self._var_data.state
    return ""
# Allow automatic serialization of Var within JSON structures by registering
# _encode_var with the serializers module at import time.
serializers.serializer(_encode_var)
@dataclasses.dataclass(
    eq=False,
    **{"slots": True} if sys.version_info >= (3, 10) else {},
)
class BaseVar(Var):
    """A base (non-computed) var of the app state."""

    # The name of the var.
    _var_name: str = dataclasses.field()

    # The type of the var.
    _var_type: Type = dataclasses.field(default=Any)

    # Whether this is a local javascript variable.
    _var_is_local: bool = dataclasses.field(default=False)

    # Whether the var is a string literal.
    _var_is_string: bool = dataclasses.field(default=False)

    # _var_full_name should be prefixed with _var_state
    _var_full_name_needs_state_prefix: bool = dataclasses.field(default=False)

    # Extra metadata associated with the Var
    _var_data: Optional[VarData] = dataclasses.field(default=None)

    def __hash__(self) -> int:
        """Define a hash function for a var.

        The hash is derived from the var name and the string form of its type.

        Returns:
            The hash of the var.
        """
        return hash((self._var_name, str(self._var_type)))

    def get_default_value(self) -> Any:
        """Get the default value of the var.

        Returns:
            The default value of the var: ``None`` for optional or unknown
            types, the first argument for ``Literal`` types, otherwise an
            empty/zero value matching the type.

        Raises:
            ImportError: If the var is a dataframe and pandas is not installed.
        """
        if types.is_optional(self._var_type):
            return None

        type_ = (
            get_origin(self._var_type)
            if types.is_generic_alias(self._var_type)
            else self._var_type
        )
        if type_ is Literal:
            args = get_args(self._var_type)
            return args[0] if args else None
        if issubclass(type_, str):
            return ""
        # bool must be tested before int/float: bool is a subclass of int, so
        # checking the numeric types first would make this branch unreachable
        # and give bool vars a default of 0 instead of False.
        if issubclass(type_, bool):
            return False
        if issubclass(type_, types.get_args(Union[int, float])):
            return 0
        if issubclass(type_, list):
            return []
        if issubclass(type_, dict):
            return {}
        if issubclass(type_, tuple):
            return ()
        if types.is_dataframe(type_):
            try:
                import pandas as pd

                return pd.DataFrame()
            except ImportError as e:
                raise ImportError(
                    "Please install pandas to use dataframes in your app."
                ) from e
        return set() if issubclass(type_, set) else None

    def get_setter_name(self, include_state: bool = True) -> str:
        """Get the name of the var's generated setter function.

        Args:
            include_state: Whether to include the state name in the setter name.

        Returns:
            The name of the setter function, prefixed with the state name
            when requested and a non-empty state is recorded on the var.
        """
        setter = constants.SETTER_PREFIX + self._var_name
        if self._var_data is None:
            return setter
        if not include_state or self._var_data.state == "":
            return setter
        return ".".join((self._var_data.state, setter))

    def get_setter(self) -> Callable[[BaseState, Any], None]:
        """Get the var's setter function.

        Returns:
            A function that that creates a setter for the var.
        """

        def setter(state: BaseState, value: Any):
            """Get the setter for the var.

            For int and float vars the value is converted to the var type
            first; a failed conversion is reported with a warning and the
            value is not set.

            Args:
                state: The state within which we add the setter function.
                value: The value to set.
            """
            if self._var_type in [int, float]:
                try:
                    value = self._var_type(value)
                    setattr(state, self._var_name, value)
                except ValueError:
                    console.warn(
                        f"{self._var_name}: Failed conversion of {value} to '{self._var_type.__name__}'. Value not set.",
                    )
            else:
                setattr(state, self._var_name, value)

        # Name the closure after the generated setter.
        setter.__qualname__ = self.get_setter_name()

        return setter
# Marker for a Var that was not passed: a module-level BaseVar instance named
# "<UNSPECIFIED>" that can serve as a sentinel default.
UnspecifiedVar = BaseVar(_var_name="<UNSPECIFIED>")
@dataclasses.dataclass(init=False, eq=False)
class ComputedVar(Var, property):
    """A field with computed getters."""

    # Whether to track dependencies and cache computed values
    _cache: bool = dataclasses.field(default=False)

    def __init__(
        self,
        fget: Callable[[BaseState], Any],
        fset: Callable[[BaseState, Any], None] | None = None,
        fdel: Callable[[BaseState], Any] | None = None,
        doc: str | None = None,
        **kwargs,
    ):
        """Initialize a ComputedVar.

        The var name defaults to the getter's ``__name__`` and the var type
        to the getter's return annotation unless given in ``kwargs``.

        Args:
            fget: The getter function.
            fset: The setter function.
            fdel: The deleter function.
            doc: The docstring.
            **kwargs: additional attributes to set on the instance
        """
        property.__init__(self, fget, fset, fdel, doc)
        # Both defaults are computed eagerly, whether or not they end up used.
        kwargs.setdefault("_var_name", fget.__name__)
        kwargs.setdefault("_var_type", self._determine_var_type())
        BaseVar.__init__(self, **kwargs)  # type: ignore

    @property
    def _cache_attr(self) -> str:
        """Get the attribute used to cache the value on the instance.

        Returns:
            An attribute name.
        """
        attr_name = f"__cached_{self._var_name}"
        return attr_name

    def __get__(self, instance, owner):
        """Get the ComputedVar value.

        When caching is enabled and an instance is given, the value is
        computed once, stored on the instance and served from there.

        Args:
            instance: the instance of the class accessing this computed var.
            owner: the class that this descriptor is attached to.

        Returns:
            The value of the var for the given instance.
        """
        if instance is not None and self._cache:
            # handle caching
            if not hasattr(instance, self._cache_attr):
                setattr(instance, self._cache_attr, super().__get__(instance, owner))
            return getattr(instance, self._cache_attr)
        return super().__get__(instance, owner)

    def _deps(
        self,
        objclass: Type,
        obj: FunctionType | CodeType | None = None,
        self_name: Optional[str] = None,
    ) -> set[str]:
        """Determine var dependencies of this ComputedVar.

        Walks the bytecode of ``obj`` and records attributes loaded right
        after "self". Recurses into callable attributes of ``objclass`` and
        into nested code objects (comprehensions / inner functions).

        Args:
            objclass: the class obj this ComputedVar is attached to.
            obj: the object to disassemble (defaults to the fget function).
            self_name: if specified, look for this name in LOAD_FAST and LOAD_DEREF instructions.

        Returns:
            A set of variable names accessed by the given obj.
        """
        found = set()
        if obj is None:
            if self.fget is None:
                return set()
            obj = cast(FunctionType, self.fget)

        with contextlib.suppress(AttributeError):
            # unbox functools.partial
            obj = cast(FunctionType, obj.func)  # type: ignore
        with contextlib.suppress(AttributeError):
            # unbox EventHandler
            obj = cast(FunctionType, obj.fn)  # type: ignore

        if self_name is None and isinstance(obj, FunctionType):
            try:
                # the first argument to the function is the name of "self" arg
                self_name = obj.__code__.co_varnames[0]
            except (AttributeError, IndexError):
                self_name = None
        if self_name is None:
            # cannot reference attributes on self if method takes no args
            return set()

        previous_loaded_self = False
        for instr in dis.get_instructions(obj):
            if (
                instr.opname in ("LOAD_FAST", "LOAD_DEREF")
                and instr.argval == self_name
            ):
                # "self" was just pushed; the next load instruction, if any,
                # is referencing an attribute on it
                previous_loaded_self = True
                continue

            if previous_loaded_self and instr.opname in ("LOAD_ATTR", "LOAD_METHOD"):
                try:
                    target = getattr(objclass, instr.argval)
                except Exception:
                    target = None
                if callable(target):
                    # recurse into callable attributes
                    found.update(self._deps(objclass=objclass, obj=target))
                else:
                    # normal attribute access
                    found.add(instr.argval)
            elif instr.opname == "LOAD_CONST" and isinstance(instr.argval, CodeType):
                # recurse into nested functions / comprehensions, which can
                # reference instance attributes from the outer scope
                found.update(
                    self._deps(
                        objclass=objclass,
                        obj=instr.argval,
                        self_name=self_name,
                    )
                )
            previous_loaded_self = False
        return found

    def mark_dirty(self, instance) -> None:
        """Mark this ComputedVar as dirty.

        Drops the cached value from the instance; a missing cache entry is
        ignored.

        Args:
            instance: the state instance that needs to recompute the value.
        """
        try:
            delattr(instance, self._cache_attr)
        except AttributeError:
            pass

    def _determine_var_type(self) -> Type:
        """Get the type of the var.

        Returns:
            The getter's return annotation, or ``Any`` when it has none.
        """
        return get_type_hints(self.fget).get("return", Any)
def cached_var(fget: Callable[[Any], Any]) -> ComputedVar:
    """A field with computed getter that tracks other state dependencies.

    The cached_var will only be recalculated when other state vars that it
    depends on are modified.

    Args:
        fget: the function that calculates the variable value.

    Returns:
        ComputedVar that is recomputed when dependencies change.
    """
    computed = ComputedVar(fget=fget)
    # Switch on caching for this computed var.
    computed._cache = True
    return computed
class CallableVar(BaseVar):
    """Decorate a Var-returning function to act as both a Var and a function.

    This is used as a compatibility shim for replacing Var objects in the
    API with functions that return a family of Var.
    """

    def __init__(self, fn: Callable[..., BaseVar]):
        """Initialize a CallableVar.

        The function is called once with no arguments and the fields of the
        returned var initialize this instance.

        Args:
            fn: The function to decorate (must return Var)
        """
        self.fn = fn
        default_fields = dataclasses.asdict(fn())
        super().__init__(**default_fields)

    def __call__(self, *args, **kwargs) -> BaseVar:
        """Call the decorated function.

        Args:
            *args: The args to pass to the function.
            **kwargs: The kwargs to pass to the function.

        Returns:
            The Var returned from calling the function.
        """
        result = self.fn(*args, **kwargs)
        return result
|