orchestrator.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. from multiprocessing import Lock
  12. from typing import Optional
  13. from taipy.config import Config
  14. from taipy.logger._taipy_logger import _TaipyLogger
  15. from ._cli._core_cli_factory import _CoreCLIFactory
  16. from ._orchestrator._dispatcher._job_dispatcher import _JobDispatcher
  17. from ._orchestrator._orchestrator import _Orchestrator
  18. from ._orchestrator._orchestrator_factory import _OrchestratorFactory
  19. from ._version._version_manager_factory import _VersionManagerFactory
  20. from .config import CoreSection
  21. from .exceptions.exceptions import OrchestratorServiceIsAlreadyRunning
  class Orchestrator:
      """
      Orchestrator service.

      The running state and the version-initialization state are stored as class
      attributes, so they are shared by every instance of this class. Each flag is
      guarded by its own lock.
      """

      # True between a successful entry into `run()` and the end of `stop()`.
      _is_running = False
      __lock_is_running = Lock()

      # True once the version/config preparation has been claimed; reset by `stop()`.
      _version_is_initialized = False
      __lock_version_is_initialized = Lock()

      __logger = _TaipyLogger._get_logger()

      # Populated by `__start_dispatcher()`.
      _orchestrator: Optional[_Orchestrator] = None
      _dispatcher: Optional[_JobDispatcher] = None

      def __init__(self) -> None:
          """
          Initialize an Orchestrator service.
          """

      def run(self, force_restart: bool = False) -> None:
          """
          Start an Orchestrator service.

          This function checks and locks the configuration, manages application's version,
          and starts a job dispatcher.

          Parameters:
              force_restart (bool): Forwarded as `force_restart` to
                  `_OrchestratorFactory._build_dispatcher` when building the job dispatcher.

          Raises:
              OrchestratorServiceIsAlreadyRunning: If the service is already marked as running.
          """
          service_cls = self.__class__

          # Test and set the flag under a single lock acquisition. Checking outside the
          # lock would let two concurrent callers both observe `False` and both start.
          with service_cls.__lock_is_running:
              if service_cls._is_running:
                  raise OrchestratorServiceIsAlreadyRunning
              service_cls._is_running = True

          self._manage_version_and_block_config()
          self.__start_dispatcher(force_restart)
          self.__logger.info("Orchestrator service has been started.")

      def stop(self, wait: bool = True, timeout: Optional[float] = None) -> None:
          """
          Stop the Orchestrator service.

          This function stops the dispatcher and unblock the Config for update.

          Parameters:
              wait (bool): If True, the method will wait for the dispatcher to stop.
              timeout (Optional[float]): The maximum time to wait. If None, the method will wait indefinitely.
          """
          self.__logger.info("Unblocking configuration update...")
          Config.unblock_update()

          self.__logger.info("Stopping job dispatcher...")
          if self._dispatcher:
              # Keep whatever the factory returns as the new dispatcher reference.
              self._dispatcher = _OrchestratorFactory._remove_dispatcher(wait, timeout)

          # Reset both class-level flags so that a later `run()` starts from scratch.
          with self.__class__.__lock_is_running:
              self.__class__._is_running = False
          with self.__class__.__lock_version_is_initialized:
              self.__class__._version_is_initialized = False

          self.__logger.info("Orchestrator service has been stopped.")

      @classmethod
      def _manage_version_and_block_config(cls) -> None:
          """
          Manage the application's version and block the Config from updates.

          Does nothing when the version has already been initialized since the last
          `stop()`.
          """
          # Claim the initialization atomically: the check and the assignment share one
          # lock acquisition so only one caller proceeds to the steps below.
          with cls.__lock_version_is_initialized:
              if cls._version_is_initialized:
                  return
              cls._version_is_initialized = True

          # As in the original flow, the actual work runs outside the lock.
          cls.__update_orchestrator_section()
          cls.__manage_version()
          cls.__check_and_block_config()

      @classmethod
      def __update_orchestrator_section(cls) -> None:
          """Update the applied core config section with the values returned by the core CLI."""
          cls.__logger.info("Updating configuration with command-line arguments...")
          _core_cli = _CoreCLIFactory._build_cli()
          _core_cli.create_parser()
          Config._applied_config._unique_sections[CoreSection.name]._update(_core_cli.handle_command())

      @classmethod
      def __manage_version(cls) -> None:
          """Run the version manager, then store the latest version number in the core config section."""
          cls.__logger.info("Managing application's version...")
          _VersionManagerFactory._build_manager()._manage_version()
          Config._applied_config._unique_sections[CoreSection.name]._update(
              {"version_number": _VersionManagerFactory._build_manager()._get_latest_version()}
          )

      @classmethod
      def __check_and_block_config(cls) -> None:
          """Check the configuration, then block it from further updates."""
          cls.__logger.info("Checking application's version...")
          Config.check()
          cls.__logger.info("Blocking configuration update...")
          Config.block_update()

      def __start_dispatcher(self, force_restart: bool) -> None:
          """
          Build the orchestrator (once per instance) and the job dispatcher.

          Parameters:
              force_restart (bool): Forwarded to `_OrchestratorFactory._build_dispatcher`.
          """
          self.__logger.info("Starting job dispatcher...")
          if self._orchestrator is None:
              self._orchestrator = _OrchestratorFactory._build_orchestrator()

          # Only overwrite the stored dispatcher when the factory returns a truthy one.
          if dispatcher := _OrchestratorFactory._build_dispatcher(force_restart=force_restart):
              self._dispatcher = dispatcher

          if Config.job_config.is_development:
              _Orchestrator._check_and_execute_jobs_if_development_mode()