test_core_cli.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. from unittest.mock import patch
  12. import pytest
  13. from taipy.common.config import Config
  14. from taipy.common.config.common.frequency import Frequency
  15. from taipy.common.config.common.scope import Scope
  16. from taipy.core import Orchestrator, taipy
  17. from taipy.core._version._version_manager import _VersionManager
  18. from taipy.core._version._version_manager_factory import _VersionManagerFactory
  19. from taipy.core.common._utils import _load_fct
  20. from taipy.core.cycle._cycle_manager import _CycleManager
  21. from taipy.core.data._data_manager import _DataManager
  22. from taipy.core.exceptions.exceptions import NonExistingVersion
  23. from taipy.core.job._job_manager import _JobManager
  24. from taipy.core.scenario._scenario_manager import _ScenarioManager
  25. from taipy.core.sequence._sequence_manager import _SequenceManager
  26. from taipy.core.task._task_manager import _TaskManager
  27. def test_orchestrator_cli_no_arguments():
  28. with patch("sys.argv", ["prog"]):
  29. orchestrator = Orchestrator()
  30. orchestrator.run()
  31. assert Config.core.mode == "development"
  32. assert Config.core.version_number == _VersionManagerFactory._build_manager()._get_development_version()
  33. assert not Config.core.force
  34. orchestrator.stop()
  35. def test_orchestrator_cli_development_mode():
  36. with patch("sys.argv", ["prog", "--development"]):
  37. orchestrator = Orchestrator()
  38. orchestrator.run()
  39. assert Config.core.mode == "development"
  40. assert Config.core.version_number == _VersionManagerFactory._build_manager()._get_development_version()
  41. orchestrator.stop()
  42. def test_orchestrator_cli_dev_mode():
  43. with patch("sys.argv", ["prog", "-dev"]):
  44. orchestrator = Orchestrator()
  45. orchestrator.run()
  46. assert Config.core.mode == "development"
  47. assert Config.core.version_number == _VersionManagerFactory._build_manager()._get_development_version()
  48. orchestrator.stop()
  49. def test_orchestrator_cli_experiment_mode():
  50. with patch("sys.argv", ["prog", "--experiment"]):
  51. orchestrator = Orchestrator()
  52. orchestrator.run()
  53. assert Config.core.mode == "experiment"
  54. assert Config.core.version_number == _VersionManagerFactory._build_manager()._get_latest_version()
  55. assert not Config.core.force
  56. orchestrator.stop()
  57. def test_orchestrator_cli_experiment_mode_with_version():
  58. with patch("sys.argv", ["prog", "--experiment", "2.1"]):
  59. orchestrator = Orchestrator()
  60. orchestrator.run()
  61. assert Config.core.mode == "experiment"
  62. assert Config.core.version_number == "2.1"
  63. assert not Config.core.force
  64. orchestrator.stop()
  65. def test_orchestrator_cli_experiment_mode_with_force_version(init_config):
  66. with patch("sys.argv", ["prog", "--experiment", "2.1", "--taipy-force"]):
  67. init_config()
  68. orchestrator = Orchestrator()
  69. orchestrator.run()
  70. assert Config.core.mode == "experiment"
  71. assert Config.core.version_number == "2.1"
  72. assert Config.core.force
  73. orchestrator.stop()
  74. def test_dev_mode_clean_all_entities_of_the_latest_version():
  75. scenario_config = config_scenario()
  76. # Create a scenario in development mode
  77. with patch("sys.argv", ["prog"]):
  78. orchestrator = Orchestrator()
  79. orchestrator.run()
  80. scenario = _ScenarioManager._create(scenario_config)
  81. taipy.submit(scenario)
  82. orchestrator.stop()
  83. # Initial assertion
  84. assert len(_DataManager._get_all(version_number="all")) == 2
  85. assert len(_TaskManager._get_all(version_number="all")) == 1
  86. assert len(_SequenceManager._get_all(version_number="all")) == 1
  87. assert len(_ScenarioManager._get_all(version_number="all")) == 1
  88. assert len(_CycleManager._get_all(version_number="all")) == 1
  89. assert len(_JobManager._get_all(version_number="all")) == 1
  90. # Create a new scenario in experiment mode
  91. with patch("sys.argv", ["prog", "--experiment"]):
  92. orchestrator = Orchestrator()
  93. orchestrator.run()
  94. scenario = _ScenarioManager._create(scenario_config)
  95. taipy.submit(scenario)
  96. orchestrator.stop()
  97. # Assert number of entities in 2nd version
  98. assert len(_DataManager._get_all(version_number="all")) == 4
  99. assert len(_TaskManager._get_all(version_number="all")) == 2
  100. assert len(_SequenceManager._get_all(version_number="all")) == 2
  101. assert len(_ScenarioManager._get_all(version_number="all")) == 2
  102. # No new cycle is created since old dev version use the same cycle
  103. assert len(_CycleManager._get_all(version_number="all")) == 1
  104. assert len(_JobManager._get_all(version_number="all")) == 2
  105. # Run development mode again
  106. with patch("sys.argv", ["prog", "--development"]):
  107. orchestrator = Orchestrator()
  108. orchestrator.run()
  109. # The 1st dev version should be deleted run with development mode
  110. assert len(_DataManager._get_all(version_number="all")) == 2
  111. assert len(_TaskManager._get_all(version_number="all")) == 1
  112. assert len(_SequenceManager._get_all(version_number="all")) == 1
  113. assert len(_ScenarioManager._get_all(version_number="all")) == 1
  114. assert len(_CycleManager._get_all(version_number="all")) == 1
  115. assert len(_JobManager._get_all(version_number="all")) == 1
  116. # Submit new dev version
  117. scenario = _ScenarioManager._create(scenario_config)
  118. taipy.submit(scenario)
  119. orchestrator.stop()
  120. # Assert number of entities with 1 dev version and 1 exp version
  121. assert len(_DataManager._get_all(version_number="all")) == 4
  122. assert len(_TaskManager._get_all(version_number="all")) == 2
  123. assert len(_SequenceManager._get_all(version_number="all")) == 2
  124. assert len(_ScenarioManager._get_all(version_number="all")) == 2
  125. assert len(_CycleManager._get_all(version_number="all")) == 1
  126. assert len(_JobManager._get_all(version_number="all")) == 2
  127. # Assert number of entities of the latest version only
  128. assert len(_DataManager._get_all(version_number="latest")) == 2
  129. assert len(_TaskManager._get_all(version_number="latest")) == 1
  130. assert len(_SequenceManager._get_all(version_number="latest")) == 1
  131. assert len(_ScenarioManager._get_all(version_number="latest")) == 1
  132. assert len(_JobManager._get_all(version_number="latest")) == 1
  133. # Assert number of entities of the development version only
  134. assert len(_DataManager._get_all(version_number="development")) == 2
  135. assert len(_TaskManager._get_all(version_number="development")) == 1
  136. assert len(_SequenceManager._get_all(version_number="development")) == 1
  137. assert len(_ScenarioManager._get_all(version_number="development")) == 1
  138. assert len(_JobManager._get_all(version_number="development")) == 1
  139. # Assert number of entities of an unknown version
  140. with pytest.raises(NonExistingVersion):
  141. assert _DataManager._get_all(version_number="foo")
  142. with pytest.raises(NonExistingVersion):
  143. assert _TaskManager._get_all(version_number="foo")
  144. with pytest.raises(NonExistingVersion):
  145. assert _SequenceManager._get_all(version_number="foo")
  146. with pytest.raises(NonExistingVersion):
  147. assert _ScenarioManager._get_all(version_number="foo")
  148. with pytest.raises(NonExistingVersion):
  149. assert _JobManager._get_all(version_number="foo")
  150. def twice_doppelganger(a):
  151. return a * 2
  152. def test_dev_mode_clean_all_entities_when_config_is_alternated():
  153. data_node_1_config = Config.configure_data_node(
  154. id="d1", storage_type="pickle", default_data="abc", scope=Scope.SCENARIO
  155. )
  156. data_node_2_config = Config.configure_data_node(id="d2", storage_type="csv", default_path="foo.csv")
  157. task_config = Config.configure_task("my_task", twice_doppelganger, data_node_1_config, data_node_2_config)
  158. scenario_config = Config.configure_scenario("my_scenario", [task_config], frequency=Frequency.DAILY)
  159. # Create a scenario in development mode with the doppelganger function
  160. with patch("sys.argv", ["prog"]):
  161. orchestrator = Orchestrator()
  162. orchestrator.run()
  163. scenario = _ScenarioManager._create(scenario_config)
  164. taipy.submit(scenario)
  165. orchestrator.stop()
  166. # Delete the twice_doppelganger function
  167. # and clear cache of _load_fct() to simulate a new run
  168. del globals()["twice_doppelganger"]
  169. _load_fct.cache_clear()
  170. # Create a scenario in development mode with another function
  171. scenario_config = config_scenario()
  172. with patch("sys.argv", ["prog"]):
  173. orchestrator = Orchestrator()
  174. orchestrator.run()
  175. scenario = _ScenarioManager._create(scenario_config)
  176. taipy.submit(scenario)
  177. orchestrator.stop()
  178. def test_version_number_when_switching_mode():
  179. with patch("sys.argv", ["prog", "--development"]):
  180. orchestrator = Orchestrator()
  181. orchestrator.run()
  182. ver_1 = _VersionManager._get_latest_version()
  183. ver_dev = _VersionManager._get_development_version()
  184. assert ver_1 == ver_dev
  185. assert len(_VersionManager._get_all()) == 1
  186. orchestrator.stop()
  187. # Run with dev mode, the version number is the same
  188. with patch("sys.argv", ["prog", "--development"]):
  189. orchestrator = Orchestrator()
  190. orchestrator.run()
  191. ver_2 = _VersionManager._get_latest_version()
  192. assert ver_2 == ver_dev
  193. assert len(_VersionManager._get_all()) == 1
  194. orchestrator.stop()
  195. # When run with experiment mode, a new version is created
  196. with patch("sys.argv", ["prog", "--experiment"]):
  197. orchestrator = Orchestrator()
  198. orchestrator.run()
  199. ver_3 = _VersionManager._get_latest_version()
  200. assert ver_3 != ver_dev
  201. assert len(_VersionManager._get_all()) == 2
  202. orchestrator.stop()
  203. with patch("sys.argv", ["prog", "--experiment", "2.1"]):
  204. orchestrator = Orchestrator()
  205. orchestrator.run()
  206. ver_4 = _VersionManager._get_latest_version()
  207. assert ver_4 == "2.1"
  208. assert len(_VersionManager._get_all()) == 3
  209. orchestrator.stop()
  210. with patch("sys.argv", ["prog", "--experiment"]):
  211. orchestrator = Orchestrator()
  212. orchestrator.run()
  213. ver_5 = _VersionManager._get_latest_version()
  214. assert ver_5 != ver_3
  215. assert ver_5 != ver_4
  216. assert ver_5 != ver_dev
  217. assert len(_VersionManager._get_all()) == 4
  218. orchestrator.stop()
  219. # Run with dev mode, the version number is the same as the first dev version to override it
  220. with patch("sys.argv", ["prog", "--development"]):
  221. orchestrator = Orchestrator()
  222. orchestrator.run()
  223. ver_7 = _VersionManager._get_latest_version()
  224. assert ver_1 == ver_7
  225. assert len(_VersionManager._get_all()) == 4
  226. orchestrator.stop()
  227. def test_force_override_experiment_version():
  228. scenario_config = config_scenario()
  229. with patch("sys.argv", ["prog", "--experiment", "1.0"]):
  230. orchestrator = Orchestrator()
  231. orchestrator.run()
  232. ver_1 = _VersionManager._get_latest_version()
  233. assert ver_1 == "1.0"
  234. # When create new experiment version, a development version entity is also created as a placeholder
  235. assert len(_VersionManager._get_all()) == 2 # 2 version include 1 experiment 1 development
  236. scenario = _ScenarioManager._create(scenario_config)
  237. taipy.submit(scenario)
  238. assert len(_DataManager._get_all()) == 2
  239. assert len(_TaskManager._get_all()) == 1
  240. assert len(_SequenceManager._get_all()) == 1
  241. assert len(_ScenarioManager._get_all()) == 1
  242. assert len(_CycleManager._get_all()) == 1
  243. assert len(_JobManager._get_all()) == 1
  244. orchestrator.stop()
  245. Config.configure_global_app(foo="bar")
  246. # Without --taipy-force parameter, a SystemExit will be raised
  247. with pytest.raises(SystemExit):
  248. with patch("sys.argv", ["prog", "--experiment", "1.0"]):
  249. orchestrator = Orchestrator()
  250. orchestrator.run()
  251. orchestrator.stop()
  252. # With --taipy-force parameter
  253. with patch("sys.argv", ["prog", "--experiment", "1.0", "--taipy-force"]):
  254. orchestrator = Orchestrator()
  255. orchestrator.run()
  256. ver_2 = _VersionManager._get_latest_version()
  257. assert ver_2 == "1.0"
  258. assert len(_VersionManager._get_all()) == 2 # 2 version include 1 experiment 1 development
  259. # All entities from previous submit should be saved
  260. scenario = _ScenarioManager._create(scenario_config)
  261. taipy.submit(scenario)
  262. assert len(_DataManager._get_all()) == 4
  263. assert len(_TaskManager._get_all()) == 2
  264. assert len(_SequenceManager._get_all()) == 2
  265. assert len(_ScenarioManager._get_all()) == 2
  266. assert len(_CycleManager._get_all()) == 1
  267. assert len(_JobManager._get_all()) == 2
  268. orchestrator.stop()
  269. def test_modified_job_configuration_dont_block_application_run(caplog, init_config):
  270. _ = config_scenario()
  271. with patch("sys.argv", ["prog", "--experiment", "1.0"]):
  272. orchestrator = Orchestrator()
  273. Config.configure_job_executions(mode="development")
  274. orchestrator.run()
  275. orchestrator.stop()
  276. init_config()
  277. _ = config_scenario()
  278. with patch("sys.argv", ["prog", "--experiment", "1.0"]):
  279. orchestrator = Orchestrator()
  280. Config.configure_job_executions(mode="standalone", max_nb_of_workers=3)
  281. orchestrator.run()
  282. error_message = str(caplog.text)
  283. assert 'JOB "mode" was modified' in error_message
  284. assert 'JOB "max_nb_of_workers" was added' in error_message
  285. orchestrator.stop()
  286. def test_modified_config_properties_without_force(caplog, init_config):
  287. _ = config_scenario()
  288. with patch("sys.argv", ["prog", "--experiment", "1.0"]):
  289. orchestrator = Orchestrator()
  290. orchestrator.run()
  291. orchestrator.stop()
  292. init_config()
  293. _ = config_scenario_2()
  294. with pytest.raises(SystemExit):
  295. with patch("sys.argv", ["prog", "--experiment", "1.0"]):
  296. orchestrator = Orchestrator()
  297. orchestrator.run()
  298. orchestrator.stop()
  299. error_message = str(caplog.text)
  300. assert 'DATA_NODE "d3" was added' in error_message
  301. assert 'JOB "max_nb_of_workers" was added' in error_message
  302. assert 'DATA_NODE "d0" was removed' in error_message
  303. assert 'DATA_NODE "d2" has attribute "default_path" modified' in error_message
  304. assert 'CORE "root_folder" was modified' in error_message
  305. assert 'CORE "repository_type" was modified' in error_message
  306. assert 'JOB "mode" was modified' in error_message
  307. assert 'SCENARIO "my_scenario" has attribute "frequency" modified' in error_message
  308. assert 'SCENARIO "my_scenario" has attribute "tasks" modified' in error_message
  309. assert 'TASK "my_task" has attribute "inputs" modified' in error_message
  310. assert 'TASK "my_task" has attribute "function" modified' in error_message
  311. assert 'TASK "my_task" has attribute "outputs" modified' in error_message
  312. assert 'DATA_NODE "d2" has attribute "has_header" modified' in error_message
  313. assert 'DATA_NODE "d2" has attribute "exposed_type" modified' in error_message
  314. assert 'CORE "repository_properties" was added' in error_message
  315. def twice(a):
  316. return a * 2
  317. def config_scenario():
  318. Config.configure_data_node(id="d0")
  319. data_node_1_config = Config.configure_data_node(
  320. id="d1", storage_type="pickle", default_data="abc", scope=Scope.SCENARIO
  321. )
  322. data_node_2_config = Config.configure_data_node(id="d2", storage_type="csv", default_path="foo.csv")
  323. task_config = Config.configure_task("my_task", twice, data_node_1_config, data_node_2_config)
  324. scenario_config = Config.configure_scenario("my_scenario", [task_config], frequency=Frequency.DAILY)
  325. scenario_config.add_sequences({"my_sequence": [task_config]})
  326. return scenario_config
  327. def double_twice(a):
  328. return a * 2, a * 2
  329. def config_scenario_2():
  330. Config.configure_core(
  331. root_folder="foo_root",
  332. # Changing the "storage_folder" will fail since older versions are stored in older folder
  333. # storage_folder="foo_storage",
  334. repository_type="bar",
  335. repository_properties={"foo": "bar"},
  336. )
  337. Config.configure_job_executions(mode="standalone", max_nb_of_workers=3)
  338. data_node_1_config = Config.configure_data_node(
  339. id="d1", storage_type="pickle", default_data="abc", scope=Scope.SCENARIO
  340. )
  341. # Modify properties of "d2"
  342. data_node_2_config = Config.configure_data_node(
  343. id="d2", storage_type="csv", default_path="bar.csv", has_header=False, exposed_type="numpy"
  344. )
  345. # Add new data node "d3"
  346. data_node_3_config = Config.configure_data_node(
  347. id="d3", storage_type="csv", default_path="baz.csv", has_header=False, exposed_type="numpy"
  348. )
  349. # Modify properties of "my_task", including the function and outputs list
  350. Config.configure_task("my_task", double_twice, data_node_3_config, [data_node_1_config, data_node_2_config])
  351. task_config_1 = Config.configure_task("my_task_1", double_twice, data_node_3_config, [data_node_2_config])
  352. # Modify properties of "my_scenario", where tasks is now my_task_1
  353. scenario_config = Config.configure_scenario("my_scenario", [task_config_1], frequency=Frequency.MONTHLY)
  354. scenario_config.add_sequences({"my_sequence": [task_config_1]})
  355. return scenario_config