test_sequence_manager.py 42 KB


  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import json
  12. from datetime import datetime
  13. from pathlib import Path
  14. from typing import Callable, Iterable, Optional
  15. from unittest import mock
  16. from unittest.mock import ANY
  17. import pytest
  18. from taipy.config.common.scope import Scope
  19. from taipy.config.config import Config
  20. from taipy.core._orchestrator._orchestrator import _Orchestrator
  21. from taipy.core._version._version_manager import _VersionManager
  22. from taipy.core.common import _utils
  23. from taipy.core.common._utils import _Subscriber
  24. from taipy.core.data._data_manager import _DataManager
  25. from taipy.core.data.in_memory import InMemoryDataNode
  26. from taipy.core.data.pickle import PickleDataNode
  27. from taipy.core.exceptions.exceptions import (
  28. InvalidSequenceId,
  29. ModelNotFound,
  30. NonExistingSequence,
  31. SequenceAlreadyExists,
  32. SequenceBelongsToNonExistingScenario,
  33. )
  34. from taipy.core.job._job_manager import _JobManager
  35. from taipy.core.notification._ready_to_run_cache import _ReadyToRunCache
  36. from taipy.core.scenario._scenario_manager import _ScenarioManager
  37. from taipy.core.scenario.scenario import Scenario
  38. from taipy.core.sequence._sequence_manager import _SequenceManager
  39. from taipy.core.sequence._sequence_manager_factory import _SequenceManagerFactory
  40. from taipy.core.sequence.sequence import Sequence
  41. from taipy.core.sequence.sequence_id import SequenceId
  42. from taipy.core.task._task_manager import _TaskManager
  43. from taipy.core.task.task import Task
  44. from taipy.core.task.task_id import TaskId
  45. from tests.core.utils.NotifyMock import NotifyMock
  46. def test_breakdown_sequence_id():
  47. with pytest.raises(InvalidSequenceId):
  48. _SequenceManager._breakdown_sequence_id("scenario_id")
  49. with pytest.raises(InvalidSequenceId):
  50. _SequenceManager._breakdown_sequence_id("sequence_id")
  51. with pytest.raises(InvalidSequenceId):
  52. _SequenceManager._breakdown_sequence_id("SEQUENCE_sequence_id")
  53. with pytest.raises(InvalidSequenceId):
  54. _SequenceManager._breakdown_sequence_id("SCENARIO_scenario_id")
  55. with pytest.raises(InvalidSequenceId):
  56. _SequenceManager._breakdown_sequence_id("sequence_SCENARIO_scenario_id")
  57. with pytest.raises(InvalidSequenceId):
  58. _SequenceManager._breakdown_sequence_id("SEQUENCE_sequence_scenario_id")
  59. sequence_name, scenario_id = _SequenceManager._breakdown_sequence_id("SEQUENCE_sequence_SCENARIO_scenario")
  60. assert sequence_name == "sequence" and scenario_id == "SCENARIO_scenario"
  61. sequence_name, scenario_id = _SequenceManager._breakdown_sequence_id("SEQUENCEsequenceSCENARIO_scenario")
  62. assert sequence_name == "sequence" and scenario_id == "SCENARIO_scenario"
  63. def test_raise_sequence_does_not_belong_to_scenario():
  64. with pytest.raises(SequenceBelongsToNonExistingScenario):
  65. sequence = Sequence({"name": "sequence_name"}, [], "SEQUENCE_sequence_name_SCENARIO_scenario_id")
  66. _SequenceManager._set(sequence)
  67. def __init():
  68. input_dn = InMemoryDataNode("foo", Scope.SCENARIO)
  69. output_dn = InMemoryDataNode("foo", Scope.SCENARIO)
  70. task = Task("task", {}, print, [input_dn], [output_dn], TaskId("task_id"))
  71. scenario = Scenario("scenario", {task}, {}, set())
  72. _ScenarioManager._set(scenario)
  73. return scenario, task
  74. def test_set_and_get_sequence_no_existing_sequence():
  75. scenario, task = __init()
  76. sequence_name_1 = "p1"
  77. sequence_id_1 = SequenceId(f"SEQUENCE_{sequence_name_1}_{scenario.id}")
  78. sequence_name_2 = "p2"
  79. sequence_id_2 = SequenceId(f"SEQUENCE_{sequence_name_2}_{scenario.id}")
  80. assert _SequenceManager._get(sequence_id_1) is None
  81. assert _SequenceManager._get(sequence_id_2) is None
  82. assert _SequenceManager._get("sequence") is None
  83. def test_set_and_get():
  84. scenario, task = __init()
  85. sequence_name_1 = "p1"
  86. sequence_id_1 = SequenceId(f"SEQUENCE_{sequence_name_1}_{scenario.id}")
  87. sequence_name_2 = "p2"
  88. sequence_id_2 = SequenceId(f"SEQUENCE_{sequence_name_2}_{scenario.id}")
  89. scenario.add_sequences({sequence_name_1: []})
  90. sequence_1 = scenario.sequences[sequence_name_1]
  91. assert _SequenceManager._get(sequence_id_1).id == sequence_1.id
  92. assert len(_SequenceManager._get(sequence_id_1).tasks) == 0
  93. assert _SequenceManager._get(sequence_1).id == sequence_1.id
  94. assert len(_SequenceManager._get(sequence_1).tasks) == 0
  95. assert _SequenceManager._get(sequence_id_2) is None
  96. # Save a second sequence. Now, we expect to have a total of two sequences stored
  97. _TaskManager._set(task)
  98. scenario.add_sequences({sequence_name_2: [task]})
  99. sequence_2 = scenario.sequences[sequence_name_2]
  100. assert _SequenceManager._get(sequence_id_1).id == sequence_1.id
  101. assert len(_SequenceManager._get(sequence_id_1).tasks) == 0
  102. assert _SequenceManager._get(sequence_1).id == sequence_1.id
  103. assert len(_SequenceManager._get(sequence_1).tasks) == 0
  104. assert _SequenceManager._get(sequence_id_2).id == sequence_2.id
  105. assert len(_SequenceManager._get(sequence_id_2).tasks) == 1
  106. assert _SequenceManager._get(sequence_2).id == sequence_2.id
  107. assert len(_SequenceManager._get(sequence_2).tasks) == 1
  108. assert _TaskManager._get(task.id).id == task.id
  109. # We save the first sequence again. We expect an exception and nothing to change
  110. with pytest.raises(SequenceAlreadyExists):
  111. scenario.add_sequence(sequence_name_1, [])
  112. sequence_1 = scenario.sequences[sequence_name_1]
  113. assert _SequenceManager._get(sequence_id_1).id == sequence_1.id
  114. assert len(_SequenceManager._get(sequence_id_1).tasks) == 0
  115. assert _SequenceManager._get(sequence_1).id == sequence_1.id
  116. assert len(_SequenceManager._get(sequence_1).tasks) == 0
  117. assert _SequenceManager._get(sequence_id_2).id == sequence_2.id
  118. assert len(_SequenceManager._get(sequence_id_2).tasks) == 1
  119. assert _SequenceManager._get(sequence_2).id == sequence_2.id
  120. assert len(_SequenceManager._get(sequence_2).tasks) == 1
  121. assert _TaskManager._get(task.id).id == task.id
  122. def test_get_all_on_multiple_versions_environment():
  123. # Create 5 sequences from Scenario with 2 versions each
  124. for version in range(1, 3):
  125. for i in range(5):
  126. _ScenarioManager._set(
  127. Scenario(
  128. f"config_id_{i+version}",
  129. [],
  130. {},
  131. [],
  132. f"SCENARIO_id_{i}_v{version}",
  133. version=f"{version}.0",
  134. sequences={"sequence": {}},
  135. )
  136. )
  137. _VersionManager._set_experiment_version("1.0")
  138. assert len(_SequenceManager._get_all()) == 5
  139. assert (
  140. len(_SequenceManager._get_all_by(filters=[{"version": "1.0", "id": "SEQUENCE_sequence_SCENARIO_id_1_v1"}])) == 1
  141. )
  142. assert (
  143. len(_SequenceManager._get_all_by(filters=[{"version": "2.0", "id": "SEQUENCE_sequence_SCENARIO_id_1_v1"}])) == 0
  144. )
  145. _VersionManager._set_experiment_version("2.0")
  146. assert len(_SequenceManager._get_all()) == 5
  147. assert (
  148. len(_SequenceManager._get_all_by(filters=[{"version": "2.0", "id": "SEQUENCE_sequence_SCENARIO_id_1_v1"}])) == 0
  149. )
  150. assert (
  151. len(_SequenceManager._get_all_by(filters=[{"version": "2.0", "id": "SEQUENCE_sequence_SCENARIO_id_1_v2"}])) == 1
  152. )
  153. _VersionManager._set_development_version("1.0")
  154. assert len(_SequenceManager._get_all()) == 5
  155. assert (
  156. len(_SequenceManager._get_all_by(filters=[{"version": "1.0", "id": "SEQUENCE_sequence_SCENARIO_id_1_v1"}])) == 1
  157. )
  158. assert (
  159. len(_SequenceManager._get_all_by(filters=[{"version": "1.0", "id": "SEQUENCE_sequence_SCENARIO_id_1_v2"}])) == 0
  160. )
  161. _VersionManager._set_development_version("2.0")
  162. assert len(_SequenceManager._get_all()) == 5
  163. assert (
  164. len(_SequenceManager._get_all_by(filters=[{"version": "2.0", "id": "SEQUENCE_sequence_SCENARIO_id_1_v1"}])) == 0
  165. )
  166. assert (
  167. len(_SequenceManager._get_all_by(filters=[{"version": "2.0", "id": "SEQUENCE_sequence_SCENARIO_id_1_v2"}])) == 1
  168. )
  169. def test_is_submittable():
  170. task_id = "TASK_task_id"
  171. scenario_id = "SCENARIO_scenario_id"
  172. dn_1 = PickleDataNode("dn_1", Scope.SCENARIO, parent_ids={task_id, scenario_id}, properties={"default_data": 10})
  173. dn_2 = PickleDataNode("dn_2", Scope.SCENARIO, parent_ids={task_id, scenario_id}, properties={"default_data": 10})
  174. task = Task("task", {}, print, [dn_1, dn_2], id=task_id, parent_ids={scenario_id})
  175. scenario = Scenario("scenario", {task}, {}, set(), scenario_id=scenario_id)
  176. _DataManager._set(dn_1)
  177. _DataManager._set(dn_2)
  178. _TaskManager._set(task)
  179. _ScenarioManager._set(scenario)
  180. dn_1 = scenario.dn_1
  181. dn_2 = scenario.dn_2
  182. scenario.add_sequences({"sequence": [task]})
  183. sequence = scenario.sequences["sequence"]
  184. assert len(_SequenceManager._get_all()) == 1
  185. assert sequence.id not in _ReadyToRunCache._submittable_id_datanodes
  186. assert scenario.id not in _ReadyToRunCache._submittable_id_datanodes
  187. assert _SequenceManager._is_submittable(sequence)
  188. assert _SequenceManager._is_submittable(sequence.id)
  189. assert _ScenarioManager._is_submittable(scenario)
  190. assert not _SequenceManager._is_submittable("Sequence_temp")
  191. assert not _SequenceManager._is_submittable("SEQUENCE_temp_SCENARIO_scenario")
  192. dn_1.edit_in_progress = True
  193. assert scenario.id in _ReadyToRunCache._submittable_id_datanodes
  194. assert sequence.id in _ReadyToRunCache._submittable_id_datanodes
  195. assert dn_1.id in _ReadyToRunCache._submittable_id_datanodes[scenario.id]
  196. assert dn_1.id in _ReadyToRunCache._submittable_id_datanodes[sequence.id]
  197. assert dn_1.id in _ReadyToRunCache._datanode_id_submittables
  198. assert scenario.id in _ReadyToRunCache._datanode_id_submittables[dn_1.id]
  199. assert sequence.id in _ReadyToRunCache._datanode_id_submittables[dn_1.id]
  200. assert _ReadyToRunCache._submittable_id_datanodes[scenario.id][dn_1.id] == f"DataNode {dn_1.id} is being edited"
  201. assert _ReadyToRunCache._submittable_id_datanodes[sequence.id][dn_1.id] == f"DataNode {dn_1.id} is being edited"
  202. assert not _ScenarioManager._is_submittable(scenario)
  203. assert not _SequenceManager._is_submittable(sequence)
  204. assert not _SequenceManager._is_submittable(sequence.id)
  205. dn_1.edit_in_progress = False
  206. assert scenario.id not in _ReadyToRunCache._submittable_id_datanodes
  207. assert sequence.id not in _ReadyToRunCache._submittable_id_datanodes
  208. assert dn_1.id not in _ReadyToRunCache._datanode_id_submittables
  209. assert _SequenceManager._is_submittable(sequence)
  210. assert _SequenceManager._is_submittable(sequence.id)
  211. assert _ScenarioManager._is_submittable(scenario)
  212. dn_1.last_edit_date = None
  213. dn_2.edit_in_progress = True
  214. assert scenario.id in _ReadyToRunCache._submittable_id_datanodes
  215. assert sequence.id in _ReadyToRunCache._submittable_id_datanodes
  216. assert dn_1.id in _ReadyToRunCache._submittable_id_datanodes[scenario.id]
  217. assert dn_1.id in _ReadyToRunCache._submittable_id_datanodes[sequence.id]
  218. assert dn_2.id in _ReadyToRunCache._submittable_id_datanodes[scenario.id]
  219. assert dn_2.id in _ReadyToRunCache._submittable_id_datanodes[sequence.id]
  220. assert dn_1.id in _ReadyToRunCache._datanode_id_submittables
  221. assert scenario.id in _ReadyToRunCache._datanode_id_submittables[dn_1.id]
  222. assert sequence.id in _ReadyToRunCache._datanode_id_submittables[dn_1.id]
  223. assert dn_2.id in _ReadyToRunCache._datanode_id_submittables
  224. assert scenario.id in _ReadyToRunCache._datanode_id_submittables[dn_2.id]
  225. assert sequence.id in _ReadyToRunCache._datanode_id_submittables[dn_2.id]
  226. assert _ReadyToRunCache._submittable_id_datanodes[scenario.id][dn_1.id] == f"DataNode {dn_1.id} is not written"
  227. assert _ReadyToRunCache._submittable_id_datanodes[sequence.id][dn_1.id] == f"DataNode {dn_1.id} is not written"
  228. assert _ReadyToRunCache._submittable_id_datanodes[scenario.id][dn_2.id] == f"DataNode {dn_2.id} is being edited"
  229. assert _ReadyToRunCache._submittable_id_datanodes[sequence.id][dn_2.id] == f"DataNode {dn_2.id} is being edited"
  230. assert not _ScenarioManager._is_submittable(scenario)
  231. assert not _SequenceManager._is_submittable(sequence)
  232. assert not _SequenceManager._is_submittable(sequence.id)
  233. dn_1.last_edit_date = datetime.now()
  234. assert scenario.id in _ReadyToRunCache._submittable_id_datanodes
  235. assert sequence.id in _ReadyToRunCache._submittable_id_datanodes
  236. assert dn_1.id not in _ReadyToRunCache._submittable_id_datanodes[scenario.id]
  237. assert dn_1.id not in _ReadyToRunCache._submittable_id_datanodes[sequence.id]
  238. assert dn_2.id in _ReadyToRunCache._submittable_id_datanodes[scenario.id]
  239. assert dn_2.id in _ReadyToRunCache._submittable_id_datanodes[sequence.id]
  240. assert dn_1.id not in _ReadyToRunCache._datanode_id_submittables
  241. assert dn_2.id in _ReadyToRunCache._datanode_id_submittables
  242. assert scenario.id in _ReadyToRunCache._datanode_id_submittables[dn_2.id]
  243. assert sequence.id in _ReadyToRunCache._datanode_id_submittables[dn_2.id]
  244. assert _ReadyToRunCache._submittable_id_datanodes[scenario.id][dn_2.id] == f"DataNode {dn_2.id} is being edited"
  245. assert _ReadyToRunCache._submittable_id_datanodes[sequence.id][dn_2.id] == f"DataNode {dn_2.id} is being edited"
  246. assert not _ScenarioManager._is_submittable(scenario)
  247. assert not _SequenceManager._is_submittable(sequence)
  248. assert not _SequenceManager._is_submittable(sequence.id)
  249. dn_2.edit_in_progress = False
  250. assert scenario.id not in _ReadyToRunCache._submittable_id_datanodes
  251. assert sequence.id not in _ReadyToRunCache._submittable_id_datanodes
  252. assert dn_2.id not in _ReadyToRunCache._submittable_id_datanodes[scenario.id]
  253. assert dn_2.id not in _ReadyToRunCache._submittable_id_datanodes[sequence.id]
  254. assert dn_2.id not in _ReadyToRunCache._datanode_id_submittables
  255. assert _ScenarioManager._is_submittable(scenario)
  256. assert _SequenceManager._is_submittable(sequence)
  257. assert _SequenceManager._is_submittable(sequence.id)
  258. def test_submit():
  259. data_node_1 = InMemoryDataNode("foo", Scope.SCENARIO, "s1")
  260. data_node_2 = InMemoryDataNode("bar", Scope.SCENARIO, "s2")
  261. data_node_3 = InMemoryDataNode("baz", Scope.SCENARIO, "s3")
  262. data_node_4 = InMemoryDataNode("qux", Scope.SCENARIO, "s4")
  263. data_node_5 = InMemoryDataNode("quux", Scope.SCENARIO, "s5")
  264. data_node_6 = InMemoryDataNode("quuz", Scope.SCENARIO, "s6")
  265. data_node_7 = InMemoryDataNode("corge", Scope.SCENARIO, "s7")
  266. task_1 = Task(
  267. "grault",
  268. {},
  269. print,
  270. [data_node_1, data_node_2],
  271. [data_node_3, data_node_4],
  272. TaskId("t1"),
  273. )
  274. task_2 = Task("garply", {}, print, [data_node_3], [data_node_5], TaskId("t2"))
  275. task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], [data_node_6], TaskId("t3"))
  276. task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4"))
  277. scenario = Scenario("sce", {task_1, task_2, task_3, task_4}, {})
  278. sequence_name = "sequence"
  279. sequence_id = Sequence._new_id(sequence_name, scenario.id)
  280. class MockOrchestrator(_Orchestrator):
  281. submit_calls = []
  282. @classmethod
  283. def _lock_dn_output_and_create_job(
  284. cls,
  285. task: Task,
  286. submit_id: str,
  287. submit_entity_id: str,
  288. callbacks: Optional[Iterable[Callable]] = None,
  289. force: bool = False,
  290. ):
  291. cls.submit_calls.append(task)
  292. return super()._lock_dn_output_and_create_job(task, submit_id, submit_entity_id, callbacks, force)
  293. with mock.patch("taipy.core.task._task_manager._TaskManager._orchestrator", new=MockOrchestrator):
  294. # sequence does not exist. We expect an exception to be raised
  295. with pytest.raises(NonExistingSequence):
  296. _SequenceManager._submit(sequence_id)
  297. _ScenarioManager._set(scenario)
  298. scenario.add_sequences({sequence_name: [task_4, task_2, task_1, task_3]})
  299. # sequence, and tasks does exist. We expect the tasks to be submitted
  300. # in a specific order
  301. _TaskManager._set(task_1)
  302. _TaskManager._set(task_2)
  303. _TaskManager._set(task_3)
  304. _TaskManager._set(task_4)
  305. sequence = scenario.sequences[sequence_name]
  306. _SequenceManager._submit(sequence.id)
  307. calls_ids = [t.id for t in _TaskManager._orchestrator().submit_calls]
  308. tasks_ids = [task_1.id, task_2.id, task_4.id, task_3.id]
  309. assert calls_ids == tasks_ids
  310. _SequenceManager._submit(sequence)
  311. calls_ids = [t.id for t in _TaskManager._orchestrator().submit_calls]
  312. tasks_ids = tasks_ids * 2
  313. assert set(calls_ids) == set(tasks_ids)
  314. def test_assign_sequence_as_parent_of_task():
  315. dn_config_1 = Config.configure_data_node("dn_1", "in_memory", scope=Scope.SCENARIO)
  316. dn_config_2 = Config.configure_data_node("dn_2", "in_memory", scope=Scope.SCENARIO)
  317. dn_config_3 = Config.configure_data_node("dn_3", "in_memory", scope=Scope.SCENARIO)
  318. task_config_1 = Config.configure_task("task_1", print, [dn_config_1], [dn_config_2])
  319. task_config_2 = Config.configure_task("task_2", print, [dn_config_2], [dn_config_3])
  320. task_config_3 = Config.configure_task("task_3", print, [dn_config_2], [dn_config_3])
  321. tasks = _TaskManager._bulk_get_or_create([task_config_1, task_config_2, task_config_3], "scenario_id")
  322. sequence_1 = _SequenceManager._create("sequence_1", [tasks[0], tasks[1]], scenario_id="scenario_id")
  323. sequence_2 = _SequenceManager._create("sequence_2", [tasks[0], tasks[2]], scenario_id="scenario_id")
  324. tasks_1 = list(sequence_1.tasks.values())
  325. tasks_2 = list(sequence_2.tasks.values())
  326. assert len(tasks_1) == 2
  327. assert len(tasks_2) == 2
  328. assert tasks_1[0].parent_ids == {sequence_1.id, sequence_2.id}
  329. assert tasks_2[0].parent_ids == {sequence_1.id, sequence_2.id}
  330. assert tasks_1[1].parent_ids == {sequence_1.id}
  331. assert tasks_2[1].parent_ids == {sequence_2.id}
  332. g = 0
  333. def mock_function_no_input_no_output():
  334. global g
  335. g += 1
  336. def mock_function_one_input_no_output(inp):
  337. global g
  338. g += inp
  339. def mock_function_no_input_one_output():
  340. global g
  341. return g
  342. def test_submit_sequence_from_tasks_with_one_or_no_input_output():
  343. # test no input and no output Task
  344. task_no_input_no_output = Task("task_no_input_no_output", {}, mock_function_no_input_no_output)
  345. scenario_1 = Scenario("scenario_1", {task_no_input_no_output}, {})
  346. _TaskManager._set(task_no_input_no_output)
  347. _ScenarioManager._set(scenario_1)
  348. scenario_1.add_sequences({"my_sequence_1": [task_no_input_no_output]})
  349. sequence_1 = scenario_1.sequences["my_sequence_1"]
  350. assert len(sequence_1._get_sorted_tasks()) == 1
  351. _SequenceManager._submit(sequence_1)
  352. assert g == 1
  353. # test one input and no output Task
  354. data_node_input = InMemoryDataNode("input_dn", Scope.SCENARIO, properties={"default_data": 2})
  355. task_one_input_no_output = Task(
  356. "task_one_input_no_output", {}, mock_function_one_input_no_output, input=[data_node_input]
  357. )
  358. scenario_2 = Scenario("scenario_2", {task_one_input_no_output}, {})
  359. _DataManager._set(data_node_input)
  360. data_node_input.unlock_edit()
  361. _TaskManager._set(task_one_input_no_output)
  362. _ScenarioManager._set(scenario_2)
  363. scenario_2.add_sequences({"my_sequence_2": [task_one_input_no_output]})
  364. sequence_2 = scenario_2.sequences["my_sequence_2"]
  365. assert len(sequence_2._get_sorted_tasks()) == 1
  366. _SequenceManager._submit(sequence_2)
  367. assert g == 3
  368. # test no input and one output Task
  369. data_node_output = InMemoryDataNode("output_dn", Scope.SCENARIO, properties={"default_data": None})
  370. task_no_input_one_output = Task(
  371. "task_no_input_one_output", {}, mock_function_no_input_one_output, output=[data_node_output]
  372. )
  373. scenario_3 = Scenario("scenario_3", {task_no_input_one_output}, {})
  374. _DataManager._set(data_node_output)
  375. assert data_node_output.read() is None
  376. _TaskManager._set(task_no_input_one_output)
  377. _ScenarioManager._set(scenario_3)
  378. scenario_3.add_sequences({"my_sequence_3": [task_no_input_one_output]})
  379. sequence_3 = scenario_3.sequences["my_sequence_3"]
  380. assert len(sequence_2._get_sorted_tasks()) == 1
  381. _SequenceManager._submit(sequence_3)
  382. assert data_node_output.read() == 3
  383. def mult_by_two(nb: int):
  384. return nb * 2
  385. def mult_by_3(nb: int):
  386. return nb * 3
  387. def test_get_or_create_data():
  388. # only create intermediate data node once
  389. dn_config_1 = Config.configure_data_node("foo", "in_memory", Scope.SCENARIO, default_data=1)
  390. dn_config_2 = Config.configure_data_node("bar", "in_memory", Scope.SCENARIO, default_data=0)
  391. dn_config_6 = Config.configure_data_node("baz", "in_memory", Scope.SCENARIO, default_data=0)
  392. task_config_mult_by_two = Config.configure_task("mult_by_two", mult_by_two, [dn_config_1], dn_config_2)
  393. task_config_mult_by_3 = Config.configure_task("mult_by_3", mult_by_3, [dn_config_2], dn_config_6)
  394. # dn_1 ---> mult_by_two ---> dn_2 ---> mult_by_3 ---> dn_6
  395. scenario_config = Config.configure_scenario("scenario", [task_config_mult_by_two, task_config_mult_by_3])
  396. assert len(_DataManager._get_all()) == 0
  397. assert len(_TaskManager._get_all()) == 0
  398. scenario = _ScenarioManager._create(scenario_config)
  399. scenario.add_sequences({"by_6": list(scenario.tasks.values())})
  400. sequence = scenario.sequences["by_6"]
  401. assert sequence.name == "by_6"
  402. assert len(_DataManager._get_all()) == 3
  403. assert len(_TaskManager._get_all()) == 2
  404. assert len(sequence._get_sorted_tasks()) == 2
  405. assert sequence.foo.read() == 1
  406. assert sequence.bar.read() == 0
  407. assert sequence.baz.read() == 0
  408. assert sequence._get_sorted_tasks()[0][0].config_id == task_config_mult_by_two.id
  409. assert sequence._get_sorted_tasks()[1][0].config_id == task_config_mult_by_3.id
  410. _SequenceManager._submit(sequence.id)
  411. assert sequence.foo.read() == 1
  412. assert sequence.bar.read() == 2
  413. assert sequence.baz.read() == 6
  414. sequence.foo.write("new data value")
  415. assert sequence.foo.read() == "new data value"
  416. assert sequence.bar.read() == 2
  417. assert sequence.baz.read() == 6
  418. sequence.bar.write(7)
  419. assert sequence.foo.read() == "new data value"
  420. assert sequence.bar.read() == 7
  421. assert sequence.baz.read() == 6
  422. with pytest.raises(AttributeError):
  423. sequence.WRONG.write(7)
  424. def notify1(*args, **kwargs):
  425. ...
  426. def notify2(*args, **kwargs):
  427. ...
  428. def notify_multi_param(*args, **kwargs):
  429. ...
  430. def test_sequence_notification_subscribe(mocker):
  431. mocker.patch("taipy.core._entity._reload._Reloader._reload", side_effect=lambda m, o: o)
  432. task_configs = [
  433. Config.configure_task(
  434. "mult_by_two",
  435. mult_by_two,
  436. [Config.configure_data_node("foo", "in_memory", Scope.SCENARIO, default_data=1)],
  437. Config.configure_data_node("bar", "in_memory", Scope.SCENARIO, default_data=0),
  438. )
  439. ]
  440. tasks = _TaskManager._bulk_get_or_create(task_configs=task_configs)
  441. scenario = Scenario("scenario", set(tasks), {}, sequences={"by_1": {"tasks": tasks}})
  442. _ScenarioManager._set(scenario)
  443. sequence = scenario.sequences["by_1"]
  444. notify_1 = NotifyMock(sequence)
  445. notify_1.__name__ = "notify_1"
  446. notify_1.__module__ = "notify_1"
  447. notify_2 = NotifyMock(sequence)
  448. notify_2.__name__ = "notify_2"
  449. notify_2.__module__ = "notify_2"
  450. # Mocking this because NotifyMock is a class that does not loads correctly when getting the sequence
  451. # from the storage.
  452. mocker.patch.object(
  453. _utils,
  454. "_load_fct",
  455. side_effect=[
  456. notify_1,
  457. notify_1,
  458. notify_1,
  459. notify_1,
  460. notify_2,
  461. notify_2,
  462. notify_2,
  463. notify_2,
  464. notify_2,
  465. notify_2,
  466. ],
  467. )
  468. # test subscription
  469. callback = mock.MagicMock()
  470. _SequenceManager._submit(sequence.id, [callback])
  471. callback.assert_called()
  472. # test sequence subscribe notification
  473. _SequenceManager._subscribe(callback=notify_1, sequence=sequence)
  474. _SequenceManager._submit(sequence.id)
  475. notify_1.assert_called_3_times()
  476. notify_1.reset()
  477. # test sequence unsubscribe notification
  478. # test subscribe notification only on new job
  479. _SequenceManager._unsubscribe(callback=notify_1, sequence=sequence)
  480. _SequenceManager._subscribe(callback=notify_2, sequence=sequence)
  481. _SequenceManager._submit(sequence)
  482. notify_1.assert_not_called()
  483. notify_2.assert_called_3_times()
  484. def test_sequence_notification_subscribe_multi_param(mocker):
  485. mocker.patch("taipy.core._entity._reload._Reloader._reload", side_effect=lambda m, o: o)
  486. task_configs = [
  487. Config.configure_task(
  488. "mult_by_two",
  489. mult_by_two,
  490. [Config.configure_data_node("foo", "in_memory", Scope.SCENARIO, default_data=1)],
  491. Config.configure_data_node("bar", "in_memory", Scope.SCENARIO, default_data=0),
  492. )
  493. ]
  494. tasks = _TaskManager._bulk_get_or_create(task_configs)
  495. scenario = Scenario("scenario", set(tasks), {}, sequences={"by_6": {"tasks": tasks}})
  496. _ScenarioManager._set(scenario)
  497. sequence = scenario.sequences["by_6"]
  498. notify = mocker.Mock()
  499. # test sequence subscribe notification
  500. _SequenceManager._subscribe(callback=notify, params=["foobar", 123, 1.2], sequence=sequence)
  501. mocker.patch.object(_SequenceManager, "_get", return_value=sequence)
  502. _SequenceManager._submit(sequence.id)
  503. # as the callback is called with Sequence/Scenario and Job objects
  504. # we can assert that is called with params plus a sequence object that we know
  505. # of and a job object that is represented by ANY in this case
  506. notify.assert_called_with("foobar", 123, 1.2, sequence, ANY)
  507. def test_sequence_notification_unsubscribe(mocker):
  508. mocker.patch("taipy.core._entity._reload._Reloader._reload", side_effect=lambda m, o: o)
  509. task_configs = [
  510. Config.configure_task(
  511. "mult_by_two",
  512. mult_by_two,
  513. [Config.configure_data_node("foo", "in_memory", Scope.SCENARIO, default_data=1)],
  514. Config.configure_data_node("bar", "in_memory", Scope.SCENARIO, default_data=0),
  515. )
  516. ]
  517. tasks = _TaskManager._bulk_get_or_create(task_configs)
  518. scenario = Scenario("scenario", set(tasks), {}, sequences={"by_6": {"tasks": tasks}})
  519. _ScenarioManager._set(scenario)
  520. sequence = scenario.sequences["by_6"]
  521. notify_1 = notify1
  522. notify_2 = notify2
  523. _SequenceManager._subscribe(callback=notify_1, sequence=sequence)
  524. _SequenceManager._unsubscribe(callback=notify_1, sequence=sequence)
  525. _SequenceManager._subscribe(callback=notify_2, sequence=sequence)
  526. _SequenceManager._submit(sequence.id)
  527. with pytest.raises(ValueError):
  528. _SequenceManager._unsubscribe(callback=notify_1, sequence=sequence)
  529. _SequenceManager._unsubscribe(callback=notify_2, sequence=sequence)
  530. def test_sequence_notification_unsubscribe_multi_param():
  531. task_configs = [
  532. Config.configure_task(
  533. "mult_by_two",
  534. mult_by_two,
  535. [Config.configure_data_node("foo", "in_memory", Scope.SCENARIO, default_data=1)],
  536. Config.configure_data_node("bar", "in_memory", Scope.SCENARIO, default_data=0),
  537. )
  538. ]
  539. tasks = _TaskManager._bulk_get_or_create(task_configs)
  540. scenario = Scenario("scenario", tasks, {}, sequences={"by_6": {"tasks": tasks}})
  541. _ScenarioManager._set(scenario)
  542. sequence = scenario.sequences["by_6"]
  543. _SequenceManager._subscribe(callback=notify_multi_param, params=["foobar", 123, 0], sequence=sequence)
  544. _SequenceManager._subscribe(callback=notify_multi_param, params=["foobar", 123, 1], sequence=sequence)
  545. _SequenceManager._subscribe(callback=notify_multi_param, params=["foobar", 123, 2], sequence=sequence)
  546. assert len(sequence.subscribers) == 3
  547. sequence.unsubscribe(notify_multi_param)
  548. assert len(sequence.subscribers) == 2
  549. assert _Subscriber(notify_multi_param, ["foobar", 123, 0]) not in sequence.subscribers
  550. sequence.unsubscribe(notify_multi_param, ["foobar", 123, 2])
  551. assert len(sequence.subscribers) == 1
  552. assert _Subscriber(notify_multi_param, ["foobar", 123, 2]) not in sequence.subscribers
  553. with pytest.raises(ValueError):
  554. sequence.unsubscribe(notify_multi_param, ["foobar", 123, 10000])
  555. def test_sequence_notification_subscribe_all():
  556. task_configs = [
  557. Config.configure_task(
  558. "mult_by_two",
  559. mult_by_two,
  560. [Config.configure_data_node("foo", "in_memory", Scope.SCENARIO, default_data=1)],
  561. Config.configure_data_node("bar", "in_memory", Scope.SCENARIO, default_data=0),
  562. )
  563. ]
  564. tasks = _TaskManager._bulk_get_or_create(task_configs)
  565. scenario = Scenario("scenario", tasks, {}, sequences={"by_6": {"tasks": tasks}, "other_sequence": {"tasks": tasks}})
  566. _ScenarioManager._set(scenario)
  567. sequence = scenario.sequences["by_6"]
  568. other_sequence = scenario.sequences["other_sequence"]
  569. notify_1 = NotifyMock(sequence)
  570. _SequenceManager._subscribe(notify_1)
  571. assert len(_SequenceManager._get(sequence.id).subscribers) == 1
  572. assert len(_SequenceManager._get(other_sequence.id).subscribers) == 1
  573. def test_delete():
  574. sequence_id = "SEQUENCE_sequence_SCENARIO_scenario_id_1"
  575. with pytest.raises(ModelNotFound):
  576. _SequenceManager._delete(sequence_id)
  577. scenario_1 = Scenario("scenario_1", set(), {}, scenario_id="SCENARIO_scenario_id_1")
  578. scenario_2 = Scenario("scenario_2", set(), {}, scenario_id="SCENARIO_scenario_id_2")
  579. _ScenarioManager._set(scenario_1)
  580. _ScenarioManager._set(scenario_2)
  581. with pytest.raises(ModelNotFound):
  582. _SequenceManager._delete(SequenceId(sequence_id))
  583. scenario_1.add_sequences({"sequence": []})
  584. assert len(_SequenceManager._get_all()) == 1
  585. _SequenceManager._delete(SequenceId(sequence_id))
  586. assert len(_SequenceManager._get_all()) == 0
  587. scenario_1.add_sequences({"sequence": [], "sequence_1": []})
  588. assert len(_SequenceManager._get_all()) == 2
  589. _SequenceManager._delete(SequenceId(sequence_id))
  590. assert len(_SequenceManager._get_all()) == 1
  591. with pytest.raises(SequenceAlreadyExists):
  592. scenario_1.add_sequences({"sequence_1": [], "sequence_2": [], "sequence_3": []})
  593. scenario_1.add_sequences({"sequence_2": [], "sequence_3": []})
  594. scenario_2.add_sequences({"sequence_1_2": [], "sequence_2_2": []})
  595. assert len(_SequenceManager._get_all()) == 5
  596. _SequenceManager._delete_all()
  597. assert len(_SequenceManager._get_all()) == 0
  598. scenario_1.add_sequences({"sequence_1": [], "sequence_2": [], "sequence_3": [], "sequence_4": []})
  599. scenario_2.add_sequences({"sequence_1_2": [], "sequence_2_2": []})
  600. assert len(_SequenceManager._get_all()) == 6
  601. _SequenceManager._delete_many(
  602. [
  603. "SEQUENCE_sequence_1_SCENARIO_scenario_id_1",
  604. "SEQUENCE_sequence_2_SCENARIO_scenario_id_1",
  605. "SEQUENCE_sequence_1_2_SCENARIO_scenario_id_2",
  606. ]
  607. )
  608. assert len(_SequenceManager._get_all()) == 3
  609. with pytest.raises(ModelNotFound):
  610. _SequenceManager._delete_many(
  611. ["SEQUENCE_sequence_1_SCENARIO_scenario_id_1", "SEQUENCE_sequence_2_SCENARIO_scenario_id_1"]
  612. )
  613. def test_delete_version():
  614. scenario_1_0 = Scenario(
  615. "scenario_config",
  616. [],
  617. {},
  618. scenario_id="SCENARIO_id_1_v1_0",
  619. version="1.0",
  620. sequences={"sequence_1": {}, "sequence_2": {}},
  621. )
  622. scenario_1_1 = Scenario(
  623. "scenario_config",
  624. [],
  625. {},
  626. scenario_id="SCENARIO_id_1_v1_1",
  627. version="1.1",
  628. sequences={"sequence_1": {}, "sequence_2": {}},
  629. )
  630. _ScenarioManager._set(scenario_1_0)
  631. _ScenarioManager._set(scenario_1_1)
  632. _VersionManager._set_experiment_version("1.1")
  633. assert len(_ScenarioManager._get_all()) == 1
  634. assert len(_SequenceManager._get_all()) == 2
  635. _VersionManager._set_experiment_version("1.0")
  636. assert len(_ScenarioManager._get_all()) == 1
  637. assert len(_SequenceManager._get_all()) == 2
  638. _SequenceManager._delete_by_version("1.0")
  639. assert len(_ScenarioManager._get_all()) == 1
  640. assert len(_SequenceManager._get_all()) == 0
  641. assert len(scenario_1_0.sequences) == 0
  642. assert len(scenario_1_1.sequences) == 2
  643. _VersionManager._set_experiment_version("1.1")
  644. assert len(_ScenarioManager._get_all()) == 1
  645. assert len(_SequenceManager._get_all()) == 2
  646. assert len(scenario_1_0.sequences) == 0
  647. assert len(scenario_1_1.sequences) == 2
  648. _SequenceManager._delete_by_version("1.1")
  649. assert len(_ScenarioManager._get_all()) == 1
  650. assert len(_SequenceManager._get_all()) == 0
  651. def test_exists():
  652. scenario = Scenario("scenario", [], {}, scenario_id="SCENARIO_scenario", sequences={"sequence": {}})
  653. _ScenarioManager._set(scenario)
  654. assert len(_ScenarioManager._get_all()) == 1
  655. assert len(_SequenceManager._get_all()) == 1
  656. assert not _SequenceManager._exists("SEQUENCE_sequence_not_exist_SCENARIO_scenario")
  657. assert not _SequenceManager._exists("SEQUENCE_sequence_SCENARIO_scenario_id")
  658. assert _SequenceManager._exists("SEQUENCE_sequence_SCENARIO_scenario")
  659. assert _SequenceManager._exists(scenario.sequences["sequence"])
  660. def test_export(tmpdir_factory):
  661. path = tmpdir_factory.mktemp("data")
  662. task = Task("task", {}, print, id=TaskId("task_id"))
  663. scenario = Scenario(
  664. "scenario",
  665. {task},
  666. {},
  667. set(),
  668. version="1.0",
  669. sequences={"sequence_1": {}, "sequence_2": {"tasks": [task], "properties": {"xyz": "acb"}}},
  670. )
  671. _TaskManager._set(task)
  672. _ScenarioManager._set(scenario)
  673. sequence_1 = scenario.sequences["sequence_1"]
  674. sequence_2 = scenario.sequences["sequence_2"]
  675. _SequenceManager._export(sequence_1.id, Path(path))
  676. export_sequence_json_file_path = f"{path}/sequences/{sequence_1.id}.json"
  677. with open(export_sequence_json_file_path, "rb") as f:
  678. sequence_json_file = json.load(f)
  679. expected_json = {
  680. "id": sequence_1.id,
  681. "owner_id": scenario.id,
  682. "parent_ids": [scenario.id],
  683. "name": "sequence_1",
  684. "tasks": [],
  685. "properties": {},
  686. "subscribers": [],
  687. }
  688. assert expected_json == sequence_json_file
  689. _SequenceManager._export(sequence_2.id, Path(path))
  690. export_sequence_json_file_path = f"{path}/sequences/{sequence_2.id}.json"
  691. with open(export_sequence_json_file_path, "rb") as f:
  692. sequence_json_file = json.load(f)
  693. expected_json = {
  694. "id": sequence_2.id,
  695. "owner_id": scenario.id,
  696. "parent_ids": [scenario.id],
  697. "name": "sequence_2",
  698. "tasks": [task.id],
  699. "properties": {"xyz": "acb"},
  700. "subscribers": [],
  701. }
  702. assert expected_json == sequence_json_file
  703. def test_hard_delete_one_single_sequence_with_scenario_data_nodes():
  704. dn_input_config = Config.configure_data_node("my_input", "in_memory", scope=Scope.SCENARIO, default_data="testing")
  705. dn_output_config = Config.configure_data_node("my_output", "in_memory", scope=Scope.SCENARIO)
  706. task_config = Config.configure_task("task_config", print, dn_input_config, dn_output_config)
  707. tasks = _TaskManager._bulk_get_or_create([task_config])
  708. scenario = Scenario("scenario", tasks, {}, sequences={"sequence": {"tasks": tasks}})
  709. _ScenarioManager._set(scenario)
  710. sequence = scenario.sequences["sequence"]
  711. sequence.submit()
  712. assert len(_ScenarioManager._get_all()) == 1
  713. assert len(_SequenceManager._get_all()) == 1
  714. assert len(_TaskManager._get_all()) == 1
  715. assert len(_DataManager._get_all()) == 2
  716. assert len(_JobManager._get_all()) == 1
  717. _SequenceManager._hard_delete(sequence.id)
  718. assert len(_ScenarioManager._get_all()) == 1
  719. assert len(_SequenceManager._get_all()) == 0
  720. assert len(_TaskManager._get_all()) == 1
  721. assert len(_DataManager._get_all()) == 2
  722. assert len(_JobManager._get_all()) == 1
  723. def test_hard_delete_one_single_sequence_with_cycle_data_nodes():
  724. dn_input_config = Config.configure_data_node("my_input", "in_memory", scope=Scope.CYCLE, default_data="testing")
  725. dn_output_config = Config.configure_data_node("my_output", "in_memory", scope=Scope.CYCLE)
  726. task_config = Config.configure_task("task_config", print, dn_input_config, dn_output_config)
  727. tasks = _TaskManager._bulk_get_or_create([task_config])
  728. scenario = Scenario("scenario", tasks, {}, sequences={"sequence": {"tasks": tasks}})
  729. _ScenarioManager._set(scenario)
  730. sequence = scenario.sequences["sequence"]
  731. sequence.submit()
  732. assert len(_ScenarioManager._get_all()) == 1
  733. assert len(_SequenceManager._get_all()) == 1
  734. assert len(_TaskManager._get_all()) == 1
  735. assert len(_DataManager._get_all()) == 2
  736. assert len(_JobManager._get_all()) == 1
  737. _SequenceManager._hard_delete(sequence.id)
  738. assert len(_ScenarioManager._get_all()) == 1
  739. assert len(_SequenceManager._get_all()) == 0
  740. assert len(_TaskManager._get_all()) == 1
  741. assert len(_DataManager._get_all()) == 2
  742. assert len(_JobManager._get_all()) == 1
  743. def test_hard_delete_shared_entities():
  744. input_dn = Config.configure_data_node("my_input", "in_memory", scope=Scope.SCENARIO, default_data="testing")
  745. intermediate_dn = Config.configure_data_node("my_inter", "in_memory", scope=Scope.GLOBAL, default_data="testing")
  746. output_dn = Config.configure_data_node("my_output", "in_memory", scope=Scope.GLOBAL, default_data="testing")
  747. task_1 = Config.configure_task("task_1", print, input_dn, intermediate_dn)
  748. task_2 = Config.configure_task("task_2", print, intermediate_dn, output_dn)
  749. tasks_scenario_1 = _TaskManager._bulk_get_or_create([task_1, task_2], scenario_id="scenario_id_1")
  750. tasks_scenario_2 = _TaskManager._bulk_get_or_create([task_1, task_2], scenario_id="scenario_id_2")
  751. scenario_1 = Scenario("scenario_1", tasks_scenario_1, {}, sequences={"sequence": {"tasks": tasks_scenario_1}})
  752. scenario_2 = Scenario("scenario_2", tasks_scenario_2, {}, sequences={"sequence": {"tasks": tasks_scenario_2}})
  753. _ScenarioManager._set(scenario_1)
  754. _ScenarioManager._set(scenario_2)
  755. sequence_1 = scenario_1.sequences["sequence"]
  756. sequence_2 = scenario_2.sequences["sequence"]
  757. _SequenceManager._submit(sequence_1.id)
  758. _SequenceManager._submit(sequence_2.id)
  759. assert len(_ScenarioManager._get_all()) == 2
  760. assert len(_SequenceManager._get_all()) == 2
  761. assert len(_TaskManager._get_all()) == 3
  762. assert len(_DataManager._get_all()) == 4
  763. assert len(_JobManager._get_all()) == 4
  764. _SequenceManager._hard_delete(sequence_1.id)
  765. assert len(_ScenarioManager._get_all()) == 2
  766. assert len(_SequenceManager._get_all()) == 1
  767. assert len(_TaskManager._get_all()) == 3
  768. assert len(_DataManager._get_all()) == 4
  769. assert len(_JobManager._get_all()) == 4
  770. def my_print(a, b):
  771. print(a + b) # noqa: T201
  772. def test_submit_task_with_input_dn_wrong_file_path(caplog):
  773. csv_dn_cfg = Config.configure_csv_data_node("wrong_csv_file_path", default_path="wrong_path.csv")
  774. pickle_dn_cfg = Config.configure_pickle_data_node("wrong_pickle_file_path", default_path="wrong_path.pickle")
  775. parquet_dn_cfg = Config.configure_parquet_data_node("wrong_parquet_file_path", default_path="wrong_path.parquet")
  776. json_dn_cfg = Config.configure_parquet_data_node("wrong_json_file_path", default_path="wrong_path.json")
  777. task_cfg = Config.configure_task("task", my_print, [csv_dn_cfg, pickle_dn_cfg], parquet_dn_cfg)
  778. task_2_cfg = Config.configure_task("task2", my_print, [csv_dn_cfg, parquet_dn_cfg], json_dn_cfg)
  779. tasks = _TaskManager._bulk_get_or_create([task_cfg, task_2_cfg])
  780. scenario = Scenario("scenario", tasks, {}, sequences={"sequence": {"tasks": tasks}})
  781. _ScenarioManager._set(scenario)
  782. sequence = scenario.sequences["sequence"]
  783. pip_manager = _SequenceManagerFactory._build_manager()
  784. pip_manager._submit(sequence)
  785. stdout = caplog.text
  786. expected_outputs = [
  787. f"{input_dn.id} cannot be read because it has never been written. Hint: The data node may refer to a wrong "
  788. f"path : {input_dn.path} "
  789. for input_dn in sequence.get_inputs()
  790. ]
  791. not_expected_outputs = [
  792. f"{input_dn.id} cannot be read because it has never been written. Hint: The data node may refer to a wrong "
  793. f"path : {input_dn.path} "
  794. for input_dn in sequence.data_nodes.values()
  795. if input_dn not in sequence.get_inputs()
  796. ]
  797. assert all(expected_output in stdout for expected_output in expected_outputs)
  798. assert all(expected_output not in stdout for expected_output in not_expected_outputs)
  799. def test_submit_task_with_one_input_dn_wrong_file_path(caplog):
  800. csv_dn_cfg = Config.configure_csv_data_node("wrong_csv_file_path", default_path="wrong_path.csv")
  801. pickle_dn_cfg = Config.configure_pickle_data_node("wrong_pickle_file_path", default_data="value")
  802. parquet_dn_cfg = Config.configure_parquet_data_node("wrong_parquet_file_path", default_path="wrong_path.parquet")
  803. json_dn_cfg = Config.configure_parquet_data_node("wrong_json_file_path", default_path="wrong_path.json")
  804. task_cfg = Config.configure_task("task", my_print, [csv_dn_cfg, pickle_dn_cfg], parquet_dn_cfg)
  805. task_2_cfg = Config.configure_task("task2", my_print, [csv_dn_cfg, parquet_dn_cfg], json_dn_cfg)
  806. tasks = _TaskManager._bulk_get_or_create([task_cfg, task_2_cfg])
  807. scenario = Scenario("scenario", tasks, {}, sequences={"sequence": {"tasks": tasks}})
  808. _ScenarioManager._set(scenario)
  809. sequence = scenario.sequences["sequence"]
  810. pip_manager = _SequenceManagerFactory._build_manager()
  811. pip_manager._submit(sequence)
  812. stdout = caplog.text
  813. expected_outputs = [
  814. f"{input_dn.id} cannot be read because it has never been written. Hint: The data node may refer to a wrong "
  815. f"path : {input_dn.path} "
  816. for input_dn in sequence.get_inputs()
  817. if input_dn.config_id == "wrong_csv_file_path"
  818. ]
  819. not_expected_outputs = [
  820. f"{input_dn.id} cannot be read because it has never been written. Hint: The data node may refer to a wrong "
  821. f"path : {input_dn.path} "
  822. for input_dn in sequence.data_nodes.values()
  823. if input_dn.config_id != "wrong_csv_file_path"
  824. ]
  825. assert all(expected_output in stdout for expected_output in expected_outputs)
  826. assert all(expected_output not in stdout for expected_output in not_expected_outputs)