123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- # Copyright 2021-2024 Avaiga Private Limited
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations under the License.
- import pytest
- from taipy.config import Config
- from taipy.config.exceptions.exceptions import ConfigurationUpdateBlocked
- from taipy.core import Core
- from taipy.core._orchestrator._dispatcher import _DevelopmentJobDispatcher, _StandaloneJobDispatcher
- from taipy.core._orchestrator._orchestrator import _Orchestrator
- from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
- from taipy.core.config.job_config import JobConfig
- from taipy.core.exceptions.exceptions import CoreServiceIsAlreadyRunning
- class TestCore:
- def test_run_core_trigger_config_check(self, caplog):
- Config.configure_data_node(id="d0", storage_type="toto")
- with pytest.raises(SystemExit):
- core = Core()
- core.run()
- expected_error_message = (
- "`storage_type` field of DataNodeConfig `d0` must be either csv, sql_table,"
- " sql, mongo_collection, pickle, excel, generic, json, parquet, s3_object, or in_memory."
- ' Current value of property `storage_type` is "toto".'
- )
- assert expected_error_message in caplog.text
- core.stop()
- def test_run_core_as_a_service_development_mode(self):
- _OrchestratorFactory._dispatcher = None
- core = Core()
- assert core._orchestrator is None
- assert core._dispatcher is None
- assert _OrchestratorFactory._dispatcher is None
- core.run()
- assert core._orchestrator is not None
- assert core._orchestrator == _Orchestrator
- assert _OrchestratorFactory._orchestrator is not None
- assert _OrchestratorFactory._orchestrator == _Orchestrator
- assert core._dispatcher is not None
- assert isinstance(core._dispatcher, _DevelopmentJobDispatcher)
- assert isinstance(_OrchestratorFactory._dispatcher, _DevelopmentJobDispatcher)
- core.stop()
- def test_run_core_as_a_service_standalone_mode(self):
- _OrchestratorFactory._dispatcher = None
- core = Core()
- assert core._orchestrator is None
- assert core._dispatcher is None
- assert _OrchestratorFactory._dispatcher is None
- Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2)
- core.run()
- assert core._orchestrator is not None
- assert core._orchestrator == _Orchestrator
- assert _OrchestratorFactory._orchestrator is not None
- assert _OrchestratorFactory._orchestrator == _Orchestrator
- assert core._dispatcher is not None
- assert isinstance(core._dispatcher, _StandaloneJobDispatcher)
- assert isinstance(_OrchestratorFactory._dispatcher, _StandaloneJobDispatcher)
- assert core._dispatcher.is_running()
- assert _OrchestratorFactory._dispatcher.is_running()
- core.stop()
- def test_core_service_can_only_be_run_once(self):
- core_instance_1 = Core()
- core_instance_2 = Core()
- core_instance_1.run()
- with pytest.raises(CoreServiceIsAlreadyRunning):
- core_instance_1.run()
- with pytest.raises(CoreServiceIsAlreadyRunning):
- core_instance_2.run()
- # Stop the Core service and run it again should work
- core_instance_1.stop()
- core_instance_1.run()
- core_instance_1.stop()
- core_instance_2.run()
- core_instance_2.stop()
- def test_block_config_update_when_core_service_is_running_development_mode(self):
- _OrchestratorFactory._dispatcher = None
- core = Core()
- core.run()
- with pytest.raises(ConfigurationUpdateBlocked):
- Config.configure_data_node(id="i1")
- core.stop()
- @pytest.mark.standalone
- def test_block_config_update_when_core_service_is_running_standalone_mode(self):
- _OrchestratorFactory._dispatcher = None
- core = Core()
- Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2)
- core.run()
- with pytest.raises(ConfigurationUpdateBlocked):
- Config.configure_data_node(id="i1")
- core.stop()
|