printer.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. """A module that provides a progress bar for the terminal."""
  2. import dataclasses
  3. import time
  4. from typing import Protocol, Sequence
  5. from reflex.utils.console import Reprinter, _get_terminal_width
# Module-level Reprinter instance. Nothing in the code visible in this file
# reads it (ProgressBar builds its own printer).
# NOTE(review): presumably kept for external importers - confirm before removing.
reprinter = Reprinter()
  7. class ProgressBarComponent(Protocol):
  8. """A protocol for progress bar components."""
  9. def minimum_width(self, current: int, steps: int) -> int:
  10. """Return the minimum width of the component."""
  11. ...
  12. def requested_width(self, current: int, steps: int) -> int:
  13. """Return the requested width of the component."""
  14. ...
  15. def initialize(self, steps: int) -> None:
  16. """Initialize the component."""
  17. ...
  18. def get_message(self, current: int, steps: int, max_width: int) -> str:
  19. """Return the message to display."""
  20. ...
  21. @dataclasses.dataclass
  22. class MessageComponent(ProgressBarComponent):
  23. """A simple component that displays a message."""
  24. message: str
  25. def minimum_width(self, current: int, steps: int) -> int:
  26. """Return the minimum width of the component."""
  27. return len(self.message)
  28. def requested_width(self, current: int, steps: int) -> int:
  29. """Return the requested width of the component."""
  30. return len(self.message)
  31. def initialize(self, steps: int) -> None:
  32. """Initialize the component."""
  33. def get_message(self, current: int, steps: int, max_width: int) -> str:
  34. """Return the message to display."""
  35. return self.message
  36. @dataclasses.dataclass
  37. class PercentageComponent(ProgressBarComponent):
  38. """A component that displays the percentage of completion."""
  39. def minimum_width(self, current: int, steps: int) -> int:
  40. """Return the minimum width of the component."""
  41. return 4
  42. def requested_width(self, current: int, steps: int) -> int:
  43. """Return the requested width of the component."""
  44. return 4
  45. def initialize(self, steps: int) -> None:
  46. """Initialize the component."""
  47. def get_message(self, current: int, steps: int, max_width: int) -> str:
  48. """Return the message to display."""
  49. return f"{int(current / steps * 100):3}%"
  50. @dataclasses.dataclass
  51. class TimeComponent(ProgressBarComponent):
  52. """A component that displays the time elapsed."""
  53. initial_time: float | None = None
  54. def minimum_width(self, current: int, steps: int) -> int:
  55. """Return the minimum width of the component."""
  56. if self.initial_time is None:
  57. raise ValueError("TimeComponent not initialized")
  58. return len(f"{time.time() - self.initial_time:.1f}s")
  59. def requested_width(self, current: int, steps: int) -> int:
  60. """Return the requested width of the component."""
  61. if self.initial_time is None:
  62. raise ValueError("TimeComponent not initialized")
  63. return len(f"{time.time() - self.initial_time:.1f}s")
  64. def initialize(self, steps: int) -> None:
  65. """Initialize the component."""
  66. self.initial_time = time.time()
  67. def get_message(self, current: int, steps: int, max_width: int) -> str:
  68. """Return the message to display."""
  69. if self.initial_time is None:
  70. raise ValueError("TimeComponent not initialized")
  71. return f"{time.time() - self.initial_time:.1f}s"
  72. @dataclasses.dataclass
  73. class CounterComponent(ProgressBarComponent):
  74. """A component that displays the current step and total steps."""
  75. def minimum_width(self, current: int, steps: int) -> int:
  76. """Return the minimum width of the component."""
  77. return 1 + 2 * len(str(steps))
  78. def requested_width(self, current: int, steps: int) -> int:
  79. """Return the requested width of the component."""
  80. return 1 + 2 * len(str(steps))
  81. def initialize(self, steps: int) -> None:
  82. """Initialize the component."""
  83. def get_message(self, current: int, steps: int, max_width: int) -> str:
  84. """Return the message to display."""
  85. return current.__format__(f"{len(str(steps))}") + "/" + str(steps)
  86. @dataclasses.dataclass
  87. class SimpleProgressComponent:
  88. """A component that displays a not so fun guy."""
  89. starting_str: str = ""
  90. ending_str: str = ""
  91. complete_str: str = "█"
  92. incomplete_str: str = "░"
  93. def minimum_width(self, current: int, steps: int) -> int:
  94. """Return the minimum width of the component."""
  95. return (
  96. len(self.starting_str)
  97. + 2 * len(self.incomplete_str)
  98. + 2 * len(self.complete_str)
  99. + len(self.ending_str)
  100. )
  101. def requested_width(self, current: int, steps: int) -> int:
  102. """Return the requested width of the component."""
  103. return (
  104. len(self.starting_str)
  105. + steps * max(len(self.incomplete_str), len(self.complete_str))
  106. + len(self.ending_str)
  107. )
  108. def initialize(self, steps: int) -> None:
  109. """Initialize the component."""
  110. def get_message(self, current: int, steps: int, max_width: int) -> str:
  111. """Return the message to display."""
  112. progress = int(
  113. current
  114. / steps
  115. * (max_width - len(self.starting_str) - len(self.ending_str))
  116. )
  117. complete_part = self.complete_str * (progress // len(self.complete_str))
  118. incomplete_part = self.incomplete_str * (
  119. (
  120. max_width
  121. - len(self.starting_str)
  122. - len(self.ending_str)
  123. - len(complete_part)
  124. )
  125. // len(self.incomplete_str)
  126. )
  127. return self.starting_str + complete_part + incomplete_part + self.ending_str
  128. @dataclasses.dataclass
  129. class FunGuyProgressComponent:
  130. """A component that displays a fun guy."""
  131. starting_str: str = ""
  132. ending_str: str = ""
  133. fun_guy_running: Sequence[str] = ("🯇", "🯈")
  134. fun_guy_finished: str = "🯆"
  135. incomplete_str: str = "·"
  136. complete_str: str = " "
  137. def minimum_width(self, current: int, steps: int) -> int:
  138. """Return the minimum width of the component."""
  139. return (
  140. len(self.starting_str)
  141. + len(self.incomplete_str)
  142. + 1
  143. + len(self.complete_str)
  144. + len(self.ending_str)
  145. )
  146. def requested_width(self, current: int, steps: int) -> int:
  147. """Return the requested width of the component."""
  148. return steps + len(self.starting_str) + len(self.ending_str)
  149. def initialize(self, steps: int) -> None:
  150. """Initialize the component."""
  151. def get_message(self, current: int, steps: int, max_width: int) -> str:
  152. """Return the message to display."""
  153. progress = int(
  154. current
  155. / steps
  156. * (max_width - len(self.starting_str) - len(self.ending_str))
  157. )
  158. fun_guy = (
  159. self.fun_guy_running[progress % len(self.fun_guy_running)]
  160. if current != steps
  161. else self.fun_guy_finished
  162. )
  163. before_guy = self.complete_str * max(0, progress - len(fun_guy))
  164. after_guy = self.incomplete_str * max(
  165. 0,
  166. max_width
  167. - len(before_guy)
  168. - len(fun_guy)
  169. - len(self.starting_str)
  170. - len(self.ending_str),
  171. )
  172. return self.starting_str + before_guy + fun_guy + after_guy + self.ending_str
  173. @dataclasses.dataclass
  174. class ProgressBar:
  175. """A progress bar that displays the progress of a task."""
  176. steps: int
  177. max_width: int = 80
  178. separator: str = " "
  179. components: Sequence[tuple[ProgressBarComponent, int]] = dataclasses.field(
  180. default_factory=lambda: [
  181. (FunGuyProgressComponent(), 2),
  182. (CounterComponent(), 3),
  183. (PercentageComponent(), 0),
  184. (TimeComponent(), 1),
  185. ]
  186. )
  187. _printer: Reprinter = dataclasses.field(default_factory=Reprinter, init=False)
  188. _current: int = dataclasses.field(default=0, init=False)
  189. def __post_init__(self):
  190. """Initialize the progress bar."""
  191. for component, _ in self.components:
  192. component.initialize(self.steps)
  193. def print(self):
  194. """Print the current progress bar state."""
  195. current_terminal_width = _get_terminal_width()
  196. components_by_priority = [
  197. (index, component)
  198. for index, (component, _) in sorted(
  199. enumerate(self.components), key=lambda x: x[1][1], reverse=True
  200. )
  201. ]
  202. possible_width = min(current_terminal_width, self.max_width)
  203. sum_of_minimum_widths = sum(
  204. component.minimum_width(self._current, self.steps)
  205. for _, component in components_by_priority
  206. )
  207. if sum_of_minimum_widths > possible_width:
  208. used_width = 0
  209. visible_components: list[tuple[int, ProgressBarComponent, int]] = []
  210. for index, component in components_by_priority:
  211. if (
  212. used_width
  213. + component.minimum_width(self._current, self.steps)
  214. + len(self.separator)
  215. > possible_width
  216. ):
  217. continue
  218. used_width += component.minimum_width(self._current, self.steps)
  219. visible_components.append(
  220. (
  221. index,
  222. component,
  223. component.requested_width(self._current, self.steps),
  224. )
  225. )
  226. else:
  227. components = [
  228. (
  229. priority,
  230. component,
  231. component.minimum_width(self._current, self.steps),
  232. )
  233. for (component, priority) in self.components
  234. ]
  235. while True:
  236. sum_of_assigned_width = sum(width for _, _, width in components)
  237. extra_width = (
  238. possible_width
  239. - sum_of_assigned_width
  240. - (len(self.separator) * (len(components) - 1))
  241. )
  242. possible_extra_width_to_take = [
  243. (
  244. max(
  245. 0,
  246. component.requested_width(self._current, self.steps)
  247. - width,
  248. ),
  249. priority,
  250. )
  251. for priority, component, width in components
  252. ]
  253. sum_of_possible_extra_width = sum(
  254. width for width, _ in possible_extra_width_to_take
  255. )
  256. if sum_of_possible_extra_width <= 0 or extra_width <= 0:
  257. break
  258. min_width, max_prioririty = min(
  259. filter(lambda x: x[0] > 0, possible_extra_width_to_take),
  260. key=lambda x: x[0] / x[1],
  261. )
  262. maximum_prioririty_repeats = min_width / max_prioririty
  263. give_width = [
  264. min(width, maximum_prioririty_repeats * priority)
  265. for width, priority in possible_extra_width_to_take
  266. ]
  267. sum_of_give_width = sum(give_width)
  268. normalized_give_width = [
  269. width / sum_of_give_width * min(extra_width, sum_of_give_width)
  270. for width in give_width
  271. ]
  272. components = [
  273. (index, component, int(width + give))
  274. for (index, component, width), give in zip(
  275. components, normalized_give_width, strict=True
  276. )
  277. ]
  278. if sum(width for _, _, width in components) == sum_of_minimum_widths:
  279. break
  280. visible_components = [
  281. (index, component, width)
  282. for index, (_, component, width) in enumerate(components)
  283. if width > 0
  284. ]
  285. messages = [
  286. self.get_message(component, width)
  287. for _, component, width in sorted(visible_components, key=lambda x: x[0])
  288. ]
  289. self._printer.reprint(self.separator.join(messages))
  290. def get_message(self, component: ProgressBarComponent, width: int):
  291. """Get the message for a given component."""
  292. message = component.get_message(self._current, self.steps, width)
  293. if len(message) > width:
  294. raise ValueError(
  295. f"Component message too long: {message} (length: {len(message)}, width: {width})"
  296. )
  297. return message
  298. def update(self, step: int):
  299. """Update the progress bar by a given step."""
  300. self._current += step
  301. self.print()
  302. def finish(self):
  303. """Finish the progress bar."""
  304. self._current = self.steps
  305. self.print()