123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550 |
- """A module that provides a progress bar for the terminal."""
- import dataclasses
- import time
- from typing import Callable, Sequence
- from reflex.utils.console import Reprinter, _get_terminal_width
- reprinter = Reprinter()
- @dataclasses.dataclass(kw_only=True)
- class ProgressBarComponent:
- """A protocol for progress bar components."""
- colorer: Callable[[str], str] = lambda x: x
- def minimum_width(self, current: int, steps: int) -> int:
- """Return the minimum width of the component.
- Args:
- current: The current step.
- steps: The total number of steps.
- """
- ...
- def requested_width(self, current: int, steps: int) -> int:
- """Return the requested width of the component.
- Args:
- current: The current step.
- steps: The total number of steps.
- """
- ...
- def initialize(self, steps: int) -> None:
- """Initialize the component.
- Args:
- steps: The total number of steps.
- """
- ...
- def get_message(self, current: int, steps: int, max_width: int) -> str:
- """Return the message to display.
- Args:
- current: The current step.
- steps: The total number of steps.
- max_width: The maximum width of the component.
- """
- ...
- @dataclasses.dataclass
- class MessageComponent(ProgressBarComponent):
- """A simple component that displays a message."""
- message: str = ""
- def minimum_width(self, current: int, steps: int) -> int:
- """Return the minimum width of the component.
- Args:
- current: The current step.
- steps: The total number of steps.
- Returns:
- The minimum width of the component.
- """
- return len(self.message)
- def requested_width(self, current: int, steps: int) -> int:
- """Return the requested width of the component.
- Args:
- current: The current step.
- steps: The total number of steps.
- Returns:
- The requested width of the component.
- """
- return len(self.message)
- def initialize(self, steps: int) -> None:
- """Initialize the component.
- Args:
- steps: The total number of steps.
- """
- def get_message(self, current: int, steps: int, max_width: int) -> str:
- """Return the message to display.
- Args:
- current: The current step.
- steps: The total number of steps.
- max_width: The maximum width of the component.
- Returns:
- The message to display.
- """
- return self.message
- @dataclasses.dataclass
- class PercentageComponent(ProgressBarComponent):
- """A component that displays the percentage of completion."""
- def minimum_width(self, current: int, steps: int) -> int:
- """Return the minimum width of the component.
- Args:
- current: The current step.
- steps: The total number of steps.
- Returns:
- The minimum width of the component.
- """
- return 4
- def requested_width(self, current: int, steps: int) -> int:
- """Return the requested width of the component.
- Args:
- current: The current step.
- steps: The total number of steps.
- Returns:
- The requested width of the component.
- """
- return 4
- def initialize(self, steps: int) -> None:
- """Initialize the component.
- Args:
- steps: The total number of steps.
- """
- def get_message(self, current: int, steps: int, max_width: int) -> str:
- """Return the message to display.
- Args:
- current: The current step.
- steps: The total number of steps.
- max_width: The maximum width of the component.
- Returns:
- The message to display.
- """
- return f"{int(current / steps * 100):3}%"
- @dataclasses.dataclass
- class TimeComponent(ProgressBarComponent):
- """A component that displays the time elapsed."""
- initial_time: float | None = None
- _cached_time: float | None = dataclasses.field(default=None, init=False)
- def _minimum_and_requested_string(
- self, current: int, steps: int
- ) -> tuple[str, str]:
- """Return the minimum and requested string length of the component.
- Args:
- current: The current step.
- steps: The total number of steps.
- Returns:
- The minimum and requested string length of the component.
- Raises:
- ValueError: If the component is not initialized.
- """
- if self.initial_time is None or self._cached_time is None:
- raise ValueError("TimeComponent not initialized")
- return (
- f"{int(self._cached_time - self.initial_time)!s}s",
- f"{int((self._cached_time - self.initial_time) * 1000)!s}ms",
- )
- def minimum_width(self, current: int, steps: int) -> int:
- """Return the minimum width of the component.
- Args:
- current: The current step.
- steps: The total number of steps.
- Returns:
- The minimum width of the component.
- Raises:
- ValueError: If the component is not initialized.
- """
- if self.initial_time is None:
- raise ValueError("TimeComponent not initialized")
- self._cached_time = time.monotonic()
- _min, _ = self._minimum_and_requested_string(current, steps)
- return len(_min)
- def requested_width(self, current: int, steps: int) -> int:
- """Return the requested width of the component.
- Args:
- current: The current step.
- steps: The total number of steps.
- Returns:
- The requested width of the component.
- Raises:
- ValueError: If the component is not initialized.
- """
- if self.initial_time is None:
- raise ValueError("TimeComponent not initialized")
- _, _req = self._minimum_and_requested_string(current, steps)
- return len(_req)
- def initialize(self, steps: int) -> None:
- """Initialize the component.
- Args:
- steps: The total number of steps.
- """
- self.initial_time = time.monotonic()
- def get_message(self, current: int, steps: int, max_width: int) -> str:
- """Return the message to display.
- Args:
- current: The current step.
- steps: The total number of steps.
- max_width: The maximum width of the component.
- Returns:
- The message to display.
- Raises:
- ValueError: If the component is not initialized.
- """
- if self.initial_time is None:
- raise ValueError("TimeComponent not initialized")
- _min, _req = self._minimum_and_requested_string(current, steps)
- if len(_req) <= max_width:
- return _req
- return _min
- @dataclasses.dataclass
- class CounterComponent(ProgressBarComponent):
- """A component that displays the current step and total steps."""
- def minimum_width(self, current: int, steps: int) -> int:
- """Return the minimum width of the component.
- Args:
- current: The current step.
- steps: The total number of steps.
- Returns:
- The minimum width of the component.
- """
- return 1 + 2 * len(str(steps))
- def requested_width(self, current: int, steps: int) -> int:
- """Return the requested width of the component.
- Args:
- current: The current step.
- steps: The total number of steps.
- Returns:
- The requested width of the component.
- """
- return 1 + 2 * len(str(steps))
- def initialize(self, steps: int) -> None:
- """Initialize the component.
- Args:
- steps: The total number of steps.
- """
- def get_message(self, current: int, steps: int, max_width: int) -> str:
- """Return the message to display.
- Args:
- current: The current step.
- steps: The total number of steps.
- max_width: The maximum width of the component.
- Returns:
- The message to display.
- """
- return current.__format__(f"{len(str(steps))}") + "/" + str(steps)
- @dataclasses.dataclass
- class SimpleProgressComponent(ProgressBarComponent):
- """A component that displays a not so fun guy."""
- starting_str: str = ""
- ending_str: str = ""
- complete_str: str = "█"
- incomplete_str: str = "░"
- def minimum_width(self, current: int, steps: int) -> int:
- """Return the minimum width of the component.
- Args:
- current: The current step.
- steps: The total number of steps.
- Returns:
- The minimum width of the component.
- """
- return (
- len(self.starting_str)
- + 2 * len(self.incomplete_str)
- + 2 * len(self.complete_str)
- + len(self.ending_str)
- )
- def requested_width(self, current: int, steps: int) -> int:
- """Return the requested width of the component.
- Args:
- current: The current step.
- steps: The total number of steps.
- Returns:
- The requested width of the component.
- """
- return (
- len(self.starting_str)
- + steps * max(len(self.incomplete_str), len(self.complete_str))
- + len(self.ending_str)
- )
- def initialize(self, steps: int) -> None:
- """Initialize the component.
- Args:
- steps: The total number of steps.
- """
- def get_message(self, current: int, steps: int, max_width: int) -> str:
- """Return the message to display.
- Args:
- current: The current step.
- steps: The total number of steps.
- max_width: The maximum width of the component.
- Returns:
- The message to display.
- """
- progress = int(
- current
- / steps
- * (max_width - len(self.starting_str) - len(self.ending_str))
- )
- complete_part = self.complete_str * (progress // len(self.complete_str))
- incomplete_part = self.incomplete_str * (
- (
- max_width
- - len(self.starting_str)
- - len(self.ending_str)
- - len(complete_part)
- )
- // len(self.incomplete_str)
- )
- return self.starting_str + complete_part + incomplete_part + self.ending_str
- @dataclasses.dataclass
- class ProgressBar:
- """A progress bar that displays the progress of a task."""
- steps: int
- max_width: int = 80
- separator: str = " "
- components: Sequence[tuple[ProgressBarComponent, int]] = dataclasses.field(
- default_factory=lambda: [
- (SimpleProgressComponent(), 2),
- (CounterComponent(), 3),
- (PercentageComponent(), 0),
- (TimeComponent(), 1),
- ]
- )
- _printer: Reprinter = dataclasses.field(default_factory=Reprinter, init=False)
- _current: int = dataclasses.field(default=0, init=False)
- def __post_init__(self):
- """Initialize the progress bar."""
- for component, _ in self.components:
- component.initialize(self.steps)
- def print(self):
- """Print the current progress bar state."""
- current_terminal_width = _get_terminal_width()
- components_by_priority = [
- (index, component)
- for index, (component, _) in sorted(
- enumerate(self.components), key=lambda x: x[1][1], reverse=True
- )
- ]
- possible_width = min(current_terminal_width, self.max_width)
- sum_of_minimum_widths = sum(
- component.minimum_width(self._current, self.steps)
- for _, component in components_by_priority
- )
- if sum_of_minimum_widths > possible_width:
- used_width = 0
- visible_components: list[tuple[int, ProgressBarComponent, int]] = []
- for index, component in components_by_priority:
- if (
- used_width
- + component.minimum_width(self._current, self.steps)
- + len(self.separator)
- > possible_width
- ):
- continue
- used_width += component.minimum_width(self._current, self.steps)
- visible_components.append(
- (
- index,
- component,
- component.requested_width(self._current, self.steps),
- )
- )
- else:
- components = [
- (
- priority,
- component,
- component.minimum_width(self._current, self.steps),
- )
- for (component, priority) in self.components
- ]
- while True:
- sum_of_assigned_width = sum(width for _, _, width in components)
- extra_width = (
- possible_width
- - sum_of_assigned_width
- - (len(self.separator) * (len(components) - 1))
- )
- possible_extra_width_to_take = [
- (
- max(
- 0,
- component.requested_width(self._current, self.steps)
- - width,
- ),
- priority,
- )
- for priority, component, width in components
- ]
- sum_of_possible_extra_width = sum(
- width for width, _ in possible_extra_width_to_take
- )
- if sum_of_possible_extra_width <= 0 or extra_width <= 0:
- break
- min_width, max_prioririty = min(
- filter(lambda x: x[0] > 0, possible_extra_width_to_take),
- key=lambda x: x[0] / x[1],
- )
- maximum_prioririty_repeats = min_width / max_prioririty
- give_width = [
- min(width, maximum_prioririty_repeats * priority)
- for width, priority in possible_extra_width_to_take
- ]
- sum_of_give_width = sum(give_width)
- normalized_give_width = [
- width / sum_of_give_width * min(extra_width, sum_of_give_width)
- for width in give_width
- ]
- components = [
- (index, component, int(width + give))
- for (index, component, width), give in zip(
- components, normalized_give_width, strict=True
- )
- ]
- if sum(width for _, _, width in components) == sum_of_minimum_widths:
- break
- visible_components = [
- (index, component, width)
- for index, (_, component, width) in enumerate(components)
- if width > 0
- ]
- messages = [
- self.get_message(component, width)
- for _, component, width in sorted(visible_components, key=lambda x: x[0])
- ]
- self._printer.reprint(self.separator.join(messages))
- def get_message(self, component: ProgressBarComponent, width: int):
- """Get the message for a given component.
- Args:
- component: The component to get the message for.
- width: The width of the component.
- Returns:
- The message for the component
- """
- message = component.get_message(self._current, self.steps, width)
- return component.colorer(message[:width])
- def update(self, step: int):
- """Update the progress bar by a given step.
- Args:
- step: The step to update the progress bar by.
- """
- self._current += step
- self.print()
- def finish(self):
- """Finish the progress bar."""
- self._current = self.steps
- self.print()
- self._printer.finish()
|