123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 |
- """Process operations."""
- from __future__ import annotations
- import collections
- import contextlib
- import os
- import signal
- import subprocess
- from concurrent import futures
- from typing import Callable, Generator, List, Optional, Tuple, Union
- import psutil
- import typer
- from reflex.utils import console, path_ops, prerequisites
def kill(pid):
    """Ask a process to terminate by sending it SIGTERM.

    Args:
        pid: The ID of the process to signal.
    """
    termination_signal = signal.SIGTERM
    os.kill(pid, termination_signal)
def get_num_workers() -> int:
    """Compute how many backend worker processes to use.

    Returns:
        1 when ``prerequisites.get_redis()`` returns None; otherwise twice the
        CPU count plus one (the CPU count falls back to 1 when it is unknown).
    """
    if prerequisites.get_redis() is None:
        return 1
    cpu_count = os.cpu_count() or 1
    return cpu_count * 2 + 1
def get_process_on_port(port) -> Optional[psutil.Process]:
    """Get the process on the given port.

    Scans every process reported by psutil and returns the first one that has
    an inet connection whose local address uses the port.

    Args:
        port: The port (anything accepted by ``int()``).

    Returns:
        The process on the given port, or None if no inspectable process uses it.

    Raises:
        ValueError: If the port cannot be converted to an integer.
    """
    # Convert once up front: the value is loop-invariant, and an invalid port
    # now fails immediately instead of only when a connection is compared.
    port_number = int(port)
    for proc in psutil.process_iter(["pid", "name", "cmdline"]):
        try:
            for conn in proc.connections(kind="inet"):
                if conn.laddr.port == port_number:
                    return proc
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            # The process went away or cannot be inspected; skip it.
            continue
    return None
def is_process_on_port(port) -> bool:
    """Tell whether some process is using the given port.

    Args:
        port: The port to look up.

    Returns:
        True if ``get_process_on_port`` finds a process for the port, False otherwise.
    """
    found = get_process_on_port(port)
    return found is not None
def kill_process_on_port(port):
    """Kill the process on the given port, if there is one.

    This is best-effort: nothing happens when no process is found, when the
    process may not be killed (access denied), or when it has already exited.

    Args:
        port: The port.
    """
    # Look the process up exactly once. Scanning a second time is slow and
    # racy: the process may vanish between the two scans, in which case the
    # second lookup returns None and ``.kill()`` raises AttributeError.
    proc = get_process_on_port(port)
    if proc is None:
        return
    # A process that exited on its own already satisfies the caller's goal.
    with contextlib.suppress(psutil.AccessDenied, psutil.NoSuchProcess):
        proc.kill()
def change_or_terminate_port(port, _type) -> str:
    """Resolve a port conflict interactively.

    The user is asked whether to kill the process occupying the port, to pick
    a different port, or to give up.

    Args:
        port: The port that is already in use.
        _type: The name of the service that wants the port (used in messages).

    Returns:
        The original port if its process was killed, otherwise the new port
        chosen by the user.

    Raises:
        Exit: If the user chooses neither to kill nor to change the port.
    """
    console.info(
        f"Something is already running on port [bold underline]{port}[/bold underline]. This is the port the {_type} runs on."
    )
    choice = console.ask("Kill or change it?", choices=["k", "c", "n"])

    if choice == "k":
        kill_process_on_port(port)
        return port

    if choice != "c":
        console.log("Exiting...")
        raise typer.Exit()

    new_port = console.ask("Specify the new port")
    # The replacement port may be taken as well; if so, ask again for it.
    if is_process_on_port(new_port):
        return change_or_terminate_port(new_port, _type)

    console.info(
        f"The {_type} will run on port [bold underline]{new_port}[/bold underline]."
    )
    return new_port
def new_process(args, run: bool = False, show_logs: bool = False, **kwargs):
    """Wrapper over subprocess.Popen to unify the launch of child processes.

    The child's environment is the current environment with the Node binary
    directory prepended to PATH, overlaid with any ``env`` mapping passed by
    the caller. Unless ``show_logs`` is set, stdout is piped and stderr is
    merged into it. Output is handled as UTF-8 text.

    Args:
        args: A string, or a sequence of program arguments.
        run: Whether to run the process to completion (``subprocess.run``)
            instead of starting it in the background (``subprocess.Popen``).
        show_logs: Whether to show the logs of the process.
        **kwargs: Kwargs to override default wrap values to pass to subprocess.Popen as arguments.

    Returns:
        The result of ``subprocess.run`` when ``run`` is true, otherwise the
        ``subprocess.Popen`` object.
    """
    node_bin_path = path_ops.get_node_bin_path()
    if not node_bin_path:
        console.warn(
            "The path to the Node binary could not be found. Please ensure that Node is properly "
            "installed and added to your system's PATH environment variable."
        )

    # Build PATH from the parts that actually exist. Joining an empty string
    # would create an empty PATH entry, which POSIX lookups treat as the
    # current directory, and indexing os.environ["PATH"] directly would raise
    # KeyError when PATH is not set.
    path_entries = [
        entry for entry in (node_bin_path, os.environ.get("PATH")) if entry
    ]
    env = dict(os.environ)
    if path_entries:
        env["PATH"] = os.pathsep.join(path_entries)
    # Caller-supplied variables win over everything above (env=None is tolerated).
    env.update(kwargs.pop("env", None) or {})

    kwargs = {
        "env": env,
        "stderr": None if show_logs else subprocess.STDOUT,
        "stdout": None if show_logs else subprocess.PIPE,
        "universal_newlines": True,
        "encoding": "UTF-8",
        # Remaining caller kwargs override the defaults above.
        **kwargs,
    }
    console.debug(f"Running command: {args}")
    fn = subprocess.run if run else subprocess.Popen
    return fn(args, **kwargs)
