# Copyright 2021-2025 Avaiga Private Limited # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the # specific language governing permissions and limitations under the License. from unittest.mock import patch import pytest from taipy.common.config import Config from taipy.core import Orchestrator, taipy from taipy.core._version._version_manager import _VersionManager from taipy.core._version._version_manager_factory import _VersionManagerFactory from taipy.core.common._utils import _load_fct from taipy.core.common.frequency import Frequency from taipy.core.common.scope import Scope from taipy.core.cycle._cycle_manager import _CycleManager from taipy.core.data._data_manager import _DataManager from taipy.core.exceptions.exceptions import NonExistingVersion from taipy.core.job._job_manager import _JobManager from taipy.core.scenario._scenario_manager import _ScenarioManager from taipy.core.sequence._sequence_manager import _SequenceManager from taipy.core.task._task_manager import _TaskManager def test_orchestrator_cli_no_arguments(): with patch("sys.argv", ["prog"]): orchestrator = Orchestrator() orchestrator.run() assert Config.core.mode == "development" assert Config.core.version_number == _VersionManagerFactory._build_manager()._get_development_version() assert not Config.core.force orchestrator.stop() def test_orchestrator_cli_development_mode(): with patch("sys.argv", ["prog", "--development"]): orchestrator = Orchestrator() orchestrator.run() assert Config.core.mode == "development" assert Config.core.version_number == _VersionManagerFactory._build_manager()._get_development_version() orchestrator.stop() def test_orchestrator_cli_dev_mode(): with patch("sys.argv", ["prog", "-dev"]): orchestrator = Orchestrator() orchestrator.run() assert Config.core.mode == "development" assert Config.core.version_number == _VersionManagerFactory._build_manager()._get_development_version() orchestrator.stop() def test_orchestrator_cli_experiment_mode(): with patch("sys.argv", ["prog", "--experiment"]): orchestrator = Orchestrator() orchestrator.run() assert Config.core.mode == "experiment" assert Config.core.version_number == _VersionManagerFactory._build_manager()._get_latest_version() assert not Config.core.force orchestrator.stop() def test_orchestrator_cli_experiment_mode_with_version(): with patch("sys.argv", ["prog", "--experiment", "2.1"]): orchestrator = Orchestrator() orchestrator.run() assert Config.core.mode == "experiment" assert Config.core.version_number == "2.1" assert not Config.core.force orchestrator.stop() def test_orchestrator_cli_experiment_mode_with_force_version(init_config): with patch("sys.argv", ["prog", "--experiment", "2.1", "--taipy-force"]): init_config() orchestrator = Orchestrator() orchestrator.run() assert Config.core.mode == "experiment" assert Config.core.version_number == "2.1" assert Config.core.force orchestrator.stop() def test_dev_mode_clean_all_entities_of_the_latest_version(): scenario_config = config_scenario() # Create a scenario in development mode with patch("sys.argv", ["prog"]): orchestrator = Orchestrator() orchestrator.run() scenario = _ScenarioManager._create(scenario_config) taipy.submit(scenario) orchestrator.stop() # Initial assertion assert len(_DataManager._get_all(version_number="all")) == 2 assert len(_TaskManager._get_all(version_number="all")) == 1 assert len(_SequenceManager._get_all(version_number="all")) == 1 assert len(_ScenarioManager._get_all(version_number="all")) == 1 assert len(_CycleManager._get_all(version_number="all")) == 1 assert len(_JobManager._get_all(version_number="all")) == 1 # Create a new scenario in experiment mode with patch("sys.argv", ["prog", "--experiment"]): orchestrator = Orchestrator() orchestrator.run() scenario = _ScenarioManager._create(scenario_config) taipy.submit(scenario) orchestrator.stop() # Assert number of entities in 2nd version assert len(_DataManager._get_all(version_number="all")) == 4 assert len(_TaskManager._get_all(version_number="all")) == 2 assert len(_SequenceManager._get_all(version_number="all")) == 2 assert len(_ScenarioManager._get_all(version_number="all")) == 2 # No new cycle is created since old dev version use the same cycle assert len(_CycleManager._get_all(version_number="all")) == 1 assert len(_JobManager._get_all(version_number="all")) == 2 # Run development mode again with patch("sys.argv", ["prog", "--development"]): orchestrator = Orchestrator() orchestrator.run() # The 1st dev version should be deleted run with development mode assert len(_DataManager._get_all(version_number="all")) == 2 assert len(_TaskManager._get_all(version_number="all")) == 1 assert len(_SequenceManager._get_all(version_number="all")) == 1 assert len(_ScenarioManager._get_all(version_number="all")) == 1 assert len(_CycleManager._get_all(version_number="all")) == 1 assert len(_JobManager._get_all(version_number="all")) == 1 # Submit new dev version scenario = _ScenarioManager._create(scenario_config) taipy.submit(scenario) orchestrator.stop() # Assert number of entities with 1 dev version and 1 exp version assert len(_DataManager._get_all(version_number="all")) == 4 assert len(_TaskManager._get_all(version_number="all")) == 2 assert len(_SequenceManager._get_all(version_number="all")) == 2 assert len(_ScenarioManager._get_all(version_number="all")) == 2 assert len(_CycleManager._get_all(version_number="all")) == 1 assert len(_JobManager._get_all(version_number="all")) == 2 # Assert number of entities of the latest version only assert len(_DataManager._get_all(version_number="latest")) == 2 assert len(_TaskManager._get_all(version_number="latest")) == 1 assert len(_SequenceManager._get_all(version_number="latest")) == 1 assert len(_ScenarioManager._get_all(version_number="latest")) == 1 assert len(_JobManager._get_all(version_number="latest")) == 1 # Assert number of entities of the development version only assert len(_DataManager._get_all(version_number="development")) == 2 assert len(_TaskManager._get_all(version_number="development")) == 1 assert len(_SequenceManager._get_all(version_number="development")) == 1 assert len(_ScenarioManager._get_all(version_number="development")) == 1 assert len(_JobManager._get_all(version_number="development")) == 1 # Assert number of entities of an unknown version with pytest.raises(NonExistingVersion): assert _DataManager._get_all(version_number="foo") with pytest.raises(NonExistingVersion): assert _TaskManager._get_all(version_number="foo") with pytest.raises(NonExistingVersion): assert _SequenceManager._get_all(version_number="foo") with pytest.raises(NonExistingVersion): assert _ScenarioManager._get_all(version_number="foo") with pytest.raises(NonExistingVersion): assert _JobManager._get_all(version_number="foo") def twice_doppelganger(a): return a * 2 def test_dev_mode_clean_all_entities_when_config_is_alternated(): data_node_1_config = Config.configure_data_node( id="d1", storage_type="pickle", default_data="abc", scope=Scope.SCENARIO ) data_node_2_config = Config.configure_data_node(id="d2", storage_type="csv", default_path="foo.csv") task_config = Config.configure_task("my_task", twice_doppelganger, data_node_1_config, data_node_2_config) scenario_config = Config.configure_scenario("my_scenario", [task_config], frequency=Frequency.DAILY) # Create a scenario in development mode with the doppelganger function with patch("sys.argv", ["prog"]): orchestrator = Orchestrator() orchestrator.run() scenario = _ScenarioManager._create(scenario_config) taipy.submit(scenario) orchestrator.stop() # Delete the twice_doppelganger function # and clear cache of _load_fct() to simulate a new run del globals()["twice_doppelganger"] _load_fct.cache_clear() # Create a scenario in development mode with another function scenario_config = config_scenario() with patch("sys.argv", ["prog"]): orchestrator = Orchestrator() orchestrator.run() scenario = _ScenarioManager._create(scenario_config) taipy.submit(scenario) orchestrator.stop() def test_version_number_when_switching_mode(): with patch("sys.argv", ["prog", "--development"]): orchestrator = Orchestrator() orchestrator.run() ver_1 = _VersionManager._get_latest_version() ver_dev = _VersionManager._get_development_version() assert ver_1 == ver_dev assert len(_VersionManager._get_all()) == 1 orchestrator.stop() # Run with dev mode, the version number is the same with patch("sys.argv", ["prog", "--development"]): orchestrator = Orchestrator() orchestrator.run() ver_2 = _VersionManager._get_latest_version() assert ver_2 == ver_dev assert len(_VersionManager._get_all()) == 1 orchestrator.stop() # When run with experiment mode, a new version is created with patch("sys.argv", ["prog", "--experiment"]): orchestrator = Orchestrator() orchestrator.run() ver_3 = _VersionManager._get_latest_version() assert ver_3 != ver_dev assert len(_VersionManager._get_all()) == 2 orchestrator.stop() with patch("sys.argv", ["prog", "--experiment", "2.1"]): orchestrator = Orchestrator() orchestrator.run() ver_4 = _VersionManager._get_latest_version() assert ver_4 == "2.1" assert len(_VersionManager._get_all()) == 3 orchestrator.stop() with patch("sys.argv", ["prog", "--experiment"]): orchestrator = Orchestrator() orchestrator.run() ver_5 = _VersionManager._get_latest_version() assert ver_5 != ver_3 assert ver_5 != ver_4 assert ver_5 != ver_dev assert len(_VersionManager._get_all()) == 4 orchestrator.stop() # Run with dev mode, the version number is the same as the first dev version to override it with patch("sys.argv", ["prog", "--development"]): orchestrator = Orchestrator() orchestrator.run() ver_7 = _VersionManager._get_latest_version() assert ver_1 == ver_7 assert len(_VersionManager._get_all()) == 4 orchestrator.stop() def test_force_override_experiment_version(): scenario_config = config_scenario() with patch("sys.argv", ["prog", "--experiment", "1.0"]): orchestrator = Orchestrator() orchestrator.run() ver_1 = _VersionManager._get_latest_version() assert ver_1 == "1.0" # When create new experiment version, a development version entity is also created as a placeholder assert len(_VersionManager._get_all()) == 2 # 2 version include 1 experiment 1 development scenario = _ScenarioManager._create(scenario_config) taipy.submit(scenario) assert len(_DataManager._get_all()) == 2 assert len(_TaskManager._get_all()) == 1 assert len(_SequenceManager._get_all()) == 1 assert len(_ScenarioManager._get_all()) == 1 assert len(_CycleManager._get_all()) == 1 assert len(_JobManager._get_all()) == 1 orchestrator.stop() Config.configure_global_app(foo="bar") # Without --taipy-force parameter, a SystemExit will be raised with pytest.raises(SystemExit): with patch("sys.argv", ["prog", "--experiment", "1.0"]): orchestrator = Orchestrator() orchestrator.run() orchestrator.stop() # With --taipy-force parameter with patch("sys.argv", ["prog", "--experiment", "1.0", "--taipy-force"]): orchestrator = Orchestrator() orchestrator.run() ver_2 = _VersionManager._get_latest_version() assert ver_2 == "1.0" assert len(_VersionManager._get_all()) == 2 # 2 version include 1 experiment 1 development # All entities from previous submit should be saved scenario = _ScenarioManager._create(scenario_config) taipy.submit(scenario) assert len(_DataManager._get_all()) == 4 assert len(_TaskManager._get_all()) == 2 assert len(_SequenceManager._get_all()) == 2 assert len(_ScenarioManager._get_all()) == 2 assert len(_CycleManager._get_all()) == 1 assert len(_JobManager._get_all()) == 2 orchestrator.stop() def test_modified_job_configuration_dont_block_application_run(caplog, init_config): _ = config_scenario() with patch("sys.argv", ["prog", "--experiment", "1.0"]): orchestrator = Orchestrator() Config.configure_job_executions(mode="development") orchestrator.run() orchestrator.stop() init_config() _ = config_scenario() with patch("sys.argv", ["prog", "--experiment", "1.0"]): orchestrator = Orchestrator() Config.configure_job_executions(mode="standalone", max_nb_of_workers=3) orchestrator.run() error_message = str(caplog.text) assert 'JOB "mode" was modified' in error_message assert 'JOB "max_nb_of_workers" was added' in error_message orchestrator.stop() def test_modified_config_properties_without_force(caplog, init_config): _ = config_scenario() with patch("sys.argv", ["prog", "--experiment", "1.0"]): orchestrator = Orchestrator() orchestrator.run() orchestrator.stop() init_config() _ = config_scenario_2() with pytest.raises(SystemExit): with patch("sys.argv", ["prog", "--experiment", "1.0"]): orchestrator = Orchestrator() orchestrator.run() orchestrator.stop() error_message = str(caplog.text) assert 'DATA_NODE "d3" was added' in error_message assert 'JOB "max_nb_of_workers" was added' in error_message assert 'DATA_NODE "d0" was removed' in error_message assert 'DATA_NODE "d2" has attribute "default_path" modified' in error_message assert 'CORE "root_folder" was modified' in error_message assert 'CORE "repository_type" was modified' in error_message assert 'JOB "mode" was modified' in error_message assert 'SCENARIO "my_scenario" has attribute "frequency" modified' in error_message assert 'SCENARIO "my_scenario" has attribute "tasks" modified' in error_message assert 'TASK "my_task" has attribute "inputs" modified' in error_message assert 'TASK "my_task" has attribute "function" modified' in error_message assert 'TASK "my_task" has attribute "outputs" modified' in error_message assert 'DATA_NODE "d2" has attribute "has_header" modified' in error_message assert 'DATA_NODE "d2" has attribute "exposed_type" modified' in error_message assert 'CORE "repository_properties" was added' in error_message def twice(a): return a * 2 def config_scenario(): Config.configure_data_node(id="d0") data_node_1_config = Config.configure_data_node( id="d1", storage_type="pickle", default_data="abc", scope=Scope.SCENARIO ) data_node_2_config = Config.configure_data_node(id="d2", storage_type="csv", default_path="foo.csv") task_config = Config.configure_task("my_task", twice, data_node_1_config, data_node_2_config) scenario_config = Config.configure_scenario("my_scenario", [task_config], frequency=Frequency.DAILY) scenario_config.add_sequences({"my_sequence": [task_config]}) return scenario_config def double_twice(a): return a * 2, a * 2 def config_scenario_2(): Config.configure_core( root_folder="foo_root", # Changing the "storage_folder" will fail since older versions are stored in older folder # storage_folder="foo_storage", repository_type="bar", repository_properties={"foo": "bar"}, ) Config.configure_job_executions(mode="standalone", max_nb_of_workers=3) data_node_1_config = Config.configure_data_node( id="d1", storage_type="pickle", default_data="abc", scope=Scope.SCENARIO ) # Modify properties of "d2" data_node_2_config = Config.configure_data_node( id="d2", storage_type="csv", default_path="bar.csv", has_header=False, exposed_type="numpy" ) # Add new data node "d3" data_node_3_config = Config.configure_data_node( id="d3", storage_type="csv", default_path="baz.csv", has_header=False, exposed_type="numpy" ) # Modify properties of "my_task", including the function and outputs list Config.configure_task("my_task", double_twice, data_node_3_config, [data_node_1_config, data_node_2_config]) task_config_1 = Config.configure_task("my_task_1", double_twice, data_node_3_config, [data_node_2_config]) # Modify properties of "my_scenario", where tasks is now my_task_1 scenario_config = Config.configure_scenario("my_scenario", [task_config_1], frequency=Frequency.MONTHLY) scenario_config.add_sequences({"my_sequence": [task_config_1]}) return scenario_config