processes.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492
  1. """Process operations."""
  2. from __future__ import annotations
  3. import collections
  4. import contextlib
  5. import importlib.metadata
  6. import os
  7. import signal
  8. import subprocess
  9. from concurrent import futures
  10. from pathlib import Path
  11. from typing import Any, Callable, Generator, Literal, Sequence, Tuple, overload
  12. import psutil
  13. import typer
  14. from redis.exceptions import RedisError
  15. from reflex import constants
  16. from reflex.config import environment
  17. from reflex.utils import console, path_ops, prerequisites
  18. from reflex.utils.printer import (
  19. CounterComponent,
  20. FunGuyProgressComponent,
  21. MessageComponent,
  22. ProgressBar,
  23. )
  24. from reflex.utils.terminal import colored
  25. def kill(pid: int):
  26. """Kill a process.
  27. Args:
  28. pid: The process ID.
  29. """
  30. os.kill(pid, signal.SIGTERM)
  31. def get_num_workers() -> int:
  32. """Get the number of backend worker processes.
  33. Raises:
  34. Exit: If unable to connect to Redis.
  35. Returns:
  36. The number of backend worker processes.
  37. """
  38. if (redis_client := prerequisites.get_redis_sync()) is None:
  39. return 1
  40. try:
  41. redis_client.ping()
  42. except RedisError as re:
  43. console.error(f"Unable to connect to Redis: {re}")
  44. raise typer.Exit(1) from re
  45. return (os.cpu_count() or 1) * 2 + 1
  46. def get_process_on_port(port: int) -> psutil.Process | None:
  47. """Get the process on the given port.
  48. Args:
  49. port: The port.
  50. Returns:
  51. The process on the given port.
  52. """
  53. for proc in psutil.process_iter(["pid", "name", "cmdline"]):
  54. with contextlib.suppress(
  55. psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess
  56. ):
  57. if importlib.metadata.version("psutil") >= "6.0.0":
  58. conns = proc.net_connections(kind="inet")
  59. else:
  60. conns = proc.connections(kind="inet")
  61. for conn in conns:
  62. if conn.laddr.port == int(port):
  63. return proc
  64. return None
  65. def is_process_on_port(port: int) -> bool:
  66. """Check if a process is running on the given port.
  67. Args:
  68. port: The port.
  69. Returns:
  70. Whether a process is running on the given port.
  71. """
  72. return get_process_on_port(port) is not None
  73. def kill_process_on_port(port: int):
  74. """Kill the process on the given port.
  75. Args:
  76. port: The port.
  77. """
  78. if get_process_on_port(port) is not None:
  79. with contextlib.suppress(psutil.AccessDenied):
  80. get_process_on_port(port).kill() # pyright: ignore [reportOptionalMemberAccess]
  81. def change_port(port: int, _type: str) -> int:
  82. """Change the port.
  83. Args:
  84. port: The port.
  85. _type: The type of the port.
  86. Returns:
  87. The new port.
  88. """
  89. new_port = port + 1
  90. if is_process_on_port(new_port):
  91. return change_port(new_port, _type)
  92. console.info(
  93. f"The {_type} will run on port "
  94. + colored(port, attrs=("bold", "underline"))
  95. + "."
  96. )
  97. return new_port
  98. def handle_port(service_name: str, port: int, auto_increment: bool) -> int:
  99. """Change port if the specified port is in use and is not explicitly specified as a CLI arg or config arg.
  100. Otherwise tell the user the port is in use and exit the app.
  101. Args:
  102. service_name: The frontend or backend.
  103. port: The provided port.
  104. auto_increment: Whether to automatically increment the port.
  105. Returns:
  106. The port to run the service on.
  107. Raises:
  108. Exit:when the port is in use.
  109. """
  110. if (process := get_process_on_port(port)) is None:
  111. return port
  112. if auto_increment:
  113. return change_port(port, service_name)
  114. else:
  115. console.error(
  116. f"{service_name.capitalize()} port: {port} is already in use by PID: {process.pid}."
  117. )
  118. raise typer.Exit()
  119. @overload
  120. def new_process(
  121. args: str | list[str] | list[str | None] | list[str | Path | None],
  122. run: Literal[False] = False,
  123. show_logs: bool = False,
  124. **kwargs,
  125. ) -> subprocess.Popen[str]: ...
  126. @overload
  127. def new_process(
  128. args: str | list[str] | list[str | None] | list[str | Path | None],
  129. run: Literal[True],
  130. show_logs: bool = False,
  131. **kwargs,
  132. ) -> subprocess.CompletedProcess[str]: ...
  133. def new_process(
  134. args: str | list[str] | list[str | None] | list[str | Path | None],
  135. run: bool = False,
  136. show_logs: bool = False,
  137. **kwargs,
  138. ) -> subprocess.CompletedProcess[str] | subprocess.Popen[str]:
  139. """Wrapper over subprocess.Popen to unify the launch of child processes.
  140. Args:
  141. args: A string, or a sequence of program arguments.
  142. run: Whether to run the process to completion.
  143. show_logs: Whether to show the logs of the process.
  144. **kwargs: Kwargs to override default wrap values to pass to subprocess.Popen as arguments.
  145. Returns:
  146. Execute a child program in a new process.
  147. Raises:
  148. Exit: When attempting to run a command with a None value.
  149. """
  150. # Check for invalid command first.
  151. non_empty_args = list(filter(None, args)) if isinstance(args, list) else [args]
  152. if isinstance(args, list) and len(non_empty_args) != len(args):
  153. console.error(f"Invalid command: {args}")
  154. raise typer.Exit(1)
  155. path_env: str = os.environ.get("PATH", "")
  156. # Add node_bin_path to the PATH environment variable.
  157. if not environment.REFLEX_BACKEND_ONLY.get():
  158. node_bin_path = path_ops.get_node_bin_path()
  159. if node_bin_path:
  160. path_env = os.pathsep.join([str(node_bin_path), path_env])
  161. env: dict[str, str] = {
  162. **os.environ,
  163. "PATH": path_env,
  164. **kwargs.pop("env", {}),
  165. }
  166. kwargs = {
  167. "env": env,
  168. "stderr": None if show_logs else subprocess.STDOUT,
  169. "stdout": None if show_logs else subprocess.PIPE,
  170. "universal_newlines": True,
  171. "encoding": "UTF-8",
  172. "errors": "replace", # Avoid UnicodeDecodeError in unknown command output
  173. **kwargs,
  174. }
  175. console.debug(f"Running command: {non_empty_args}")
  176. def subprocess_p_open(args: subprocess._CMD, **kwargs):
  177. return subprocess.Popen(args, **kwargs)
  178. fn: Callable[..., subprocess.CompletedProcess[str] | subprocess.Popen[str]] = (
  179. subprocess.run if run else subprocess_p_open
  180. )
  181. return fn(non_empty_args, **kwargs)
  182. @contextlib.contextmanager
  183. def run_concurrently_context(
  184. *fns: Callable[..., Any] | tuple[Callable[..., Any], ...],
  185. ) -> Generator[list[futures.Future], None, None]:
  186. """Run functions concurrently in a thread pool.
  187. Args:
  188. *fns: The functions to run.
  189. Yields:
  190. The futures for the functions.
  191. """
  192. # If no functions are provided, yield an empty list and return.
  193. if not fns:
  194. yield []
  195. return
  196. # Convert the functions to tuples.
  197. fns = tuple(fn if isinstance(fn, tuple) else (fn,) for fn in fns)
  198. # Run the functions concurrently.
  199. executor = None
  200. try:
  201. executor = futures.ThreadPoolExecutor(max_workers=len(fns))
  202. # Submit the tasks.
  203. tasks = [executor.submit(*fn) for fn in fns]
  204. # Yield control back to the main thread while tasks are running.
  205. yield tasks
  206. # Get the results in the order completed to check any exceptions.
  207. for task in futures.as_completed(tasks):
  208. # if task throws something, we let it bubble up immediately
  209. task.result()
  210. finally:
  211. # Shutdown the executor
  212. if executor:
  213. executor.shutdown(wait=False)
  214. def run_concurrently(*fns: Callable | Tuple) -> None:
  215. """Run functions concurrently in a thread pool.
  216. Args:
  217. *fns: The functions to run.
  218. """
  219. with run_concurrently_context(*fns):
  220. pass
  221. def stream_logs(
  222. message: str,
  223. process: subprocess.Popen,
  224. suppress_errors: bool = False,
  225. analytics_enabled: bool = False,
  226. ):
  227. """Stream the logs for a process.
  228. Args:
  229. message: The message to display.
  230. process: The process.
  231. suppress_errors: If True, do not exit if errors are encountered (for fallback).
  232. analytics_enabled: Whether analytics are enabled for this command.
  233. Yields:
  234. The lines of the process output.
  235. Raises:
  236. Exit: If the process failed.
  237. """
  238. from reflex.utils import telemetry
  239. # Store the tail of the logs.
  240. logs = collections.deque(maxlen=512)
  241. with process:
  242. console.debug(message)
  243. if process.stdout is None:
  244. return
  245. for line in process.stdout:
  246. console.debug(line, end="")
  247. logs.append(line)
  248. yield line
  249. # Check if the process failed (not printing the logs for SIGINT).
  250. # Windows uvicorn bug
  251. # https://github.com/reflex-dev/reflex/issues/2335
  252. accepted_return_codes = [0, -2, 15] if constants.IS_WINDOWS else [0, -2]
  253. if process.returncode not in accepted_return_codes and not suppress_errors:
  254. console.error(f"{message} failed with exit code {process.returncode}")
  255. for line in logs:
  256. console.error(line, end="")
  257. if analytics_enabled:
  258. telemetry.send("error", context=message)
  259. console.error(
  260. "Run with "
  261. + colored("--loglevel debug", attrs=("bold",))
  262. + " for the full log."
  263. )
  264. raise typer.Exit(1)
  265. def show_logs(message: str, process: subprocess.Popen):
  266. """Show the logs for a process.
  267. Args:
  268. message: The message to display.
  269. process: The process.
  270. """
  271. for _ in stream_logs(message, process):
  272. pass
  273. def show_status(
  274. message: str,
  275. process: subprocess.Popen,
  276. suppress_errors: bool = False,
  277. analytics_enabled: bool = False,
  278. prior_processes: Tuple[subprocess.Popen, ...] = (),
  279. ):
  280. """Show the status of a process.
  281. Args:
  282. message: The initial message to display.
  283. process: The process.
  284. suppress_errors: If True, do not exit if errors are encountered (for fallback).
  285. analytics_enabled: Whether analytics are enabled for this command.
  286. prior_processes: The prior processes that have been run.
  287. """
  288. for one_process in (*prior_processes, process):
  289. with console.status(message) as status:
  290. for line in stream_logs(
  291. message,
  292. one_process,
  293. suppress_errors=suppress_errors,
  294. analytics_enabled=analytics_enabled,
  295. ):
  296. status.update(f"{message} {line}")
  297. def show_progress(message: str, process: subprocess.Popen, checkpoints: list[str]):
  298. """Show a progress bar for a process.
  299. Args:
  300. message: The message to display.
  301. process: The process.
  302. checkpoints: The checkpoints to advance the progress bar.
  303. """
  304. # Iterate over the process output.
  305. progress = ProgressBar(
  306. steps=len(checkpoints),
  307. components=(
  308. (MessageComponent(message), 0),
  309. (FunGuyProgressComponent(), 2),
  310. (CounterComponent(), 1),
  311. ),
  312. )
  313. for line in stream_logs(message, process):
  314. # Check for special strings and update the progress bar.
  315. for special_string in checkpoints:
  316. if special_string in line:
  317. progress.update(1)
  318. if special_string == checkpoints[-1]:
  319. progress.finish()
  320. break
  321. def atexit_handler():
  322. """Display a custom message with the current time when exiting an app."""
  323. console.log("Reflex app stopped.")
  324. def get_command_with_loglevel(command: list[str]) -> list[str]:
  325. """Add the right loglevel flag to the designated command.
  326. npm uses --loglevel <level>, Bun doesn't use the --loglevel flag and
  327. runs in debug mode by default.
  328. Args:
  329. command:The command to add loglevel flag.
  330. Returns:
  331. The updated command list
  332. """
  333. npm_path = path_ops.get_npm_path()
  334. npm_path = str(npm_path) if npm_path else None
  335. if command[0] == npm_path:
  336. return [*command, "--loglevel", "silly"]
  337. return command
  338. def run_process_with_fallbacks(
  339. args: list[str],
  340. *,
  341. show_status_message: str,
  342. fallbacks: str | Sequence[str] | Sequence[Sequence[str]] | None = None,
  343. analytics_enabled: bool = False,
  344. prior_processes: Tuple[subprocess.Popen, ...] = (),
  345. **kwargs,
  346. ):
  347. """Run subprocess and retry using fallback command if initial command fails.
  348. Args:
  349. args: A string, or a sequence of program arguments.
  350. show_status_message: The status message to be displayed in the console.
  351. fallbacks: The fallback command to run if the initial command fails.
  352. analytics_enabled: Whether analytics are enabled for this command.
  353. prior_processes: The prior processes that have been run.
  354. **kwargs: Kwargs to pass to new_process function.
  355. """
  356. process = new_process(get_command_with_loglevel(args), **kwargs)
  357. if not fallbacks:
  358. # No fallback given, or this _is_ the fallback command.
  359. show_status(
  360. show_status_message,
  361. process,
  362. analytics_enabled=analytics_enabled,
  363. prior_processes=prior_processes,
  364. )
  365. else:
  366. # Suppress errors for initial command, because we will try to fallback
  367. show_status(show_status_message, process, suppress_errors=True)
  368. current_fallback = fallbacks[0] if not isinstance(fallbacks, str) else fallbacks
  369. next_fallbacks = fallbacks[1:] if not isinstance(fallbacks, str) else None
  370. if process.returncode != 0:
  371. # retry with fallback command.
  372. fallback_with_args = (
  373. [current_fallback, *args[1:]]
  374. if isinstance(current_fallback, str)
  375. else [*current_fallback, *args[1:]]
  376. )
  377. console.warn(
  378. f"There was an error running command: {args}. Falling back to: {fallback_with_args}."
  379. )
  380. run_process_with_fallbacks(
  381. fallback_with_args,
  382. show_status_message=show_status_message,
  383. fallbacks=next_fallbacks,
  384. analytics_enabled=analytics_enabled,
  385. prior_processes=(*prior_processes, process),
  386. **kwargs,
  387. )
  388. def execute_command_and_return_output(command: str) -> str | None:
  389. """Execute a command and return the output.
  390. Args:
  391. command: The command to run.
  392. Returns:
  393. The output of the command.
  394. """
  395. try:
  396. return subprocess.check_output(command, shell=True).decode().strip()
  397. except subprocess.SubprocessError as err:
  398. console.error(
  399. f"The command `{command}` failed with error: {err}. This will return None."
  400. )
  401. return None