processes.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. """Process operations."""
  2. from __future__ import annotations
  3. import collections
  4. import contextlib
  5. import os
  6. import signal
  7. import subprocess
  8. from concurrent import futures
  9. from typing import Callable, Generator, List, Optional, Tuple, Union
  10. import psutil
  11. import typer
  12. from reflex.utils import console, path_ops, prerequisites
  def kill(pid):
      """Ask a process to terminate by sending it SIGTERM.

      Args:
          pid: The ID of the process to signal.
      """
      termination_signal = signal.SIGTERM
      os.kill(pid, termination_signal)
  def get_num_workers() -> int:
      """Work out how many backend worker processes to use.

      Returns:
          1 when ``prerequisites.get_redis()`` returns None, otherwise twice the
          CPU count (treated as 1 when it cannot be determined) plus one.
      """
      if prerequisites.get_redis() is None:
          return 1
      cpu_count = os.cpu_count() or 1
      return cpu_count * 2 + 1
  def get_process_on_port(port) -> Optional[psutil.Process]:
      """Get the process on the given port.

      Args:
          port: The port, as anything ``int()`` accepts (e.g. ``3000`` or ``"3000"``).

      Returns:
          The first process found that has an inet connection whose local port
          equals the given port, or None if no such process is found.

      Raises:
          ValueError: If the port is a string that is not a valid integer.
      """
      # Convert once up front: the value is loop-invariant, and an invalid port
      # now fails immediately instead of only when some process happens to have
      # an open connection.
      target_port = int(port)
      for proc in psutil.process_iter(["pid", "name", "cmdline"]):
          try:
              for conn in proc.connections(kind="inet"):
                  if conn.laddr.port == target_port:
                      return proc
          except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
              # Processes that vanished or cannot be inspected are skipped.
              continue
      return None
  def is_process_on_port(port) -> bool:
      """Tell whether some process is using the given port.

      Args:
          port: The port to look up.

      Returns:
          True if ``get_process_on_port`` finds a process for the port, False otherwise.
      """
      owner = get_process_on_port(port)
      return owner is not None
  def kill_process_on_port(port):
      """Kill the process on the given port, if there is one.

      Args:
          port: The port.
      """
      # Look the process up exactly once. A second lookup repeats the full
      # process scan and may return None if the process exited in the meantime,
      # which would fail with an AttributeError on ``None.kill()``.
      proc = get_process_on_port(port)
      if proc is None:
          return
      # Best effort: lacking permission, or the process having already exited,
      # is not treated as an error.
      with contextlib.suppress(psutil.AccessDenied, psutil.NoSuchProcess):
          proc.kill()
  def change_or_terminate_port(port, _type) -> str:
      """Ask the user how to resolve a port that is already in use.

      The user may kill the process holding the port, pick another port (the
      question is asked again if that one is also taken), or quit.

      Args:
          port: The port that is already in use.
          _type: The name of the service that runs on the port (shown in messages).

      Returns:
          The new port or the current one.

      Raises:
          Exit: If the user wants to exit.
      """
      while True:
          console.info(
              f"Something is already running on port [bold underline]{port}[/bold underline]. This is the port the {_type} runs on."
          )
          answer = console.ask("Kill or change it?", choices=["k", "c", "n"])

          if answer == "k":
              kill_process_on_port(port)
              return port

          if answer != "c":
              console.log("Exiting...")
              raise typer.Exit()

          new_port = console.ask("Specify the new port")
          if not is_process_on_port(new_port):
              console.info(
                  f"The {_type} will run on port [bold underline]{new_port}[/bold underline]."
              )
              return new_port

          # The chosen port is taken as well: start over with it.
          port = new_port
  def new_process(args, run: bool = False, show_logs: bool = False, **kwargs):
      """Wrapper over subprocess.Popen to unify the launch of child processes.

      Args:
          args: A string, or a sequence of program arguments.
          run: Whether to run the process to completion (``subprocess.run``)
              instead of just starting it (``subprocess.Popen``).
          show_logs: Whether to show the logs of the process. When False, stdout
              is captured through a pipe and stderr is merged into it.
          **kwargs: Kwargs to override default wrap values to pass to
              subprocess.Popen as arguments. An ``env`` mapping is merged on top
              of the inherited environment instead of replacing it.

      Returns:
          The ``subprocess.CompletedProcess`` when ``run`` is True, otherwise the
          started ``subprocess.Popen`` object.
      """
      node_bin_path = path_ops.get_node_bin_path()
      if not node_bin_path:
          console.warn(
              "The path to the Node binary could not be found. Please ensure that Node is properly "
              "installed and added to your system's PATH environment variable."
          )

      # Add the node bin path to the PATH environment variable. Missing pieces
      # are skipped rather than joined as empty strings: an empty PATH entry is
      # interpreted as the current working directory on POSIX systems, and PATH
      # itself may be unset.
      path_entries = [
          entry for entry in (node_bin_path, os.environ.get("PATH")) if entry
      ]
      env = dict(os.environ)
      if path_entries:
          env["PATH"] = os.pathsep.join(path_entries)
      # Caller-supplied variables win over everything above; ``env=None`` is
      # treated like "no overrides".
      env.update(kwargs.pop("env", None) or {})

      popen_kwargs = {
          "env": env,
          "stderr": None if show_logs else subprocess.STDOUT,
          "stdout": None if show_logs else subprocess.PIPE,
          "universal_newlines": True,
          "encoding": "UTF-8",
          **kwargs,
      }
      console.debug(f"Running command: {args}")
      fn = subprocess.run if run else subprocess.Popen
      return fn(args, **popen_kwargs)
  @contextlib.contextmanager
  def run_concurrently_context(
      *fns: Union[Callable, Tuple]
  ) -> Generator[list[futures.Future], None, None]:
      """Run functions concurrently in a thread pool.

      Each positional argument is either a callable or a tuple made of a
      callable followed by its arguments. On leaving the context, every task is
      waited for and the exception of a failed task is re-raised.

      Args:
          *fns: The functions to run.

      Yields:
          The futures for the functions.
      """
      # Nothing to run: hand back an empty list without creating a pool.
      if len(fns) == 0:
          yield []
          return

      # Normalise every entry to a (callable, *args) tuple.
      calls = []
      for entry in fns:
          calls.append(entry if isinstance(entry, tuple) else (entry,))

      with futures.ThreadPoolExecutor(max_workers=len(calls)) as pool:
          pending = [pool.submit(*call) for call in calls]

          # Let the caller work while the tasks are running.
          yield pending

          # Collect results as they finish so that failures surface here.
          for finished in futures.as_completed(pending):
              finished.result()
  def run_concurrently(*fns: Union[Callable, Tuple]) -> None:
      """Run functions concurrently in a thread pool and wait for all of them.

      Args:
          *fns: The functions to run.
      """
      running = run_concurrently_context(*fns)
      with running:
          # Nothing to do in the meantime; leaving the context does the waiting.
          pass
  def stream_logs(message: str, process: subprocess.Popen, progress=None):
      """Stream the logs for a process.

      Args:
          message: The message to display.
          process: The process.
          progress: The ongoing progress bar if one is being used.

      Yields:
          The lines of the process output.

      Raises:
          Exit: If the process failed.
      """
      # Only the last 512 lines are kept for the failure report.
      tail = collections.deque(maxlen=512)

      with process:
          console.debug(message, progress=progress)
          output = process.stdout
          if output is None:
              return
          for line in output:
              console.debug(line, end="", progress=progress)
              tail.append(line)
              yield line

      # Leaving the ``with`` block above has waited for the process, so the
      # return code is available. Exit code -2 (SIGINT) is not reported.
      if process.returncode in [0, -2]:
          return

      console.error(f"{message} failed with exit code {process.returncode}")
      for logged_line in tail:
          console.error(logged_line, end="")
      console.error("Run with [bold]--loglevel debug [/bold] for the full log.")
      raise typer.Exit(1)
  def show_logs(message: str, process: subprocess.Popen):
      """Show the logs for a process until it finishes.

      Args:
          message: The message to display.
          process: The process.
      """
      # Drain the generator; the lines themselves are not needed here.
      collections.deque(stream_logs(message, process), maxlen=0)
  def show_status(message: str, process: subprocess.Popen):
      """Show a status display that follows the output of a process.

      Args:
          message: The initial message to display.
          process: The process.
      """
      with console.status(message) as status:
          output_lines = stream_logs(message, process)
          for line in output_lines:
              # Keep the message and append the most recent output line.
              latest = f"{message} {line}"
              status.update(latest)
  def show_progress(message: str, process: subprocess.Popen, checkpoints: List[str]):
      """Show a progress bar for a process.

      The bar advances by one step for each output line that contains one of
      the checkpoint strings, and is marked complete once the last checkpoint
      is seen.

      Args:
          message: The message to display.
          process: The process.
          checkpoints: The checkpoints to advance the progress bar.
      """
      with console.progress() as progress:
          total_steps = len(checkpoints)
          task = progress.add_task(f"{message}: ", total=total_steps)

          for line in stream_logs(message, process, progress=progress):
              # Only the first checkpoint found in the line counts.
              hit = next((mark for mark in checkpoints if mark in line), None)
              if hit is None:
                  continue

              progress.update(task, advance=1)
              if hit == checkpoints[-1]:
                  progress.update(task, completed=len(checkpoints))
  def atexit_handler():
      """Log a message saying that the Reflex app has stopped."""
      farewell = "Reflex app stopped."
      console.log(farewell)