12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592 |
- """General utility functions."""
- from __future__ import annotations
- import contextlib
- import inspect
- import json
- import os
- import platform
- import random
- import re
- import shutil
- import signal
- import string
- import subprocess
- import sys
- from collections import defaultdict
- from pathlib import Path
- from subprocess import DEVNULL, PIPE, STDOUT
- from types import ModuleType
- from typing import (
- TYPE_CHECKING,
- Any,
- Callable,
- Dict,
- List,
- Optional,
- Tuple,
- Type,
- Union,
- _GenericAlias, # type: ignore # type: ignore
- )
- from urllib.parse import urlparse
- import plotly.graph_objects as go
- import psutil
- import typer
- import uvicorn
- from plotly.io import to_json
- from redis import Redis
- from rich.console import Console
- from rich.prompt import Prompt
- from pynecone import constants
- from pynecone.base import Base
- if TYPE_CHECKING:
- from pynecone.app import App
- from pynecone.components.component import ImportDict
- from pynecone.config import Config
- from pynecone.event import Event, EventHandler, EventSpec
- from pynecone.var import Var
- # Shorthand for join.
- join = os.linesep.join
- # Console for pretty printing.
- console = Console()
- # Union of generic types.
- GenericType = Union[Type, _GenericAlias]
- # Valid state var types.
- PrimitiveType = Union[int, float, bool, str, list, dict, tuple]
- StateVar = Union[PrimitiveType, Base, None]
- def deprecate(msg: str):
- """Print a deprecation warning.
- Args:
- msg: The deprecation message.
- """
- console.print(f"[yellow]DeprecationWarning: {msg}[/yellow]")
- def get_args(alias: _GenericAlias) -> Tuple[Type, ...]:
- """Get the arguments of a type alias.
- Args:
- alias: The type alias.
- Returns:
- The arguments of the type alias.
- """
- return alias.__args__
- def is_generic_alias(cls: GenericType) -> bool:
- """Check whether the class is a generic alias.
- Args:
- cls: The class to check.
- Returns:
- Whether the class is a generic alias.
- """
- # For older versions of Python.
- if isinstance(cls, _GenericAlias):
- return True
- with contextlib.suppress(ImportError):
- from typing import _SpecialGenericAlias # type: ignore
- if isinstance(cls, _SpecialGenericAlias):
- return True
- # For newer versions of Python.
- try:
- from types import GenericAlias # type: ignore
- return isinstance(cls, GenericAlias)
- except ImportError:
- return False
- def is_union(cls: GenericType) -> bool:
- """Check if a class is a Union.
- Args:
- cls: The class to check.
- Returns:
- Whether the class is a Union.
- """
- with contextlib.suppress(ImportError):
- from typing import _UnionGenericAlias # type: ignore
- return isinstance(cls, _UnionGenericAlias)
- return cls.__origin__ == Union if is_generic_alias(cls) else False
- def get_base_class(cls: GenericType) -> Type:
- """Get the base class of a class.
- Args:
- cls: The class.
- Returns:
- The base class of the class.
- """
- if is_union(cls):
- return tuple(get_base_class(arg) for arg in get_args(cls))
- return get_base_class(cls.__origin__) if is_generic_alias(cls) else cls
- def _issubclass(cls: GenericType, cls_check: GenericType) -> bool:
- """Check if a class is a subclass of another class.
- Args:
- cls: The class to check.
- cls_check: The class to check against.
- Returns:
- Whether the class is a subclass of the other class.
- """
- # Special check for Any.
- if cls_check == Any:
- return True
- if cls in [Any, Callable]:
- return False
- # Get the base classes.
- cls_base = get_base_class(cls)
- cls_check_base = get_base_class(cls_check)
- # The class we're checking should not be a union.
- if isinstance(cls_base, tuple):
- return False
- # Check if the types match.
- return cls_check_base == Any or issubclass(cls_base, cls_check_base)
- def _isinstance(obj: Any, cls: GenericType) -> bool:
- """Check if an object is an instance of a class.
- Args:
- obj: The object to check.
- cls: The class to check against.
- Returns:
- Whether the object is an instance of the class.
- """
- return isinstance(obj, get_base_class(cls))
- def rm(path: str):
- """Remove a file or directory.
- Args:
- path: The path to the file or directory.
- """
- if os.path.isdir(path):
- shutil.rmtree(path)
- elif os.path.isfile(path):
- os.remove(path)
- def cp(src: str, dest: str, overwrite: bool = True) -> bool:
- """Copy a file or directory.
- Args:
- src: The path to the file or directory.
- dest: The path to the destination.
- overwrite: Whether to overwrite the destination.
- Returns:
- Whether the copy was successful.
- """
- if src == dest:
- return False
- if not overwrite and os.path.exists(dest):
- return False
- if os.path.isdir(src):
- rm(dest)
- shutil.copytree(src, dest)
- else:
- shutil.copyfile(src, dest)
- return True
- def mv(src: str, dest: str, overwrite: bool = True) -> bool:
- """Move a file or directory.
- Args:
- src: The path to the file or directory.
- dest: The path to the destination.
- overwrite: Whether to overwrite the destination.
- Returns:
- Whether the move was successful.
- """
- if src == dest:
- return False
- if not overwrite and os.path.exists(dest):
- return False
- rm(dest)
- shutil.move(src, dest)
- return True
- def mkdir(path: str):
- """Create a directory.
- Args:
- path: The path to the directory.
- """
- if not os.path.exists(path):
- os.makedirs(path)
- def ln(src: str, dest: str, overwrite: bool = False) -> bool:
- """Create a symbolic link.
- Args:
- src: The path to the file or directory.
- dest: The path to the destination.
- overwrite: Whether to overwrite the destination.
- Returns:
- Whether the link was successful.
- """
- if src == dest:
- return False
- if not overwrite and (os.path.exists(dest) or os.path.islink(dest)):
- return False
- if os.path.isdir(src):
- rm(dest)
- os.symlink(src, dest, target_is_directory=True)
- else:
- os.symlink(src, dest)
- return True
- def kill(pid):
- """Kill a process.
- Args:
- pid: The process ID.
- """
- os.kill(pid, signal.SIGTERM)
- def which(program: str) -> Optional[str]:
- """Find the path to an executable.
- Args:
- program: The name of the executable.
- Returns:
- The path to the executable.
- """
- return shutil.which(program)
- def get_config() -> Config:
- """Get the app config.
- Returns:
- The app config.
- """
- from pynecone.config import Config
- sys.path.append(os.getcwd())
- try:
- return __import__(constants.CONFIG_MODULE).config
- except ImportError:
- return Config(app_name="") # type: ignore
- def check_node_version(min_version):
- """Check the version of Node.js.
- Args:
- min_version: The minimum version of Node.js required.
- Returns:
- Whether the version of Node.js is high enough.
- """
- try:
- # Run the node -v command and capture the output
- result = subprocess.run(
- ["node", "-v"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
- )
- # The output will be in the form "vX.Y.Z", so we can split it on the "v" character and take the second part
- version = result.stdout.decode().strip().split("v")[1]
- # Compare the version numbers
- return version.split(".") >= min_version.split(".")
- except Exception:
- return False
- def get_package_manager() -> str:
- """Get the package manager executable.
- Returns:
- The path to the package manager.
- Raises:
- FileNotFoundError: If bun or npm is not installed.
- Exit: If the app directory is invalid.
- """
- # Check that the node version is valid.
- if not check_node_version(constants.MIN_NODE_VERSION):
- console.print(
- f"[red]Node.js version {constants.MIN_NODE_VERSION} or higher is required to run Pynecone."
- )
- raise typer.Exit()
- # On Windows, we use npm instead of bun.
- if platform.system() == "Windows":
- npm_path = which("npm")
- if npm_path is None:
- raise FileNotFoundError("Pynecone requires npm to be installed on Windows.")
- return npm_path
- # On other platforms, we use bun.
- return os.path.expandvars(get_config().bun_path)
- def get_app() -> ModuleType:
- """Get the app module based on the default config.
- Returns:
- The app based on the default config.
- """
- config = get_config()
- module = ".".join([config.app_name, config.app_name])
- sys.path.insert(0, os.getcwd())
- return __import__(module, fromlist=(constants.APP_VAR,))
- def create_config(app_name: str):
- """Create a new pcconfig file.
- Args:
- app_name: The name of the app.
- """
- # Import here to avoid circular imports.
- from pynecone.compiler import templates
- with open(constants.CONFIG_FILE, "w") as f:
- f.write(templates.PCCONFIG.format(app_name=app_name))
- def initialize_gitignore():
- """Initialize the template .gitignore file."""
- # The files to add to the .gitignore file.
- files = constants.DEFAULT_GITIGNORE
- # Subtract current ignored files.
- if os.path.exists(constants.GITIGNORE_FILE):
- with open(constants.GITIGNORE_FILE, "r") as f:
- files -= set(f.read().splitlines())
- # Add the new files to the .gitignore file.
- with open(constants.GITIGNORE_FILE, "a") as f:
- f.write(join(files))
- def initialize_app_directory(app_name: str):
- """Initialize the app directory on pc init.
- Args:
- app_name: The name of the app.
- """
- console.log("Initializing the app directory.")
- cp(constants.APP_TEMPLATE_DIR, app_name)
- mv(
- os.path.join(app_name, constants.APP_TEMPLATE_FILE),
- os.path.join(app_name, app_name + constants.PY_EXT),
- )
- cp(constants.ASSETS_TEMPLATE_DIR, constants.APP_ASSETS_DIR)
- def initialize_web_directory():
- """Initialize the web directory on pc init."""
- console.log("Initializing the web directory.")
- rm(os.path.join(constants.WEB_TEMPLATE_DIR, constants.NODE_MODULES))
- rm(os.path.join(constants.WEB_TEMPLATE_DIR, constants.PACKAGE_LOCK))
- cp(constants.WEB_TEMPLATE_DIR, constants.WEB_DIR)
- def install_bun():
- """Install bun onto the user's system.
- Raises:
- FileNotFoundError: If the required packages are not installed.
- """
- # Bun is not supported on Windows.
- if platform.system() == "Windows":
- console.log("Skipping bun installation on Windows.")
- return
- # Only install if bun is not already installed.
- if not os.path.exists(get_package_manager()):
- console.log("Installing bun...")
- # Check if curl is installed
- curl_path = which("curl")
- if curl_path is None:
- raise FileNotFoundError("Pynecone requires curl to be installed.")
- # Check if unzip is installed
- unzip_path = which("unzip")
- if unzip_path is None:
- raise FileNotFoundError("Pynecone requires unzip to be installed.")
- os.system(constants.INSTALL_BUN)
- def install_frontend_packages():
- """Install the frontend packages."""
- # Install the base packages.
- subprocess.run(
- [get_package_manager(), "install"], cwd=constants.WEB_DIR, stdout=PIPE
- )
- # Install the app packages.
- packages = get_config().frontend_packages
- if len(packages) > 0:
- subprocess.run(
- [get_package_manager(), "add", *packages],
- cwd=constants.WEB_DIR,
- stdout=PIPE,
- )
- def is_initialized() -> bool:
- """Check whether the app is initialized.
- Returns:
- Whether the app is initialized in the current directory.
- """
- return os.path.exists(constants.CONFIG_FILE) and os.path.exists(constants.WEB_DIR)
- def is_latest_template() -> bool:
- """Whether the app is using the latest template.
- Returns:
- Whether the app is using the latest template.
- """
- with open(constants.PCVERSION_TEMPLATE_FILE) as f: # type: ignore
- template_version = json.load(f)["version"]
- if not os.path.exists(constants.PCVERSION_APP_FILE):
- return False
- with open(constants.PCVERSION_APP_FILE) as f: # type: ignore
- app_version = json.load(f)["version"]
- return app_version >= template_version
- def set_pynecone_project_hash():
- """Write the hash of the Pynecone project to a PCVERSION_APP_FILE."""
- with open(constants.PCVERSION_APP_FILE) as f: # type: ignore
- pynecone_json = json.load(f)
- pynecone_json["project_hash"] = random.getrandbits(128)
- with open(constants.PCVERSION_APP_FILE, "w") as f:
- json.dump(pynecone_json, f, ensure_ascii=False)
- def generate_sitemap(deploy_url: str):
- """Generate the sitemap config file.
- Args:
- deploy_url: The URL of the deployed app.
- """
- # Import here to avoid circular imports.
- from pynecone.compiler import templates
- config = json.dumps(
- {
- "siteUrl": deploy_url,
- "generateRobotsTxt": True,
- }
- )
- with open(constants.SITEMAP_CONFIG_FILE, "w") as f:
- f.write(templates.SITEMAP_CONFIG(config=config))
- def export_app(
- app: App,
- backend: bool = True,
- frontend: bool = True,
- zip: bool = False,
- deploy_url: Optional[str] = None,
- ):
- """Zip up the app for deployment.
- Args:
- app: The app.
- backend: Whether to zip up the backend app.
- frontend: Whether to zip up the frontend app.
- zip: Whether to zip the app.
- deploy_url: The URL of the deployed app.
- """
- # Force compile the app.
- app.compile(force_compile=True)
- # Remove the static folder.
- rm(constants.WEB_STATIC_DIR)
- # Generate the sitemap file.
- if deploy_url is not None:
- generate_sitemap(deploy_url)
- # Export the Next app.
- subprocess.run([get_package_manager(), "run", "export"], cwd=constants.WEB_DIR)
- # Zip up the app.
- if zip:
- if os.name == "posix":
- posix_export(backend, frontend)
- if os.name == "nt":
- nt_export(backend, frontend)
- def nt_export(backend: bool = True, frontend: bool = True):
- """Export for nt (Windows) systems.
- Args:
- backend: Whether to zip up the backend app.
- frontend: Whether to zip up the frontend app.
- """
- cmd = r""
- if frontend:
- cmd = r'''powershell -Command "Set-Location .web/_static; Compress-Archive -Path .\* -DestinationPath ..\..\frontend.zip -Force"'''
- os.system(cmd)
- if backend:
- cmd = r'''powershell -Command "Get-ChildItem -File | Where-Object { $_.Name -notin @('.web', 'assets', 'frontend.zip', 'backend.zip') } | Compress-Archive -DestinationPath backend.zip -Update"'''
- os.system(cmd)
- def posix_export(backend: bool = True, frontend: bool = True):
- """Export for posix (Linux, OSX) systems.
- Args:
- backend: Whether to zip up the backend app.
- frontend: Whether to zip up the frontend app.
- """
- cmd = r""
- if frontend:
- cmd = r"cd .web/_static && zip -r ../../frontend.zip ./*"
- os.system(cmd)
- if backend:
- cmd = r"zip -r backend.zip ./* -x .web/\* ./assets\* ./frontend.zip\* ./backend.zip\*"
- os.system(cmd)
- def setup_frontend(root: Path):
- """Set up the frontend.
- Args:
- root: root path of the project.
- """
- # Initialize the web directory if it doesn't exist.
- cp(constants.WEB_TEMPLATE_DIR, str(root / constants.WEB_DIR), overwrite=False)
- # Install the frontend packages.
- console.rule("[bold]Installing frontend packages")
- install_frontend_packages()
- # copy asset files to public folder
- mkdir(str(root / constants.WEB_ASSETS_DIR))
- cp(
- src=str(root / constants.APP_ASSETS_DIR),
- dest=str(root / constants.WEB_ASSETS_DIR),
- )
- def run_frontend(app: App, root: Path, port: str):
- """Run the frontend.
- Args:
- app: The app.
- root: root path of the project.
- port: port of the app.
- """
- # Set up the frontend.
- setup_frontend(root)
- # Compile the frontend.
- app.compile(force_compile=True)
- # Run the frontend in development mode.
- console.rule("[bold green]App Running")
- os.environ["PORT"] = get_config().port if port is None else port
- subprocess.Popen(
- [get_package_manager(), "run", "next", "telemetry", "disable"],
- cwd=constants.WEB_DIR,
- env=os.environ,
- stdout=DEVNULL,
- stderr=STDOUT,
- )
- subprocess.Popen(
- [get_package_manager(), "run", "dev"], cwd=constants.WEB_DIR, env=os.environ
- )
- def run_frontend_prod(app: App, root: Path, port: str):
- """Run the frontend.
- Args:
- app: The app.
- root: root path of the project.
- port: port of the app.
- """
- # Set up the frontend.
- setup_frontend(root)
- # Export the app.
- export_app(app)
- os.environ["PORT"] = get_config().port if port is None else port
- # Run the frontend in production mode.
- subprocess.Popen(
- [get_package_manager(), "run", "prod"], cwd=constants.WEB_DIR, env=os.environ
- )
- def get_num_workers() -> int:
- """Get the number of backend worker processes.
- Returns:
- The number of backend worker processes.
- """
- return 1 if get_redis() is None else (os.cpu_count() or 1) * 2 + 1
- def get_api_port() -> int:
- """Get the API port.
- Returns:
- The API port.
- """
- port = urlparse(get_config().api_url).port
- if port is None:
- port = urlparse(constants.API_URL).port
- assert port is not None
- return port
- def get_process_on_port(port) -> Optional[psutil.Process]:
- """Get the process on the given port.
- Args:
- port: The port.
- Returns:
- The process on the given port.
- """
- for proc in psutil.process_iter(["pid", "name", "cmdline"]):
- try:
- for conns in proc.connections(kind="inet"):
- if conns.laddr.port == int(port):
- return proc
- except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
- pass
- return None
- def is_process_on_port(port) -> bool:
- """Check if a process is running on the given port.
- Args:
- port: The port.
- Returns:
- Whether a process is running on the given port.
- """
- return get_process_on_port(port) is not None
- def kill_process_on_port(port):
- """Kill the process on the given port.
- Args:
- port: The port.
- """
- if get_process_on_port(port) is not None:
- with contextlib.suppress(psutil.AccessDenied):
- get_process_on_port(port).kill() # type: ignore
- def change_or_terminate_port(port, _type) -> str:
- """Terminate or change the port.
- Args:
- port: The port.
- _type: The type of the port.
- Returns:
- The new port or the current one.
- """
- console.print(
- f"Something is already running on port [bold underline]{port}[/bold underline]. This is the port the {_type} runs on."
- )
- frontend_action = Prompt.ask("Kill or change it?", choices=["k", "c", "n"])
- if frontend_action == "k":
- kill_process_on_port(port)
- return port
- elif frontend_action == "c":
- new_port = Prompt.ask("Specify the new port")
- # Check if also the new port is used
- if is_process_on_port(new_port):
- return change_or_terminate_port(new_port, _type)
- else:
- console.print(
- f"The {_type} will run on port [bold underline]{new_port}[/bold underline]."
- )
- return new_port
- else:
- console.print("Exiting...")
- sys.exit()
- def setup_backend():
- """Set up backend.
- Specifically ensures backend database is updated when running --no-frontend.
- """
- # Import here to avoid circular imports.
- from pynecone.model import Model
- config = get_config()
- if config.db_url is not None:
- Model.create_all()
- def run_backend(
- app_name: str, port: int, loglevel: constants.LogLevel = constants.LogLevel.ERROR
- ):
- """Run the backend.
- Args:
- app_name: The app name.
- port: The app port
- loglevel: The log level.
- """
- setup_backend()
- uvicorn.run(
- f"{app_name}:{constants.APP_VAR}.{constants.API_VAR}",
- host=constants.BACKEND_HOST,
- port=port,
- log_level=loglevel,
- reload=True,
- )
- def run_backend_prod(
- app_name: str, port: int, loglevel: constants.LogLevel = constants.LogLevel.ERROR
- ):
- """Run the backend.
- Args:
- app_name: The app name.
- port: The app port
- loglevel: The log level.
- """
- setup_backend()
- num_workers = get_num_workers()
- command = (
- [
- *constants.RUN_BACKEND_PROD_WINDOWS,
- "--host",
- "0.0.0.0",
- "--port",
- str(port),
- f"{app_name}:{constants.APP_VAR}",
- ]
- if platform.system() == "Windows"
- else [
- *constants.RUN_BACKEND_PROD,
- "--bind",
- f"0.0.0.0:{port}",
- "--threads",
- str(num_workers),
- f"{app_name}:{constants.APP_VAR}()",
- ]
- )
- command += [
- "--log-level",
- loglevel.value,
- "--workers",
- str(num_workers),
- ]
- subprocess.run(command)
- def get_production_backend_url() -> str:
- """Get the production backend URL.
- Returns:
- The production backend URL.
- """
- config = get_config()
- return constants.PRODUCTION_BACKEND_URL.format(
- username=config.username,
- app_name=config.app_name,
- )
- def to_snake_case(text: str) -> str:
- """Convert a string to snake case.
- The words in the text are converted to lowercase and
- separated by underscores.
- Args:
- text: The string to convert.
- Returns:
- The snake case string.
- """
- s1 = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", text)
- return re.sub("([a-z0-9])([A-Z])", r"\1_\2", s1).lower()
- def to_camel_case(text: str) -> str:
- """Convert a string to camel case.
- The first word in the text is converted to lowercase and
- the rest of the words are converted to title case, removing underscores.
- Args:
- text: The string to convert.
- Returns:
- The camel case string.
- """
- if "_" not in text:
- return text
- camel = "".join(
- word.capitalize() if i > 0 else word.lower()
- for i, word in enumerate(text.lstrip("_").split("_"))
- )
- prefix = "_" if text.startswith("_") else ""
- return prefix + camel
- def to_title_case(text: str) -> str:
- """Convert a string from snake case to title case.
- Args:
- text: The string to convert.
- Returns:
- The title case string.
- """
- return "".join(word.capitalize() for word in text.split("_"))
- WRAP_MAP = {
- "{": "}",
- "(": ")",
- "[": "]",
- "<": ">",
- '"': '"',
- "'": "'",
- "`": "`",
- }
- def get_close_char(open: str, close: Optional[str] = None) -> str:
- """Check if the given character is a valid brace.
- Args:
- open: The open character.
- close: The close character if provided.
- Returns:
- The close character.
- Raises:
- ValueError: If the open character is not a valid brace.
- """
- if close is not None:
- return close
- if open not in WRAP_MAP:
- raise ValueError(f"Invalid wrap open: {open}, must be one of {WRAP_MAP.keys()}")
- return WRAP_MAP[open]
- def is_wrapped(text: str, open: str, close: Optional[str] = None) -> bool:
- """Check if the given text is wrapped in the given open and close characters.
- Args:
- text: The text to check.
- open: The open character.
- close: The close character.
- Returns:
- Whether the text is wrapped.
- """
- close = get_close_char(open, close)
- return text.startswith(open) and text.endswith(close)
- def wrap(
- text: str,
- open: str,
- close: Optional[str] = None,
- check_first: bool = True,
- num: int = 1,
- ) -> str:
- """Wrap the given text in the given open and close characters.
- Args:
- text: The text to wrap.
- open: The open character.
- close: The close character.
- check_first: Whether to check if the text is already wrapped.
- num: The number of times to wrap the text.
- Returns:
- The wrapped text.
- """
- close = get_close_char(open, close)
- # If desired, check if the text is already wrapped in braces.
- if check_first and is_wrapped(text=text, open=open, close=close):
- return text
- # Wrap the text in braces.
- return f"{open * num}{text}{close * num}"
- def indent(text: str, indent_level: int = 2) -> str:
- """Indent the given text by the given indent level.
- Args:
- text: The text to indent.
- indent_level: The indent level.
- Returns:
- The indented text.
- """
- lines = text.splitlines()
- if len(lines) < 2:
- return text
- return os.linesep.join(f"{' ' * indent_level}{line}" for line in lines) + os.linesep
- def verify_route_validity(route: str) -> None:
- """Verify if the route is valid, and throw an error if not.
- Args:
- route: The route that need to be checked
- Raises:
- ValueError: If the route is invalid.
- """
- pattern = catchall_in_route(route)
- if pattern and not route.endswith(pattern):
- raise ValueError(f"Catch-all must be the last part of the URL: {route}")
- def get_route_args(route: str) -> Dict[str, str]:
- """Get the dynamic arguments for the given route.
- Args:
- route: The route to get the arguments for.
- Returns:
- The route arguments.
- """
- args = {}
- def add_route_arg(match: re.Match[str], type_: str):
- """Add arg from regex search result.
- Args:
- match: Result of a regex search
- type_: The assigned type for this arg
- Raises:
- ValueError: If the route is invalid.
- """
- arg_name = match.groups()[0]
- if arg_name in args:
- raise ValueError(
- f"Arg name [{arg_name}] is used more than once in this URL"
- )
- args[arg_name] = type_
- # Regex to check for route args.
- check = constants.RouteRegex.ARG
- check_strict_catchall = constants.RouteRegex.STRICT_CATCHALL
- check_opt_catchall = constants.RouteRegex.OPT_CATCHALL
- # Iterate over the route parts and check for route args.
- for part in route.split("/"):
- match_opt = check_opt_catchall.match(part)
- if match_opt:
- add_route_arg(match_opt, constants.RouteArgType.LIST)
- break
- match_strict = check_strict_catchall.match(part)
- if match_strict:
- add_route_arg(match_strict, constants.RouteArgType.LIST)
- break
- match = check.match(part)
- if match:
- # Add the route arg to the list.
- add_route_arg(match, constants.RouteArgType.SINGLE)
- return args
- def catchall_in_route(route: str) -> str:
- """Extract the catchall part from a route.
- Args:
- route: the route from which to extract
- Returns:
- str: the catchall part of the URI
- """
- match_ = constants.RouteRegex.CATCHALL.search(route)
- return match_.group() if match_ else ""
- def catchall_prefix(route: str) -> str:
- """Extract the prefix part from a route that contains a catchall.
- Args:
- route: the route from which to extract
- Returns:
- str: the prefix part of the URI
- """
- pattern = catchall_in_route(route)
- return route.replace(pattern, "") if pattern else ""
- def format_route(route: str) -> str:
- """Format the given route.
- Args:
- route: The route to format.
- Returns:
- The formatted route.
- """
- # Strip the route.
- route = route.strip("/")
- route = to_snake_case(route).replace("_", "-")
- # If the route is empty, return the index route.
- if route == "":
- return constants.INDEX_ROUTE
- return route
- def format_cond(
- cond: str,
- true_value: str,
- false_value: str = '""',
- is_prop=False,
- ) -> str:
- """Format a conditional expression.
- Args:
- cond: The cond.
- true_value: The value to return if the cond is true.
- false_value: The value to return if the cond is false.
- is_prop: Whether the cond is a prop
- Returns:
- The formatted conditional expression.
- """
- # Import here to avoid circular imports.
- from pynecone.var import Var
- # Format prop conds.
- if is_prop:
- prop1 = Var.create(true_value, is_string=type(true_value) == str)
- prop2 = Var.create(false_value, is_string=type(false_value) == str)
- assert prop1 is not None and prop2 is not None, "Invalid prop values"
- return f"{cond} ? {prop1} : {prop2}".replace("{", "").replace("}", "")
- # Format component conds.
- return wrap(f"{cond} ? {true_value} : {false_value}", "{")
- def get_event_handler_parts(handler: EventHandler) -> Tuple[str, str]:
- """Get the state and function name of an event handler.
- Args:
- handler: The event handler to get the parts of.
- Returns:
- The state and function name.
- """
- # Get the class that defines the event handler.
- parts = handler.fn.__qualname__.split(".")
- # If there's no enclosing class, just return the function name.
- if len(parts) == 1:
- return ("", parts[-1])
- # Get the state and the function name.
- state_name, name = parts[-2:]
- # Construct the full event handler name.
- try:
- # Try to get the state from the module.
- state = vars(sys.modules[handler.fn.__module__])[state_name]
- except Exception:
- # If the state isn't in the module, just return the function name.
- return ("", handler.fn.__qualname__)
- return (state.get_full_name(), name)
- def format_event_handler(handler: EventHandler) -> str:
- """Format an event handler.
- Args:
- handler: The event handler to format.
- Returns:
- The formatted function.
- """
- state, name = get_event_handler_parts(handler)
- if state == "":
- return name
- return f"{state}.{name}"
- def format_event(event_spec: EventSpec) -> str:
- """Format an event.
- Args:
- event_spec: The event to format.
- Returns:
- The compiled event.
- """
- args = ",".join([":".join((name, val)) for name, val in event_spec.args])
- return f"E(\"{format_event_handler(event_spec.handler)}\", {wrap(args, '{')})"
- def format_upload_event(event_spec: EventSpec) -> str:
- """Format an upload event.
- Args:
- event_spec: The event to format.
- Returns:
- The compiled event.
- """
- from pynecone.compiler import templates
- state, name = get_event_handler_parts(event_spec.handler)
- return f'uploadFiles({state}, {templates.RESULT}, {templates.SET_RESULT}, {state}.files, "{name}", UPLOAD)'
- def format_query_params(router_data: Dict[str, Any]) -> Dict[str, str]:
- """Convert back query params name to python-friendly case.
- Args:
- router_data: the router_data dict containing the query params
- Returns:
- The reformatted query params
- """
- params = router_data[constants.RouteVar.QUERY]
- return {k.replace("-", "_"): v for k, v in params.items()}
- # Set of unique variable names.
- USED_VARIABLES = set()
- def get_unique_variable_name() -> str:
- """Get a unique variable name.
- Returns:
- The unique variable name.
- """
- name = "".join([random.choice(string.ascii_lowercase) for _ in range(8)])
- if name not in USED_VARIABLES:
- USED_VARIABLES.add(name)
- return name
- return get_unique_variable_name()
- def get_default_app_name() -> str:
- """Get the default app name.
- The default app name is the name of the current directory.
- Returns:
- The default app name.
- """
- return os.getcwd().split(os.path.sep)[-1].replace("-", "_")
- def is_dataframe(value: Type) -> bool:
- """Check if the given value is a dataframe.
- Args:
- value: The value to check.
- Returns:
- Whether the value is a dataframe.
- """
- return value.__name__ == "DataFrame"
- def is_figure(value: Type) -> bool:
- """Check if the given value is a figure.
- Args:
- value: The value to check.
- Returns:
- Whether the value is a figure.
- """
- return value.__name__ == "Figure"
- def is_valid_var_type(var: Type) -> bool:
- """Check if the given value is a valid prop type.
- Args:
- var: The value to check.
- Returns:
- Whether the value is a valid prop type.
- """
- return _issubclass(var, StateVar) or is_dataframe(var) or is_figure(var)
- def format_dataframe_values(value: Type) -> List[Any]:
- """Format dataframe values.
- Args:
- value: The value to format.
- Returns:
- Format data
- """
- if not is_dataframe(type(value)):
- return value
- format_data = []
- for data in list(value.values.tolist()):
- element = []
- for d in data:
- element.append(str(d) if isinstance(d, (list, tuple)) else d)
- format_data.append(element)
- return format_data
- def format_state(value: Any) -> Dict:
- """Recursively format values in the given state.
- Args:
- value: The state to format.
- Returns:
- The formatted state.
- Raises:
- TypeError: If the given value is not a valid state.
- """
- # Handle dicts.
- if isinstance(value, dict):
- return {k: format_state(v) for k, v in value.items()}
- # Return state vars as is.
- if isinstance(value, StateBases):
- return value
- # Convert plotly figures to JSON.
- if isinstance(value, go.Figure):
- return json.loads(to_json(value))["data"] # type: ignore
- # Convert pandas dataframes to JSON.
- if is_dataframe(type(value)):
- return {
- "columns": value.columns.tolist(),
- "data": format_dataframe_values(value),
- }
- raise TypeError(
- "State vars must be primitive Python types, "
- "or subclasses of pc.Base. "
- f"Got var of type {type(value)}."
- )
- def get_event(state, event):
- """Get the event from the given state.
- Args:
- state: The state.
- event: The event.
- Returns:
- The event.
- """
- return f"{state.get_name()}.{event}"
- def format_string(string: str) -> str:
- """Format the given string as a JS string literal..
- Args:
- string: The string to format.
- Returns:
- The formatted string.
- """
- # Escape backticks.
- string = string.replace(r"\`", "`")
- string = string.replace("`", r"\`")
- # Wrap the string so it looks like {`string`}.
- string = wrap(string, "`")
- string = wrap(string, "{")
- return string
- def call_event_handler(event_handler: EventHandler, arg: Var) -> EventSpec:
- """Call an event handler to get the event spec.
- This function will inspect the function signature of the event handler.
- If it takes in an arg, the arg will be passed to the event handler.
- Otherwise, the event handler will be called with no args.
- Args:
- event_handler: The event handler.
- arg: The argument to pass to the event handler.
- Returns:
- The event spec from calling the event handler.
- """
- args = inspect.getfullargspec(event_handler.fn).args
- if len(args) == 1:
- return event_handler()
- assert (
- len(args) == 2
- ), f"Event handler {event_handler.fn} must have 1 or 2 arguments."
- return event_handler(arg)
- def call_event_fn(fn: Callable, arg: Var) -> List[EventSpec]:
- """Call a function to a list of event specs.
- The function should return either a single EventSpec or a list of EventSpecs.
- If the function takes in an arg, the arg will be passed to the function.
- Otherwise, the function will be called with no args.
- Args:
- fn: The function to call.
- arg: The argument to pass to the function.
- Returns:
- The event specs from calling the function.
- Raises:
- ValueError: If the lambda has an invalid signature.
- """
- # Import here to avoid circular imports.
- from pynecone.event import EventHandler, EventSpec
- # Get the args of the lambda.
- args = inspect.getfullargspec(fn).args
- # Call the lambda.
- if len(args) == 0:
- out = fn()
- elif len(args) == 1:
- out = fn(arg)
- else:
- raise ValueError(f"Lambda {fn} must have 0 or 1 arguments.")
- # Convert the output to a list.
- if not isinstance(out, List):
- out = [out]
- # Convert any event specs to event specs.
- events = []
- for e in out:
- # Convert handlers to event specs.
- if isinstance(e, EventHandler):
- if len(args) == 0:
- e = e()
- elif len(args) == 1:
- e = e(arg)
- # Make sure the event spec is valid.
- if not isinstance(e, EventSpec):
- raise ValueError(f"Lambda {fn} returned an invalid event spec: {e}.")
- # Add the event spec to the chain.
- events.append(e)
- # Return the events.
- return events
- def get_handler_args(event_spec: EventSpec, arg: Var) -> Tuple[Tuple[str, str], ...]:
- """Get the handler args for the given event spec.
- Args:
- event_spec: The event spec.
- arg: The controlled event argument.
- Returns:
- The handler args.
- Raises:
- ValueError: If the event handler has an invalid signature.
- """
- args = inspect.getfullargspec(event_spec.handler.fn).args
- if len(args) < 2:
- raise ValueError(
- f"Event handler has an invalid signature, needed a method with a parameter, got {event_spec.handler}."
- )
- return event_spec.args if len(args) > 2 else ((args[1], arg.name),)
- def fix_events(
- events: Optional[List[Union[EventHandler, EventSpec]]], token: str
- ) -> List[Event]:
- """Fix a list of events returned by an event handler.
- Args:
- events: The events to fix.
- token: The user token.
- Returns:
- The fixed events.
- """
- from pynecone.event import Event, EventHandler, EventSpec
- # If the event handler returns nothing, return an empty list.
- if events is None:
- return []
- # If the handler returns a single event, wrap it in a list.
- if not isinstance(events, List):
- events = [events]
- # Fix the events created by the handler.
- out = []
- for e in events:
- # Otherwise, create an event from the event spec.
- if isinstance(e, EventHandler):
- e = e()
- assert isinstance(e, EventSpec), f"Unexpected event type, {type(e)}."
- name = format_event_handler(e.handler)
- payload = dict(e.args)
- # Create an event and append it to the list.
- out.append(
- Event(
- token=token,
- name=name,
- payload=payload,
- )
- )
- return out
- def merge_imports(*imports) -> ImportDict:
- """Merge two import dicts together.
- Args:
- *imports: The list of import dicts to merge.
- Returns:
- The merged import dicts.
- """
- all_imports = defaultdict(set)
- for import_dict in imports:
- for lib, fields in import_dict.items():
- for field in fields:
- all_imports[lib].add(field)
- return all_imports
- def get_hydrate_event(state) -> str:
- """Get the name of the hydrate event for the state.
- Args:
- state: The state.
- Returns:
- The name of the hydrate event.
- """
- return get_event(state, constants.HYDRATE)
- def get_redis() -> Optional[Redis]:
- """Get the redis client.
- Returns:
- The redis client.
- """
- config = get_config()
- if config.redis_url is None:
- return None
- redis_url, redis_port = config.redis_url.split(":")
- print("Using redis at", config.redis_url)
- return Redis(host=redis_url, port=int(redis_port), db=0)
- def is_backend_variable(name: str) -> bool:
- """Check if this variable name correspond to a backend variable.
- Args:
- name: The name of the variable to check
- Returns:
- bool: The result of the check
- """
- return name.startswith("_") and not name.startswith("__")
- def json_dumps(obj: Any):
- """Serialize ``obj`` to a JSON formatted ``str``, ensure_ascii=False.
- Args:
- obj: The obj to be fromatted
- Returns:
- str: The result of the json dumps
- """
- return json.dumps(obj, ensure_ascii=False)
- # Store this here for performance.
- StateBases = get_base_class(StateVar)
|