test_orchestrator.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import pytest
  12. from taipy.common.config import Config
  13. from taipy.common.config.exceptions.exceptions import ConfigurationUpdateBlocked
  14. from taipy.core import Orchestrator
  15. from taipy.core._orchestrator._dispatcher import _DevelopmentJobDispatcher, _StandaloneJobDispatcher
  16. from taipy.core._orchestrator._orchestrator import _Orchestrator
  17. from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
  18. from taipy.core.config.job_config import JobConfig
  19. from taipy.core.exceptions.exceptions import OrchestratorServiceIsAlreadyRunning
  20. class TestOrchestrator:
  21. def test_run_orchestrator_trigger_config_check(self, caplog):
  22. Config.configure_data_node(id="d0", storage_type="toto")
  23. with pytest.raises(SystemExit):
  24. orchestrator = Orchestrator()
  25. orchestrator.run()
  26. expected_error_message = (
  27. "`storage_type` field of DataNodeConfig `d0` must be either csv, sql_table,"
  28. " sql, mongo_collection, pickle, excel, generic, json, parquet, s3_object, or in_memory."
  29. ' Current value of property `storage_type` is "toto".'
  30. )
  31. assert expected_error_message in caplog.text
  32. orchestrator.stop()
  33. def test_run_orchestrator_as_a_service_development_mode(self):
  34. _OrchestratorFactory._dispatcher = None
  35. orchestrator = Orchestrator()
  36. assert orchestrator._orchestrator is None
  37. assert orchestrator._dispatcher is None
  38. assert _OrchestratorFactory._dispatcher is None
  39. orchestrator.run()
  40. assert orchestrator._orchestrator is not None
  41. assert orchestrator._orchestrator == _Orchestrator
  42. assert _OrchestratorFactory._orchestrator is not None
  43. assert _OrchestratorFactory._orchestrator == _Orchestrator
  44. assert orchestrator._dispatcher is not None
  45. assert isinstance(orchestrator._dispatcher, _DevelopmentJobDispatcher)
  46. assert isinstance(_OrchestratorFactory._dispatcher, _DevelopmentJobDispatcher)
  47. orchestrator.stop()
  48. def test_run_orchestrator_as_a_service_standalone_mode(self):
  49. _OrchestratorFactory._dispatcher = None
  50. orchestrator = Orchestrator()
  51. assert orchestrator._orchestrator is None
  52. assert orchestrator._dispatcher is None
  53. assert _OrchestratorFactory._dispatcher is None
  54. Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2)
  55. orchestrator.run()
  56. assert orchestrator._orchestrator is not None
  57. assert orchestrator._orchestrator == _Orchestrator
  58. assert _OrchestratorFactory._orchestrator is not None
  59. assert _OrchestratorFactory._orchestrator == _Orchestrator
  60. assert orchestrator._dispatcher is not None
  61. assert isinstance(orchestrator._dispatcher, _StandaloneJobDispatcher)
  62. assert isinstance(_OrchestratorFactory._dispatcher, _StandaloneJobDispatcher)
  63. assert orchestrator._dispatcher.is_running()
  64. assert _OrchestratorFactory._dispatcher.is_running()
  65. orchestrator.stop()
  66. def test_orchestrator_service_can_only_be_run_once(self):
  67. orchestrator_instance_1 = Orchestrator()
  68. orchestrator_instance_2 = Orchestrator()
  69. orchestrator_instance_1.run()
  70. with pytest.raises(OrchestratorServiceIsAlreadyRunning):
  71. orchestrator_instance_1.run()
  72. with pytest.raises(OrchestratorServiceIsAlreadyRunning):
  73. orchestrator_instance_2.run()
  74. # Stop the Orchestrator service and run it again should work
  75. orchestrator_instance_1.stop()
  76. orchestrator_instance_1.run()
  77. orchestrator_instance_1.stop()
  78. orchestrator_instance_2.run()
  79. orchestrator_instance_2.stop()
  80. def test_block_config_update_when_orchestrator_service_is_running_development_mode(self):
  81. _OrchestratorFactory._dispatcher = None
  82. orchestrator = Orchestrator()
  83. orchestrator.run()
  84. with pytest.raises(ConfigurationUpdateBlocked):
  85. Config.configure_data_node(id="i1")
  86. orchestrator.stop()
  87. @pytest.mark.standalone
  88. def test_block_config_update_when_orchestrator_service_is_running_standalone_mode(self):
  89. _OrchestratorFactory._dispatcher = None
  90. orchestrator = Orchestrator()
  91. Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2)
  92. orchestrator.run()
  93. with pytest.raises(ConfigurationUpdateBlocked):
  94. Config.configure_data_node(id="i1")
  95. orchestrator.stop()