@contextlib.contextmanager
def run_concurrently_context(
    *fns: Union[Callable, Tuple]
) -> Generator[list[futures.Future], None, None]:
    """Run callables in a thread pool for the duration of a ``with`` block.

    Each argument is either a bare callable or a tuple of a callable followed
    by its positional arguments. On leaving the block, the result of every
    future is retrieved, so an exception raised by any task is re-raised.

    Args:
        *fns: The functions to run.

    Yields:
        The futures for the functions, in submission order.
    """
    # Nothing to run: hand back an empty list without creating a pool.
    if len(fns) == 0:
        yield []
        return

    # Normalize every entry to a (callable, *args) tuple.
    calls = []
    for fn in fns:
        calls.append(fn if isinstance(fn, tuple) else (fn,))

    # One worker per call so that all of them can run at the same time.
    with futures.ThreadPoolExecutor(max_workers=len(calls)) as pool:
        submitted = [pool.submit(*call) for call in calls]

        # Give control to the caller while the tasks are running.
        yield submitted

        # Surface exceptions from the tasks in the order they complete.
        for finished in futures.as_completed(submitted):
            finished.result()
def run_concurrently(*fns: Union[Callable, Tuple]) -> None:
    """Run callables in a thread pool via ``run_concurrently_context``.

    Args:
        *fns: The functions to run.
    """
    pool_context = run_concurrently_context(*fns)
    with pool_context:
        # Nothing to do inside the block; work is handled by the context manager.
        pass
def stream_logs(message: str, process: subprocess.Popen, progress=None):
    """Yield the output of a process line by line, logging it at debug level.

    If the process has no captured stdout, nothing is yielded and the exit
    code is not checked.

    Args:
        message: The message to display.
        process: The process.
        progress: The ongoing progress bar if one is being used.

    Yields:
        The lines of the process output.

    Raises:
        Exit: If the process failed.
    """
    # Keep only the most recent lines so a failure report stays bounded.
    tail = collections.deque(maxlen=512)

    with process:
        console.debug(message, progress=progress)
        output = process.stdout
        if output is None:
            return
        for line in output:
            console.debug(line, end="", progress=progress)
            tail.append(line)
            yield line

    # Exit codes 0 (success) and -2 (SIGINT) are not reported as failures.
    if process.returncode in [0, -2]:
        return

    console.error(f"{message} failed with exit code {process.returncode}")
    for logged_line in tail:
        console.error(logged_line, end="")
    console.error("Run with [bold]--loglevel debug [/bold] for the full log.")
    raise typer.Exit(1)
def show_logs(message: str, process: subprocess.Popen):
    """Drain the logs of a process, discarding the individual lines.

    Args:
        message: The message to display.
        process: The process.
    """
    # A zero-length deque consumes the whole generator without storing anything.
    collections.deque(stream_logs(message, process), maxlen=0)
def show_status(message: str, process: subprocess.Popen):
    """Display a status indicator that follows the output of a process.

    Args:
        message: The initial message to display.
        process: The process.
    """
    # Creating the generator does not start reading; that happens in the loop.
    log_lines = stream_logs(message, process)
    with console.status(message) as spinner:
        for line in log_lines:
            # Show the latest output line next to the message.
            spinner.update(f"{message} {line}")
def show_progress(message: str, process: subprocess.Popen, checkpoints: List[str]):
    """Display a progress bar driven by the output of a process.

    The bar advances by one step for every output line that contains one of
    the checkpoint strings, and is marked complete when the matched string
    equals the last checkpoint.

    Args:
        message: The message to display.
        process: The process.
        checkpoints: The checkpoints to advance the progress bar.
    """
    with console.progress() as bar:
        task = bar.add_task(f"{message}: ", total=len(checkpoints))
        for line in stream_logs(message, process, progress=bar):
            # Only the first checkpoint found in the line counts.
            matched = next((mark for mark in checkpoints if mark in line), None)
            if matched is None:
                continue
            bar.update(task, advance=1)
            if matched == checkpoints[-1]:
                bar.update(task, completed=len(checkpoints))
def atexit_handler():
    """Log a message announcing that the Reflex app has stopped."""
    stop_message = "Reflex app stopped."
    console.log(stop_message)
|