123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430 |
- # Copyright 2021-2024 Avaiga Private Limited
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations under the License.
- from unittest.mock import patch
- import pytest
- from taipy.config.common.frequency import Frequency
- from taipy.config.common.scope import Scope
- from taipy.config.config import Config
- from taipy.core import Orchestrator, taipy
- from taipy.core._version._version_manager import _VersionManager
- from taipy.core._version._version_manager_factory import _VersionManagerFactory
- from taipy.core.common._utils import _load_fct
- from taipy.core.cycle._cycle_manager import _CycleManager
- from taipy.core.data._data_manager import _DataManager
- from taipy.core.exceptions.exceptions import NonExistingVersion
- from taipy.core.job._job_manager import _JobManager
- from taipy.core.scenario._scenario_manager import _ScenarioManager
- from taipy.core.sequence._sequence_manager import _SequenceManager
- from taipy.core.task._task_manager import _TaskManager
- def test_orchestrator_cli_no_arguments():
- with patch("sys.argv", ["prog"]):
- orchestrator = Orchestrator()
- orchestrator.run()
- assert Config.core.mode == "development"
- assert Config.core.version_number == _VersionManagerFactory._build_manager()._get_development_version()
- assert not Config.core.force
- orchestrator.stop()
- def test_orchestrator_cli_development_mode():
- with patch("sys.argv", ["prog", "--development"]):
- orchestrator = Orchestrator()
- orchestrator.run()
- assert Config.core.mode == "development"
- assert Config.core.version_number == _VersionManagerFactory._build_manager()._get_development_version()
- orchestrator.stop()
- def test_orchestrator_cli_dev_mode():
- with patch("sys.argv", ["prog", "-dev"]):
- orchestrator = Orchestrator()
- orchestrator.run()
- assert Config.core.mode == "development"
- assert Config.core.version_number == _VersionManagerFactory._build_manager()._get_development_version()
- orchestrator.stop()
- def test_orchestrator_cli_experiment_mode():
- with patch("sys.argv", ["prog", "--experiment"]):
- orchestrator = Orchestrator()
- orchestrator.run()
- assert Config.core.mode == "experiment"
- assert Config.core.version_number == _VersionManagerFactory._build_manager()._get_latest_version()
- assert not Config.core.force
- orchestrator.stop()
- def test_orchestrator_cli_experiment_mode_with_version():
- with patch("sys.argv", ["prog", "--experiment", "2.1"]):
- orchestrator = Orchestrator()
- orchestrator.run()
- assert Config.core.mode == "experiment"
- assert Config.core.version_number == "2.1"
- assert not Config.core.force
- orchestrator.stop()
- def test_orchestrator_cli_experiment_mode_with_force_version(init_config):
- with patch("sys.argv", ["prog", "--experiment", "2.1", "--taipy-force"]):
- init_config()
- orchestrator = Orchestrator()
- orchestrator.run()
- assert Config.core.mode == "experiment"
- assert Config.core.version_number == "2.1"
- assert Config.core.force
- orchestrator.stop()
- def test_dev_mode_clean_all_entities_of_the_latest_version():
- scenario_config = config_scenario()
- # Create a scenario in development mode
- with patch("sys.argv", ["prog"]):
- orchestrator = Orchestrator()
- orchestrator.run()
- scenario = _ScenarioManager._create(scenario_config)
- taipy.submit(scenario)
- orchestrator.stop()
- # Initial assertion
- assert len(_DataManager._get_all(version_number="all")) == 2
- assert len(_TaskManager._get_all(version_number="all")) == 1
- assert len(_SequenceManager._get_all(version_number="all")) == 1
- assert len(_ScenarioManager._get_all(version_number="all")) == 1
- assert len(_CycleManager._get_all(version_number="all")) == 1
- assert len(_JobManager._get_all(version_number="all")) == 1
- # Create a new scenario in experiment mode
- with patch("sys.argv", ["prog", "--experiment"]):
- orchestrator = Orchestrator()
- orchestrator.run()
- scenario = _ScenarioManager._create(scenario_config)
- taipy.submit(scenario)
- orchestrator.stop()
- # Assert number of entities in 2nd version
- assert len(_DataManager._get_all(version_number="all")) == 4
- assert len(_TaskManager._get_all(version_number="all")) == 2
- assert len(_SequenceManager._get_all(version_number="all")) == 2
- assert len(_ScenarioManager._get_all(version_number="all")) == 2
- # No new cycle is created since old dev version use the same cycle
- assert len(_CycleManager._get_all(version_number="all")) == 1
- assert len(_JobManager._get_all(version_number="all")) == 2
- # Run development mode again
- with patch("sys.argv", ["prog", "--development"]):
- orchestrator = Orchestrator()
- orchestrator.run()
- # The 1st dev version should be deleted run with development mode
- assert len(_DataManager._get_all(version_number="all")) == 2
- assert len(_TaskManager._get_all(version_number="all")) == 1
- assert len(_SequenceManager._get_all(version_number="all")) == 1
- assert len(_ScenarioManager._get_all(version_number="all")) == 1
- assert len(_CycleManager._get_all(version_number="all")) == 1
- assert len(_JobManager._get_all(version_number="all")) == 1
- # Submit new dev version
- scenario = _ScenarioManager._create(scenario_config)
- taipy.submit(scenario)
- orchestrator.stop()
- # Assert number of entities with 1 dev version and 1 exp version
- assert len(_DataManager._get_all(version_number="all")) == 4
- assert len(_TaskManager._get_all(version_number="all")) == 2
- assert len(_SequenceManager._get_all(version_number="all")) == 2
- assert len(_ScenarioManager._get_all(version_number="all")) == 2
- assert len(_CycleManager._get_all(version_number="all")) == 1
- assert len(_JobManager._get_all(version_number="all")) == 2
- # Assert number of entities of the latest version only
- assert len(_DataManager._get_all(version_number="latest")) == 2
- assert len(_TaskManager._get_all(version_number="latest")) == 1
- assert len(_SequenceManager._get_all(version_number="latest")) == 1
- assert len(_ScenarioManager._get_all(version_number="latest")) == 1
- assert len(_JobManager._get_all(version_number="latest")) == 1
- # Assert number of entities of the development version only
- assert len(_DataManager._get_all(version_number="development")) == 2
- assert len(_TaskManager._get_all(version_number="development")) == 1
- assert len(_SequenceManager._get_all(version_number="development")) == 1
- assert len(_ScenarioManager._get_all(version_number="development")) == 1
- assert len(_JobManager._get_all(version_number="development")) == 1
- # Assert number of entities of an unknown version
- with pytest.raises(NonExistingVersion):
- assert _DataManager._get_all(version_number="foo")
- with pytest.raises(NonExistingVersion):
- assert _TaskManager._get_all(version_number="foo")
- with pytest.raises(NonExistingVersion):
- assert _SequenceManager._get_all(version_number="foo")
- with pytest.raises(NonExistingVersion):
- assert _ScenarioManager._get_all(version_number="foo")
- with pytest.raises(NonExistingVersion):
- assert _JobManager._get_all(version_number="foo")
- def twice_doppelganger(a):
- return a * 2
- def test_dev_mode_clean_all_entities_when_config_is_alternated():
- data_node_1_config = Config.configure_data_node(
- id="d1", storage_type="pickle", default_data="abc", scope=Scope.SCENARIO
- )
- data_node_2_config = Config.configure_data_node(id="d2", storage_type="csv", default_path="foo.csv")
- task_config = Config.configure_task("my_task", twice_doppelganger, data_node_1_config, data_node_2_config)
- scenario_config = Config.configure_scenario("my_scenario", [task_config], frequency=Frequency.DAILY)
- # Create a scenario in development mode with the doppelganger function
- with patch("sys.argv", ["prog"]):
- orchestrator = Orchestrator()
- orchestrator.run()
- scenario = _ScenarioManager._create(scenario_config)
- taipy.submit(scenario)
- orchestrator.stop()
- # Delete the twice_doppelganger function
- # and clear cache of _load_fct() to simulate a new run
- del globals()["twice_doppelganger"]
- _load_fct.cache_clear()
- # Create a scenario in development mode with another function
- scenario_config = config_scenario()
- with patch("sys.argv", ["prog"]):
- orchestrator = Orchestrator()
- orchestrator.run()
- scenario = _ScenarioManager._create(scenario_config)
- taipy.submit(scenario)
- orchestrator.stop()
- def test_version_number_when_switching_mode():
- with patch("sys.argv", ["prog", "--development"]):
- orchestrator = Orchestrator()
- orchestrator.run()
- ver_1 = _VersionManager._get_latest_version()
- ver_dev = _VersionManager._get_development_version()
- assert ver_1 == ver_dev
- assert len(_VersionManager._get_all()) == 1
- orchestrator.stop()
- # Run with dev mode, the version number is the same
- with patch("sys.argv", ["prog", "--development"]):
- orchestrator = Orchestrator()
- orchestrator.run()
- ver_2 = _VersionManager._get_latest_version()
- assert ver_2 == ver_dev
- assert len(_VersionManager._get_all()) == 1
- orchestrator.stop()
- # When run with experiment mode, a new version is created
- with patch("sys.argv", ["prog", "--experiment"]):
- orchestrator = Orchestrator()
- orchestrator.run()
- ver_3 = _VersionManager._get_latest_version()
- assert ver_3 != ver_dev
- assert len(_VersionManager._get_all()) == 2
- orchestrator.stop()
- with patch("sys.argv", ["prog", "--experiment", "2.1"]):
- orchestrator = Orchestrator()
- orchestrator.run()
- ver_4 = _VersionManager._get_latest_version()
- assert ver_4 == "2.1"
- assert len(_VersionManager._get_all()) == 3
- orchestrator.stop()
- with patch("sys.argv", ["prog", "--experiment"]):
- orchestrator = Orchestrator()
- orchestrator.run()
- ver_5 = _VersionManager._get_latest_version()
- assert ver_5 != ver_3
- assert ver_5 != ver_4
- assert ver_5 != ver_dev
- assert len(_VersionManager._get_all()) == 4
- orchestrator.stop()
- # Run with dev mode, the version number is the same as the first dev version to override it
- with patch("sys.argv", ["prog", "--development"]):
- orchestrator = Orchestrator()
- orchestrator.run()
- ver_7 = _VersionManager._get_latest_version()
- assert ver_1 == ver_7
- assert len(_VersionManager._get_all()) == 4
- orchestrator.stop()
- def test_force_override_experiment_version():
- scenario_config = config_scenario()
- with patch("sys.argv", ["prog", "--experiment", "1.0"]):
- orchestrator = Orchestrator()
- orchestrator.run()
- ver_1 = _VersionManager._get_latest_version()
- assert ver_1 == "1.0"
- # When create new experiment version, a development version entity is also created as a placeholder
- assert len(_VersionManager._get_all()) == 2 # 2 version include 1 experiment 1 development
- scenario = _ScenarioManager._create(scenario_config)
- taipy.submit(scenario)
- assert len(_DataManager._get_all()) == 2
- assert len(_TaskManager._get_all()) == 1
- assert len(_SequenceManager._get_all()) == 1
- assert len(_ScenarioManager._get_all()) == 1
- assert len(_CycleManager._get_all()) == 1
- assert len(_JobManager._get_all()) == 1
- orchestrator.stop()
- Config.configure_global_app(foo="bar")
- # Without --taipy-force parameter, a SystemExit will be raised
- with pytest.raises(SystemExit):
- with patch("sys.argv", ["prog", "--experiment", "1.0"]):
- orchestrator = Orchestrator()
- orchestrator.run()
- orchestrator.stop()
- # With --taipy-force parameter
- with patch("sys.argv", ["prog", "--experiment", "1.0", "--taipy-force"]):
- orchestrator = Orchestrator()
- orchestrator.run()
- ver_2 = _VersionManager._get_latest_version()
- assert ver_2 == "1.0"
- assert len(_VersionManager._get_all()) == 2 # 2 version include 1 experiment 1 development
- # All entities from previous submit should be saved
- scenario = _ScenarioManager._create(scenario_config)
- taipy.submit(scenario)
- assert len(_DataManager._get_all()) == 4
- assert len(_TaskManager._get_all()) == 2
- assert len(_SequenceManager._get_all()) == 2
- assert len(_ScenarioManager._get_all()) == 2
- assert len(_CycleManager._get_all()) == 1
- assert len(_JobManager._get_all()) == 2
- orchestrator.stop()
- def test_modified_job_configuration_dont_block_application_run(caplog, init_config):
- _ = config_scenario()
- with patch("sys.argv", ["prog", "--experiment", "1.0"]):
- orchestrator = Orchestrator()
- Config.configure_job_executions(mode="development")
- orchestrator.run()
- orchestrator.stop()
- init_config()
- _ = config_scenario()
- with patch("sys.argv", ["prog", "--experiment", "1.0"]):
- orchestrator = Orchestrator()
- Config.configure_job_executions(mode="standalone", max_nb_of_workers=3)
- orchestrator.run()
- error_message = str(caplog.text)
- assert 'JOB "mode" was modified' in error_message
- assert 'JOB "max_nb_of_workers" was added' in error_message
- orchestrator.stop()
- def test_modified_config_properties_without_force(caplog, init_config):
- _ = config_scenario()
- with patch("sys.argv", ["prog", "--experiment", "1.0"]):
- orchestrator = Orchestrator()
- orchestrator.run()
- orchestrator.stop()
- init_config()
- _ = config_scenario_2()
- with pytest.raises(SystemExit):
- with patch("sys.argv", ["prog", "--experiment", "1.0"]):
- orchestrator = Orchestrator()
- orchestrator.run()
- orchestrator.stop()
- error_message = str(caplog.text)
- assert 'DATA_NODE "d3" was added' in error_message
- assert 'JOB "max_nb_of_workers" was added' in error_message
- assert 'DATA_NODE "d0" was removed' in error_message
- assert 'DATA_NODE "d2" has attribute "default_path" modified' in error_message
- assert 'CORE "root_folder" was modified' in error_message
- assert 'CORE "repository_type" was modified' in error_message
- assert 'JOB "mode" was modified' in error_message
- assert 'SCENARIO "my_scenario" has attribute "frequency" modified' in error_message
- assert 'SCENARIO "my_scenario" has attribute "tasks" modified' in error_message
- assert 'TASK "my_task" has attribute "inputs" modified' in error_message
- assert 'TASK "my_task" has attribute "function" modified' in error_message
- assert 'TASK "my_task" has attribute "outputs" modified' in error_message
- assert 'DATA_NODE "d2" has attribute "has_header" modified' in error_message
- assert 'DATA_NODE "d2" has attribute "exposed_type" modified' in error_message
- assert 'CORE "repository_properties" was added' in error_message
- def twice(a):
- return a * 2
- def config_scenario():
- Config.configure_data_node(id="d0")
- data_node_1_config = Config.configure_data_node(
- id="d1", storage_type="pickle", default_data="abc", scope=Scope.SCENARIO
- )
- data_node_2_config = Config.configure_data_node(id="d2", storage_type="csv", default_path="foo.csv")
- task_config = Config.configure_task("my_task", twice, data_node_1_config, data_node_2_config)
- scenario_config = Config.configure_scenario("my_scenario", [task_config], frequency=Frequency.DAILY)
- scenario_config.add_sequences({"my_sequence": [task_config]})
- return scenario_config
- def double_twice(a):
- return a * 2, a * 2
- def config_scenario_2():
- Config.configure_core(
- root_folder="foo_root",
- # Changing the "storage_folder" will fail since older versions are stored in older folder
- # storage_folder="foo_storage",
- repository_type="bar",
- repository_properties={"foo": "bar"},
- )
- Config.configure_job_executions(mode="standalone", max_nb_of_workers=3)
- data_node_1_config = Config.configure_data_node(
- id="d1", storage_type="pickle", default_data="abc", scope=Scope.SCENARIO
- )
- # Modify properties of "d2"
- data_node_2_config = Config.configure_data_node(
- id="d2", storage_type="csv", default_path="bar.csv", has_header=False, exposed_type="numpy"
- )
- # Add new data node "d3"
- data_node_3_config = Config.configure_data_node(
- id="d3", storage_type="csv", default_path="baz.csv", has_header=False, exposed_type="numpy"
- )
- # Modify properties of "my_task", including the function and outputs list
- Config.configure_task("my_task", double_twice, data_node_3_config, [data_node_1_config, data_node_2_config])
- task_config_1 = Config.configure_task("my_task_1", double_twice, data_node_3_config, [data_node_2_config])
- # Modify properties of "my_scenario", where tasks is now my_task_1
- scenario_config = Config.configure_scenario("my_scenario", [task_config_1], frequency=Frequency.MONTHLY)
- scenario_config.add_sequences({"my_sequence": [task_config_1]})
- return scenario_config
|