# console.py (15 KB)
  1. """Functions to communicate to the user via console."""
  2. from __future__ import annotations
  3. import contextlib
  4. import dataclasses
  5. import inspect
  6. import os
  7. import re
  8. import shutil
  9. import sys
  10. import time
  11. import types
  12. from dataclasses import dataclass
  13. from pathlib import Path
  14. from types import FrameType
  15. from reflex.constants import LogLevel
  16. from reflex.utils.terminal import colored
  17. def _get_terminal_width() -> int:
  18. try:
  19. # First try using shutil, which is more reliable across platforms
  20. return shutil.get_terminal_size().columns
  21. except (AttributeError, ValueError, OSError):
  22. try:
  23. # Fallback to environment variables
  24. return int(os.environ.get("COLUMNS", os.environ.get("TERM_WIDTH", 80)))
  25. except (TypeError, ValueError):
  26. # Default fallback
  27. return 80
  28. IS_REPRENTER_ACTIVE = False
  29. @dataclasses.dataclass
  30. class Reprinter:
  31. """A class that reprints text on the terminal."""
  32. _text: str = dataclasses.field(default="", init=False)
  33. @staticmethod
  34. def _moveup(lines: int):
  35. for _ in range(lines):
  36. sys.stdout.write("\x1b[A")
  37. @staticmethod
  38. def _movestart():
  39. sys.stdout.write("\r")
  40. def reprint(self, text: str):
  41. """Reprint the text.
  42. Args:
  43. text: The text to print
  44. """
  45. global IS_REPRENTER_ACTIVE
  46. IS_REPRENTER_ACTIVE = True
  47. text = text.rstrip("\n")
  48. number_of_lines = self._text.count("\n") + 1
  49. number_of_lines_new = text.count("\n") + 1
  50. # Clear previous text by overwritig non-spaces with spaces
  51. self._moveup(number_of_lines - 1)
  52. self._movestart()
  53. sys.stdout.write(re.sub(r"[^\s]", " ", self._text))
  54. # Print new text
  55. lines = min(number_of_lines, number_of_lines_new)
  56. self._moveup(lines - 1)
  57. self._movestart()
  58. sys.stdout.write(text)
  59. sys.stdout.flush()
  60. self._text = text
  61. def finish(self):
  62. """Finish printing the text."""
  63. sys.stdout.write("\n")
  64. sys.stdout.flush()
  65. global IS_REPRENTER_ACTIVE
  66. IS_REPRENTER_ACTIVE = False
  67. @dataclass
  68. class Status:
  69. """A status class for displaying a spinner."""
  70. message: str = "Loading"
  71. _reprinter: Reprinter | None = dataclasses.field(default=None, init=False)
  72. _parity: int = dataclasses.field(default=0, init=False)
  73. def __enter__(self):
  74. """Enter the context manager.
  75. Returns:
  76. The status object.
  77. """
  78. self._reprinter = Reprinter()
  79. return self
  80. def __exit__(
  81. self,
  82. exc_type: type[BaseException] | None,
  83. exc_value: BaseException | None,
  84. traceback: types.TracebackType | None,
  85. ):
  86. """Exit the context manager.
  87. Args:
  88. exc_type: The exception type.
  89. exc_value: The exception value.
  90. traceback: The traceback.
  91. """
  92. if self._reprinter:
  93. self._reprinter.reprint("")
  94. self._reprinter.finish()
  95. self._reprinter._moveup(1)
  96. sys.stdout.flush()
  97. self._reprinter = None
  98. def update(self, msg: str, **kwargs):
  99. """Update the status spinner.
  100. Args:
  101. msg: The message to display.
  102. kwargs: Keyword arguments to pass to the print function.
  103. """
  104. if self._reprinter:
  105. char = (
  106. "◐"
  107. if self._parity % 4 == 0
  108. else (
  109. "◓"
  110. if self._parity % 4 == 1
  111. else ("◑" if self._parity % 4 == 2 else "◒")
  112. )
  113. )
  114. self._parity += 1
  115. self._reprinter.reprint(f"{char} {msg}")
  116. @dataclass
  117. class Console:
  118. """A console class for pretty printing."""
  119. def print(self, msg: str, **kwargs):
  120. """Print a message.
  121. Args:
  122. msg: The message to print.
  123. kwargs: Keyword arguments to pass to the print function.
  124. """
  125. from builtins import print
  126. color = kwargs.pop("color", None)
  127. bold = kwargs.pop("bold", False)
  128. if color or bold:
  129. msg = colored(msg, color, attrs=["bold"] if bold else [])
  130. if IS_REPRENTER_ACTIVE:
  131. print("\n" + msg, flush=True, **kwargs) # noqa: T201
  132. else:
  133. print(msg, **kwargs) # noqa: T201
  134. def rule(self, title: str, **kwargs):
  135. """Prints a horizontal rule with a title.
  136. Args:
  137. title: The title of the rule.
  138. kwargs: Keyword arguments to pass to the print function.
  139. """
  140. terminal_width = _get_terminal_width()
  141. remaining_width = (
  142. terminal_width - len(title) - 2
  143. ) # 2 for the spaces around the title
  144. left_padding = remaining_width // 2
  145. right_padding = remaining_width - left_padding
  146. color = kwargs.pop("color", None)
  147. bold = kwargs.pop("bold", True)
  148. rule_color = "green" if color is None else color
  149. title = colored(title, color, attrs=("bold",) if bold else ())
  150. rule_line = (
  151. colored("─" * left_padding, rule_color)
  152. + " "
  153. + title
  154. + " "
  155. + colored("─" * right_padding, rule_color)
  156. )
  157. self.print(rule_line, **kwargs)
  158. def status(self, *args, **kwargs):
  159. """Create a status.
  160. Args:
  161. *args: Args to pass to the status.
  162. **kwargs: Kwargs to pass to the status.
  163. Returns:
  164. A new status.
  165. """
  166. return Status(*args, **kwargs)
  167. class Prompt:
  168. """A class for prompting the user for input."""
  169. @staticmethod
  170. def ask(
  171. question: str,
  172. choices: list[str] | None = None,
  173. default: str | None = None,
  174. show_choices: bool = True,
  175. ) -> str | None:
  176. """Ask the user a question.
  177. Args:
  178. question: The question to ask the user.
  179. choices: A list of choices to select from.
  180. default: The default option selected.
  181. show_choices: Whether to show the choices.
  182. Returns:
  183. The user's response or the default value.
  184. """
  185. prompt = question
  186. if choices and show_choices:
  187. choice_str = "/".join(choices)
  188. prompt = f"{question} [{choice_str}]"
  189. if default is not None:
  190. prompt = f"{prompt} ({default})"
  191. prompt = f"{prompt}: "
  192. response = input(prompt)
  193. if not response and default is not None:
  194. return default
  195. if choices and response not in choices:
  196. print(f"Please choose from: {', '.join(choices)}")
  197. return Prompt.ask(question, choices, default, show_choices)
  198. return response
  199. # Console for pretty printing.
  200. _console = Console()
  201. # The current log level.
  202. _LOG_LEVEL = LogLevel.INFO
  203. # Deprecated features who's warning has been printed.
  204. _EMITTED_DEPRECATION_WARNINGS = set()
  205. # Info messages which have been printed.
  206. _EMITTED_INFO = set()
  207. # Warnings which have been printed.
  208. _EMIITED_WARNINGS = set()
  209. # Errors which have been printed.
  210. _EMITTED_ERRORS = set()
  211. # Success messages which have been printed.
  212. _EMITTED_SUCCESS = set()
  213. # Debug messages which have been printed.
  214. _EMITTED_DEBUG = set()
  215. # Logs which have been printed.
  216. _EMITTED_LOGS = set()
  217. # Prints which have been printed.
  218. _EMITTED_PRINTS = set()
  219. def set_log_level(log_level: LogLevel):
  220. """Set the log level.
  221. Args:
  222. log_level: The log level to set.
  223. Raises:
  224. TypeError: If the log level is a string.
  225. """
  226. if not isinstance(log_level, LogLevel):
  227. raise TypeError(
  228. f"log_level must be a LogLevel enum value, got {log_level} of type {type(log_level)} instead."
  229. )
  230. global _LOG_LEVEL
  231. if log_level != _LOG_LEVEL:
  232. # Set the loglevel persistenly for subprocesses.
  233. os.environ["LOGLEVEL"] = log_level.value
  234. _LOG_LEVEL = log_level
  235. def is_debug() -> bool:
  236. """Check if the log level is debug.
  237. Returns:
  238. True if the log level is debug.
  239. """
  240. return _LOG_LEVEL <= LogLevel.DEBUG
  241. def print(msg: str, dedupe: bool = False, **kwargs):
  242. """Print a message.
  243. Args:
  244. msg: The message to print.
  245. dedupe: If True, suppress multiple console logs of print message.
  246. kwargs: Keyword arguments to pass to the print function.
  247. """
  248. if dedupe:
  249. if msg in _EMITTED_PRINTS:
  250. return
  251. else:
  252. _EMITTED_PRINTS.add(msg)
  253. _console.print(msg, **kwargs)
  254. def debug(msg: str, dedupe: bool = False, **kwargs):
  255. """Print a debug message.
  256. Args:
  257. msg: The debug message.
  258. dedupe: If True, suppress multiple console logs of debug message.
  259. kwargs: Keyword arguments to pass to the print function.
  260. """
  261. if is_debug():
  262. if dedupe:
  263. if msg in _EMITTED_DEBUG:
  264. return
  265. else:
  266. _EMITTED_DEBUG.add(msg)
  267. kwargs.setdefault("color", "debug")
  268. print(msg, **kwargs)
  269. def info(msg: str, dedupe: bool = False, **kwargs):
  270. """Print an info message.
  271. Args:
  272. msg: The info message.
  273. dedupe: If True, suppress multiple console logs of info message.
  274. kwargs: Keyword arguments to pass to the print function.
  275. """
  276. if _LOG_LEVEL <= LogLevel.INFO:
  277. if dedupe:
  278. if msg in _EMITTED_INFO:
  279. return
  280. else:
  281. _EMITTED_INFO.add(msg)
  282. kwargs.setdefault("color", "info")
  283. print(f"Info: {msg}", **kwargs)
  284. def success(msg: str, dedupe: bool = False, **kwargs):
  285. """Print a success message.
  286. Args:
  287. msg: The success message.
  288. dedupe: If True, suppress multiple console logs of success message.
  289. kwargs: Keyword arguments to pass to the print function.
  290. """
  291. if _LOG_LEVEL <= LogLevel.INFO:
  292. if dedupe:
  293. if msg in _EMITTED_SUCCESS:
  294. return
  295. else:
  296. _EMITTED_SUCCESS.add(msg)
  297. kwargs.setdefault("color", "success")
  298. print(f"Success: {msg}", **kwargs)
  299. def log(msg: str, dedupe: bool = False, **kwargs):
  300. """Takes a string and logs it to the console.
  301. Args:
  302. msg: The message to log.
  303. dedupe: If True, suppress multiple console logs of log message.
  304. kwargs: Keyword arguments to pass to the print function.
  305. """
  306. if _LOG_LEVEL <= LogLevel.INFO:
  307. if dedupe:
  308. if msg in _EMITTED_LOGS:
  309. return
  310. else:
  311. _EMITTED_LOGS.add(msg)
  312. _console.print(msg, **kwargs)
  313. def rule(title: str, **kwargs):
  314. """Prints a horizontal rule with a title.
  315. Args:
  316. title: The title of the rule.
  317. kwargs: Keyword arguments to pass to the print function.
  318. """
  319. _console.rule(title, **kwargs)
  320. def warn(msg: str, dedupe: bool = False, **kwargs):
  321. """Print a warning message.
  322. Args:
  323. msg: The warning message.
  324. dedupe: If True, suppress multiple console logs of warning message.
  325. kwargs: Keyword arguments to pass to the print function.
  326. """
  327. if _LOG_LEVEL <= LogLevel.WARNING:
  328. if dedupe:
  329. if msg in _EMIITED_WARNINGS:
  330. return
  331. else:
  332. _EMIITED_WARNINGS.add(msg)
  333. kwargs.setdefault("color", "warning")
  334. print(f"Warning: {msg}", **kwargs)
  335. def _get_first_non_framework_frame() -> FrameType | None:
  336. import click
  337. import typer
  338. import typing_extensions
  339. import reflex as rx
  340. # Exclude utility modules that should never be the source of deprecated reflex usage.
  341. exclude_modules = [click, rx, typer, typing_extensions]
  342. exclude_roots = [
  343. p.parent.resolve()
  344. if (p := Path(m.__file__)).name == "__init__.py" # pyright: ignore [reportArgumentType]
  345. else p.resolve()
  346. for m in exclude_modules
  347. ]
  348. # Specifically exclude the reflex cli module.
  349. if reflex_bin := shutil.which(b"reflex"):
  350. exclude_roots.append(Path(reflex_bin.decode()))
  351. frame = inspect.currentframe()
  352. while frame := frame and frame.f_back:
  353. frame_path = Path(inspect.getfile(frame)).resolve()
  354. if not any(frame_path.is_relative_to(root) for root in exclude_roots):
  355. break
  356. return frame
  357. def deprecate(
  358. feature_name: str,
  359. reason: str,
  360. deprecation_version: str,
  361. removal_version: str,
  362. dedupe: bool = True,
  363. **kwargs,
  364. ):
  365. """Print a deprecation warning.
  366. Args:
  367. feature_name: The feature to deprecate.
  368. reason: The reason for deprecation.
  369. deprecation_version: The version the feature was deprecated
  370. removal_version: The version the deprecated feature will be removed
  371. dedupe: If True, suppress multiple console logs of deprecation message.
  372. kwargs: Keyword arguments to pass to the print function.
  373. """
  374. dedupe_key = feature_name
  375. loc = ""
  376. # See if we can find where the deprecation exists in "user code"
  377. origin_frame = _get_first_non_framework_frame()
  378. if origin_frame is not None:
  379. filename = Path(origin_frame.f_code.co_filename)
  380. if filename.is_relative_to(Path.cwd()):
  381. filename = filename.relative_to(Path.cwd())
  382. loc = f"{filename}:{origin_frame.f_lineno}"
  383. dedupe_key = f"{dedupe_key} {loc}"
  384. if dedupe_key not in _EMITTED_DEPRECATION_WARNINGS:
  385. msg = (
  386. f"{feature_name} has been deprecated in version {deprecation_version}. {reason.rstrip('.').lstrip('. ')}. It will be completely "
  387. f"removed in {removal_version}. ({loc})"
  388. )
  389. if _LOG_LEVEL <= LogLevel.WARNING:
  390. kwargs.setdefault("color", "warning")
  391. print(f"DeprecationWarning: {msg}", **kwargs)
  392. if dedupe:
  393. _EMITTED_DEPRECATION_WARNINGS.add(dedupe_key)
  394. def error(msg: str, dedupe: bool = False, **kwargs):
  395. """Print an error message.
  396. Args:
  397. msg: The error message.
  398. dedupe: If True, suppress multiple console logs of error message.
  399. kwargs: Keyword arguments to pass to the print function.
  400. """
  401. if _LOG_LEVEL <= LogLevel.ERROR:
  402. if dedupe:
  403. if msg in _EMITTED_ERRORS:
  404. return
  405. else:
  406. _EMITTED_ERRORS.add(msg)
  407. kwargs.setdefault("color", "error")
  408. print(f"{msg}", **kwargs)
  409. def ask(
  410. question: str,
  411. choices: list[str] | None = None,
  412. default: str | None = None,
  413. show_choices: bool = True,
  414. ) -> str | None:
  415. """Takes a prompt question and optionally a list of choices
  416. and returns the user input.
  417. Args:
  418. question: The question to ask the user.
  419. choices: A list of choices to select from.
  420. default: The default option selected.
  421. show_choices: Whether to show the choices.
  422. Returns:
  423. A string with the user input.
  424. """
  425. return Prompt.ask(
  426. question, choices=choices, default=default, show_choices=show_choices
  427. )
  428. def status(*args, **kwargs):
  429. """Create a status with a spinner.
  430. Args:
  431. *args: Args to pass to the status.
  432. **kwargs: Kwargs to pass to the status.
  433. Returns:
  434. A new status.
  435. """
  436. return _console.status(*args, **kwargs)
  437. @contextlib.contextmanager
  438. def timing(msg: str):
  439. """Create a context manager to time a block of code.
  440. Args:
  441. msg: The message to display.
  442. Yields:
  443. None.
  444. """
  445. start = time.monotonic()
  446. try:
  447. yield
  448. finally:
  449. debug(f"[timing] {msg}: {time.monotonic() - start:.2f}s", color="white")