# Copyright 2021-2025 Avaiga Private Limited # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the # specific language governing permissions and limitations under the License. from unittest import mock import pytest from taipy import Scope from taipy.common.config import Config from taipy.core.common._utils import _Subscriber from taipy.core.data._data_manager_factory import _DataManagerFactory from taipy.core.data.data_node import DataNode from taipy.core.data.in_memory import InMemoryDataNode from taipy.core.data.pickle import PickleDataNode from taipy.core.exceptions import AttributeKeyAlreadyExisted from taipy.core.scenario._scenario_manager import _ScenarioManager from taipy.core.scenario.scenario import Scenario from taipy.core.sequence._sequence_manager import _SequenceManager from taipy.core.sequence.sequence import Sequence from taipy.core.sequence.sequence_id import SequenceId from taipy.core.task._task_manager import _TaskManager from taipy.core.task.task import Task, TaskId class FakeDataNode: config_id = "config_id_of_a_fake_dn" def test_sequence_equals(): task_config = Config.configure_task("mult_by_3", print, [], None) scenario_config = Config.configure_scenario("scenario", [task_config]) scenario = _ScenarioManager._create(scenario_config) scenario.add_sequences({"print": list(scenario.tasks.values())}) sequence_1 = scenario.sequences["print"] sequence_id = sequence_1.id assert sequence_1.properties["name"] == "print" sequence_2 = _SequenceManager._get(sequence_id) # To test if instance is same type task = Task("task", {}, print, [], [], sequence_id) assert sequence_1 == sequence_2 assert sequence_1 != sequence_id assert sequence_1 != task def test_create_sequence(): input = InMemoryDataNode("foo", Scope.SCENARIO) output = InMemoryDataNode("bar", Scope.SCENARIO) task = Task("baz", {}, print, [input], [output], TaskId("task_id")) sequence = Sequence({"description": "description"}, [task], sequence_id=SequenceId("name_1")) assert sequence.id == "name_1" assert sequence.owner_id is None assert sequence.properties["description"] == "description" assert sequence.foo == input assert sequence.bar == output assert sequence.baz.id == task.id assert sequence.tasks == {task.config_id: task} assert sequence.data_nodes == {"foo": input, "bar": output} assert sequence.parent_ids == set() with pytest.raises(AttributeError): _ = sequence.qux assert sequence.get_label() == sequence.id assert sequence.get_simple_label() == sequence.id input_1 = InMemoryDataNode("input", Scope.SCENARIO) output_1 = InMemoryDataNode("output", Scope.SCENARIO) task_1 = Task("task_1", {}, print, [input_1], [output_1], TaskId("task_id_1")) sequence_1 = Sequence( {"description": "description"}, [task_1], owner_id="owner_id", parent_ids={"scenario_id"}, sequence_id=SequenceId("name_1"), ) assert sequence_1.id == "name_1" assert sequence_1.owner_id == "owner_id" assert sequence_1.properties["description"] == "description" assert sequence_1.input == input_1 assert sequence_1.output == output_1 assert sequence_1.task_1 == task_1 assert sequence_1.tasks == {task_1.config_id: task_1} assert sequence_1.data_nodes == {"input": input_1, "output": output_1} assert sequence_1.parent_ids == {"scenario_id"} assert sequence_1.id is not None with mock.patch("taipy.core.get") as get_mck: class MockOwner: label = "owner_label" def get_label(self): return self.label get_mck.return_value = MockOwner() assert sequence_1.get_label() == "owner_label > " + sequence_1.id assert sequence_1.get_simple_label() == sequence_1.id sequence_2 = Sequence( {"description": "description", "name": "Name"}, [task, task_1], owner_id="owner_id", parent_ids={"parent_id_1", "parent_id_2"}, sequence_id=SequenceId("name_2"), ) assert sequence_2.owner_id == "owner_id" assert sequence_2.id == "name_2" assert sequence_2.properties["description"] == "description" assert sequence_2.tasks == {task.config_id: task, task_1.config_id: task_1} assert sequence_2.data_nodes == {"foo": input, "bar": output, "input": input_1, "output": output_1} assert sequence_2.parent_ids == {"parent_id_1", "parent_id_2"} with mock.patch("taipy.core.get") as get_mck: class MockOwner: label = "owner_label" def get_label(self): return self.label get_mck.return_value = MockOwner() assert sequence_2.get_label() == "owner_label > " + sequence_2.properties["name"] assert sequence_2.get_simple_label() == sequence_2.properties["name"] def test_get_set_attribute(): dn_cfg = Config.configure_data_node("bar") task_config = Config.configure_task("print", print, [dn_cfg], None) scenario_config = Config.configure_scenario("scenario", [task_config]) scenario = _ScenarioManager._create(scenario_config) scenario.add_sequences({"seq": list(scenario.tasks.values())}) sequence = scenario.sequences["seq"] sequence.key = "value" assert sequence.key == "value" with pytest.raises(AttributeKeyAlreadyExisted): sequence.bar = "KeyAlreadyUsed" def test_check_consistency(caplog): sequence_1 = Sequence({}, [], "name_1") assert sequence_1._is_consistent() input_2 = InMemoryDataNode("foo", Scope.SCENARIO) output_2 = InMemoryDataNode("bar", Scope.SCENARIO) task_2 = Task("tfoo", {}, print, [input_2], [output_2], TaskId("task_id_2")) sequence_2 = Sequence({}, [task_2], "name_2") assert sequence_2._is_consistent() data_node_3 = InMemoryDataNode("foo", Scope.SCENARIO) task_3 = Task("tfoo", {}, print, [data_node_3], [data_node_3], TaskId("task_id_3")) sequence_3 = Sequence({}, [task_3], "name_3") assert not sequence_3._is_consistent() # Not a dag assert 'The DAG of sequence "name_3" is not a directed acyclic graph' in caplog.text input_4 = InMemoryDataNode("foo", Scope.SCENARIO) output_4 = InMemoryDataNode("bar", Scope.SCENARIO) task_4_1 = Task("tfoo", {}, print, [input_4], [output_4], TaskId("task_id_4_1")) task_4_2 = Task("tbar", {}, print, [output_4], [input_4], TaskId("task_id_4_2")) sequence_4 = Sequence({}, [task_4_1, task_4_2], "name_4") assert not sequence_4._is_consistent() # Not a Dag assert 'The DAG of sequence "name_4" is not a directed acyclic graph' in caplog.text input_6 = DataNode("foo", Scope.SCENARIO, "input_id_5") output_6 = DataNode("bar", Scope.SCENARIO, "output_id_5") task_6_1 = Task("tfoo", {}, print, [input_6], [output_6], TaskId("task_id_5_1")) task_6_2 = Task("tbar", {}, print, [output_6], [FakeDataNode()], TaskId("task_id_5_2")) sequence_6 = Sequence({}, [task_6_1, task_6_2], "name_5") assert not sequence_6._is_consistent() # Not a DataNode assert ( 'Invalid edge detected in sequence "name_5": left node Task "tbar" and right node FakeDataNode ' "must connect a Task and a DataNode" in caplog.text ) intermediate_7 = DataNode("foo", Scope.SCENARIO, "intermediate_id_7") output_7 = DataNode("bar", Scope.SCENARIO, "output_id_7") task_7_1 = Task("tfoo", {}, print, [], [intermediate_7], TaskId("task_id_7_1")) task_7_2 = Task("tbar", {}, print, [intermediate_7], [output_7], TaskId("task_id_7_2")) sequence_7 = Sequence({}, [task_7_1, task_7_2], "name_7") assert sequence_7._is_consistent() input_8 = DataNode("foo", Scope.SCENARIO, "output_id_8") intermediate_8 = DataNode("bar", Scope.SCENARIO, "intermediate_id_8") task_8_1 = Task("tfoo", {}, print, [input_8], [intermediate_8], TaskId("task_id_8_1")) task_8_2 = Task("tbar", {}, print, [intermediate_8], [], TaskId("task_id_8_2")) sequence_8 = Sequence({}, [task_8_1, task_8_2], "name_8") assert sequence_8._is_consistent() input_9_1 = DataNode("foo", Scope.SCENARIO, "input_id_9_1") output_9_1 = DataNode("bar", Scope.SCENARIO, "output_id_9_1") input_9_2 = DataNode("baz", Scope.SCENARIO, "input_id_9_2") output_9_2 = DataNode("qux", Scope.SCENARIO, "output_id_9_2") task_9_1 = Task("tfoo", {}, print, [input_9_1], [output_9_1], TaskId("task_id_9_1")) task_9_2 = Task("tbar", {}, print, [input_9_2], [output_9_2], TaskId("task_id_9_2")) sequence_9 = Sequence({}, [task_9_1, task_9_2], "name_9") assert not sequence_9._is_consistent() # Not connected assert 'The DAG of sequence "name_9" is not weakly connected' in caplog.text input_10_1 = DataNode("foo", Scope.SCENARIO, "output_id_10_1") intermediate_10_1 = DataNode("bar", Scope.SCENARIO, "intermediate_id_10_1") intermediate_10_2 = DataNode("baz", Scope.SCENARIO, "intermediate_id_10_2") output_10 = DataNode("qux", Scope.SCENARIO, "output_id_10") post_10 = DataNode("quux", Scope.SCENARIO, "post_id_10") task_10_1 = Task("tfoo", {}, print, [input_10_1], [intermediate_10_1], TaskId("task_id_10_1")) task_10_2 = Task("tbar", {}, print, [], [intermediate_10_2], TaskId("task_id_10_2")) task_10_3 = Task("tbaz", {}, print, [intermediate_10_1, intermediate_10_2], [output_10], TaskId("task_id_10_3")) task_10_4 = Task("tqux", {}, print, [output_10], [post_10], TaskId("task_id_10_4")) task_10_5 = Task("tquux", {}, print, [output_10], [], TaskId("task_id_10_5")) sequence_10 = Sequence({}, [task_10_1, task_10_2, task_10_3, task_10_4, task_10_5], "name_10") assert sequence_10._is_consistent() def test_get_sorted_tasks(): def assert_equal(tasks_a, tasks_b) -> bool: if len(tasks_a) != len(tasks_b): return False for i in range(len(tasks_a)): task_a, task_b = tasks_a[i], tasks_b[i] if isinstance(task_a, list) and isinstance(task_b, list): if not assert_equal(task_a, task_b): return False elif isinstance(task_a, list) or isinstance(task_b, list): return False else: index_task_b = tasks_b.index(task_a) if any(isinstance(task_b, list) for task_b in tasks_b[i : index_task_b + 1]): return False return True data_node_1 = DataNode("foo", Scope.SCENARIO, "s1") data_node_2 = DataNode("bar", Scope.SCENARIO, "s2") data_node_3 = DataNode("baz", Scope.SCENARIO, "s3") data_node_4 = DataNode("qux", Scope.SCENARIO, "s4") data_node_5 = DataNode("quux", Scope.SCENARIO, "s5") data_node_6 = DataNode("quuz", Scope.SCENARIO, "s6") data_node_7 = DataNode("corge", Scope.SCENARIO, "s7") task_1 = Task( "grault", {}, print, [data_node_1, data_node_2], [data_node_3, data_node_4], TaskId("t1"), ) task_2 = Task("garply", {}, print, [data_node_3], [data_node_5], TaskId("t2")) task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], [data_node_6], TaskId("t3")) task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4")) sequence = Sequence({}, [task_4, task_2, task_1, task_3], SequenceId("p1")) # s1 --- ---> s3 ---> t2 ---> s5 ---- # | | | # |---> t1 ---| -------------------------> t3 ---> s6 # | | | # s2 --- ---> s4 ---> t4 ---> s7 assert assert_equal(sequence._get_sorted_tasks(), [[task_1], [task_2, task_4], [task_3]]) data_node_1 = DataNode("foo", Scope.SCENARIO, "s1") data_node_2 = DataNode("bar", Scope.SCENARIO, "s2") data_node_4 = DataNode("qux", Scope.SCENARIO, "s4") data_node_5 = DataNode("quux", Scope.SCENARIO, "s5") data_node_6 = DataNode("quuz", Scope.SCENARIO, "s6") data_node_7 = DataNode("corge", Scope.SCENARIO, "s7") task_1 = Task( "grault", {}, print, [data_node_1, data_node_2], [data_node_4], TaskId("t1"), ) task_2 = Task("garply", {}, print, None, [data_node_5], TaskId("t2")) task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], [data_node_6], TaskId("t3")) task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4")) sequence = Sequence({}, [task_4, task_2, task_1, task_3], SequenceId("p1")) # s1 --- t2 ---> s5 ------ # | | # |---> t1 ---| -----> t3 ---> s6 # | | | # s2 --- ---> s4 ---> t4 ---> s7 assert assert_equal(sequence._get_sorted_tasks(), [[task_2, task_1], [task_4, task_3]]) data_node_1 = DataNode("foo", Scope.SCENARIO, "s1") data_node_2 = DataNode("bar", Scope.SCENARIO, "s2") data_node_4 = DataNode("qux", Scope.SCENARIO, "s4") data_node_5 = DataNode("quux", Scope.SCENARIO, "s5") data_node_6 = DataNode("quuz", Scope.SCENARIO, "s6") data_node_7 = DataNode("corge", Scope.SCENARIO, "s7") task_1 = Task( "grault", {}, print, [data_node_1, data_node_2], [data_node_4], TaskId("t1"), ) task_2 = Task("garply", {}, print, [data_node_6], [data_node_5], TaskId("t2")) task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], id=TaskId("t3")) task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4")) sequence = Sequence({}, [task_4, task_2, task_1, task_3], SequenceId("p1")) # s1 --- s6 ---> t2 ---> s5 # | | # |---> t1 ---| -----> t3 # | | | # s2 --- ---> s4 ---> t4 ---> s7 assert assert_equal(sequence._get_sorted_tasks(), [[task_2, task_1], [task_4, task_3]]) data_node_1 = DataNode("foo", Scope.SCENARIO, "s1") data_node_2 = DataNode("bar", Scope.SCENARIO, "s2") data_node_4 = DataNode("qux", Scope.SCENARIO, "s4") data_node_5 = DataNode("quux", Scope.SCENARIO, "s5") data_node_6 = DataNode("quuz", Scope.SCENARIO, "s6") data_node_7 = DataNode("corge", Scope.SCENARIO, "s7") task_1 = Task( "grault", {}, print, [data_node_1, data_node_2], [data_node_4], TaskId("t1"), ) task_2 = Task("garply", {}, print, output=[data_node_5], id=TaskId("t2")) task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], None, id=TaskId("t3")) task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4")) sequence = Sequence({}, [task_4, task_2, task_1, task_3], SequenceId("p1")) # s1 --- t2 ---> s5 # | | # |---> t1 ---| -----> t3 # | | | # s2 --- ---> s4 ---> t4 ---> s7 assert assert_equal(sequence._get_sorted_tasks(), [[task_2, task_1], [task_4, task_3]]) data_node_1 = DataNode("foo", Scope.SCENARIO, "s1") data_node_2 = DataNode("bar", Scope.SCENARIO, "s2") data_node_4 = DataNode("qux", Scope.SCENARIO, "s4") data_node_5 = DataNode("quux", Scope.SCENARIO, "s5") data_node_6 = DataNode("quuz", Scope.SCENARIO, "s6") data_node_7 = DataNode("corge", Scope.SCENARIO, "s7") data_node_8 = DataNode("hugh", Scope.SCENARIO, "s8") task_1 = Task( "grault", {}, print, [data_node_1, data_node_2], [data_node_4], TaskId("t1"), ) task_2 = Task("garply", {}, print, output=[data_node_5], id=TaskId("t2")) task_3 = Task("waldo", {}, print, [data_node_4], None, id=TaskId("t3")) task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4")) task_5 = Task("bob", {}, print, [data_node_8], None, TaskId("t5")) sequence = Sequence({}, [task_5, task_4, task_2, task_1, task_3], SequenceId("p1")) # s1 --- # | # |---> t1 ---| -----> t3 # | | | # s2 --- ---> s4 ---> t4 ---> s7 # t2 ---> s5 # s8 ---> t5 assert assert_equal(sequence._get_sorted_tasks(), [[task_5, task_2, task_1], [task_4, task_3]]) def test_get_inputs(): data_node_1 = DataNode("foo", Scope.SCENARIO, "s1") data_node_2 = DataNode("bar", Scope.SCENARIO, "s2") data_node_3 = DataNode("baz", Scope.SCENARIO, "s3") data_node_4 = DataNode("qux", Scope.SCENARIO, "s4") data_node_5 = DataNode("quux", Scope.SCENARIO, "s5") data_node_6 = DataNode("quuz", Scope.SCENARIO, "s6") data_node_7 = DataNode("corge", Scope.SCENARIO, "s7") task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_3, data_node_4], TaskId("t1")) task_2 = Task("garply", {}, print, [data_node_3], [data_node_5], TaskId("t2")) task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], [data_node_6], TaskId("t3")) task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4")) sequence = Sequence({}, [task_4, task_2, task_1, task_3], SequenceId("p1")) # s1 --- ---> s3 ---> t2 ---> s5 ---- # | | | # |---> t1 ---| -------------------------> t3 ---> s6 # | | | # s2 --- ---> s4 ---> t4 ---> s7 assert sequence.get_inputs() == {data_node_1, data_node_2} assert sequence.get_outputs() == {data_node_6, data_node_7} assert sequence.get_intermediate() == {data_node_3, data_node_4, data_node_5} data_node_1 = DataNode("foo", Scope.SCENARIO, "s1") data_node_2 = DataNode("bar", Scope.SCENARIO, "s2") data_node_4 = DataNode("qux", Scope.SCENARIO, "s4") data_node_5 = DataNode("quux", Scope.SCENARIO, "s5") data_node_6 = DataNode("quuz", Scope.SCENARIO, "s6") data_node_7 = DataNode("corge", Scope.SCENARIO, "s7") task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_4], TaskId("t1")) task_2 = Task("garply", {}, print, None, [data_node_5], TaskId("t2")) task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], [data_node_6], TaskId("t3")) task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4")) sequence = Sequence({}, [task_4, task_2, task_1, task_3], SequenceId("p1")) # s1 --- t2 ---> s5 ------ # | | # |---> t1 ---| -----> t3 ---> s6 # | | | # s2 --- ---> s4 ---> t4 ---> s7 assert sequence.get_inputs() == {data_node_1, data_node_2} assert sequence.get_outputs() == {data_node_6, data_node_7} assert sequence.get_intermediate() == {data_node_4, data_node_5} data_node_1 = DataNode("foo", Scope.SCENARIO, "s1") data_node_2 = DataNode("bar", Scope.SCENARIO, "s2") data_node_4 = DataNode("qux", Scope.SCENARIO, "s4") data_node_5 = DataNode("quux", Scope.SCENARIO, "s5") data_node_6 = DataNode("quuz", Scope.SCENARIO, "s6") data_node_7 = DataNode("corge", Scope.SCENARIO, "s7") task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_4], TaskId("t1")) task_2 = Task("garply", {}, print, [data_node_6], [data_node_5], TaskId("t2")) task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], id=TaskId("t3")) task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4")) sequence = Sequence({}, [task_4, task_2, task_1, task_3], SequenceId("p1")) # s1 --- s6 ---> t2 ---> s5 # | | # |---> t1 ---| -----> t3 # | | | # s2 --- ---> s4 ---> t4 ---> s7 assert sequence.get_inputs() == {data_node_1, data_node_2, data_node_6} assert sequence.get_outputs() == {data_node_7} assert sequence.get_intermediate() == {data_node_4, data_node_5} data_node_1 = DataNode("foo", Scope.SCENARIO, "s1") data_node_2 = DataNode("bar", Scope.SCENARIO, "s2") data_node_4 = DataNode("qux", Scope.SCENARIO, "s4") data_node_5 = DataNode("quux", Scope.SCENARIO, "s5") data_node_6 = DataNode("quuz", Scope.SCENARIO, "s6") data_node_7 = DataNode("corge", Scope.SCENARIO, "s7") data_node_8 = DataNode("hugh", Scope.SCENARIO, "s8") task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_4], TaskId("t1")) task_2 = Task("garply", {}, print, output=[data_node_5], id=TaskId("t2")) task_3 = Task("waldo", {}, print, [data_node_4], None, id=TaskId("t3")) task_4 = Task("fred", {}, print, [data_node_4, data_node_6], [data_node_7], TaskId("t4")) task_5 = Task("bob", {}, print, [data_node_8], None, TaskId("t5")) sequence = Sequence({}, [task_5, task_4, task_2, task_1, task_3], SequenceId("p1")) # s1 --- # | # |---> t1 ---| -----> t3 # | | | # s2 --- ---> s4 ---> t4 ---> s7 # t2 ---> s5 | # s8 ---> t5 s6 --| assert sequence.get_inputs() == {data_node_1, data_node_2, data_node_8, data_node_6} assert sequence.get_outputs() == {data_node_5, data_node_7} assert sequence.get_intermediate() == {data_node_4} def test_is_ready_to_run(): scenario_id = "SCENARIO_scenario_id" task_1_id, task_2_id, task_3_id, task_4_id = ( TaskId("TASK_t1"), TaskId("TASK_t2"), TaskId("TASK_t3"), TaskId("TASK_t4"), ) data_node_1 = PickleDataNode("foo", Scope.SCENARIO, "s1", parent_ids={task_1_id}, properties={"default_data": 1}) data_node_2 = PickleDataNode("bar", Scope.SCENARIO, "s2", parent_ids={task_1_id}, properties={"default_data": 2}) data_node_3 = PickleDataNode( "qux", Scope.SCENARIO, "s3", parent_ids={task_1_id, task_3_id, task_4_id}, properties={"default_data": 4} ) data_node_4 = PickleDataNode( "quux", Scope.SCENARIO, "s4", parent_ids={task_2_id, task_3_id}, properties={"default_data": 5} ) data_node_5 = PickleDataNode("quuz", Scope.SCENARIO, "s5", parent_ids={task_2_id}, properties={"default_data": 6}) data_node_6 = PickleDataNode("corge", Scope.SCENARIO, "s6", parent_ids={task_4_id}, properties={"default_data": 7}) task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_3], id=task_1_id) task_2 = Task("garply", {}, print, [data_node_5], [data_node_4], id=task_2_id) task_3 = Task("waldo", {}, print, [data_node_4, data_node_3], id=task_3_id) task_4 = Task("fred", {}, print, [data_node_3], [data_node_6], id=task_4_id) scenario = Scenario("scenario_config", [task_1, task_2, task_3, task_4], {}, scenario_id=scenario_id) data_manager = _DataManagerFactory._build_manager() for dn in [data_node_1, data_node_2, data_node_3, data_node_4, data_node_5, data_node_6]: data_manager._repository._save(dn) for task in [task_1, task_2, task_3, task_4]: _TaskManager._repository._save(task) _ScenarioManager._repository._save(scenario) scenario.add_sequence("sequence", [task_4, task_2, task_1, task_3]) sequence = scenario.sequences["sequence"] # s1 --- s5 ---> t2 ---> s4 # | | # |---> t1 ---| -----> t3 # | | | # s2 --- ---> s3 ---> t4 ---> s6 assert sequence.is_ready_to_run() data_node_1.edit_in_progress = True assert not sequence.is_ready_to_run() data_node_2.edit_in_progress = True data_node_5.edit_in_progress = True assert not sequence.is_ready_to_run() data_node_1.edit_in_progress = False data_node_2.edit_in_progress = False data_node_5.edit_in_progress = False assert sequence.is_ready_to_run() def test_data_nodes_being_edited(): data_node_1 = PickleDataNode("foo", Scope.SCENARIO, "s1", properties={"default_data": 1}) data_node_2 = PickleDataNode("bar", Scope.SCENARIO, "s2", properties={"default_data": 2}) data_node_4 = PickleDataNode("qux", Scope.SCENARIO, "s4", properties={"default_data": 4}) data_node_5 = PickleDataNode("quux", Scope.SCENARIO, "s5", properties={"default_data": 5}) data_node_6 = PickleDataNode("quuz", Scope.SCENARIO, "s6", properties={"default_data": 6}) data_node_7 = PickleDataNode("corge", Scope.SCENARIO, "s7", properties={"default_data": 7}) task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_4], TaskId("t1")) task_2 = Task("garply", {}, print, [data_node_6], [data_node_5], TaskId("t2")) task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], id=TaskId("t3")) task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4")) sequence = Sequence({}, [task_4, task_2, task_1, task_3], SequenceId("p1")) # s1 --- s6 ---> t2 ---> s5 # | | # |---> t1 ---| -----> t3 # | | | # s2 --- ---> s4 ---> t4 ---> s7 data_manager = _DataManagerFactory._build_manager() for dn in [data_node_1, data_node_2, data_node_4, data_node_5, data_node_6, data_node_7]: data_manager._repository._save(dn) assert len(sequence.data_nodes_being_edited()) == 0 assert sequence.data_nodes_being_edited() == set() data_node_1.edit_in_progress = True assert len(sequence.data_nodes_being_edited()) == 1 assert sequence.data_nodes_being_edited() == {data_node_1} data_node_2.edit_in_progress = True data_node_6.edit_in_progress = True assert len(sequence.data_nodes_being_edited()) == 3 assert sequence.data_nodes_being_edited() == {data_node_1, data_node_2, data_node_6} data_node_4.edit_in_progress = True data_node_5.edit_in_progress = True assert len(sequence.data_nodes_being_edited()) == 5 assert sequence.data_nodes_being_edited() == {data_node_1, data_node_2, data_node_4, data_node_5, data_node_6} data_node_1.edit_in_progress = False data_node_2.edit_in_progress = False data_node_6.edit_in_progress = False assert len(sequence.data_nodes_being_edited()) == 2 assert sequence.data_nodes_being_edited() == {data_node_4, data_node_5} data_node_4.edit_in_progress = False data_node_5.edit_in_progress = False data_node_7.edit_in_progress = True assert len(sequence.data_nodes_being_edited()) == 1 assert sequence.data_nodes_being_edited() == {data_node_7} data_node_7.edit_in_progress = False assert len(sequence.data_nodes_being_edited()) == 0 assert sequence.data_nodes_being_edited() == set() def test_get_tasks(): task_1 = Task("grault", {}, print, id=TaskId("t1")) task_2 = Task("garply", {}, print, id=TaskId("t2")) task_3 = Task("waldo", {}, print, id=TaskId("t3")) sequence_1 = Sequence({}, [task_1, task_2, task_3], SequenceId("p1")) assert sequence_1.tasks == {"grault": task_1, "garply": task_2, "waldo": task_3} def test_get_set_of_tasks(): task_1 = Task("grault", {}, print, id=TaskId("t1")) task_2 = Task("garply", {}, print, id=TaskId("t2")) task_3 = Task("waldo", {}, print, id=TaskId("t3")) sequence_1 = Sequence({}, [task_1, task_2, task_3], SequenceId("p1")) assert sequence_1._get_set_of_tasks() == {task_1, task_2, task_3} def test_auto_update_and_reload(task): tmp_task = Task("tmp_task_config_id", {}, print, list(task.output.values()), [], TaskId("tmp_task_id")) scenario = Scenario("scenario", [task, tmp_task], {}, sequences={"foo": {}}) _TaskManager._repository._save(task) _TaskManager._repository._save(tmp_task) _ScenarioManager._repository._save(scenario) sequence_1 = scenario.sequences["foo"] sequence_2 = _SequenceManager._get(sequence_1) # auto update & reload on tasks attribute assert len(sequence_1.tasks) == 0 assert len(sequence_2.tasks) == 0 sequence_1.tasks = [tmp_task] assert len(sequence_1.tasks) == 1 assert sequence_1.tasks[tmp_task.config_id].id == tmp_task.id assert len(sequence_2.tasks) == 1 assert sequence_2.tasks[tmp_task.config_id].id == tmp_task.id sequence_2.tasks = [task] assert len(sequence_1.tasks) == 1 assert sequence_1.tasks[task.config_id].id == task.id assert len(sequence_2.tasks) == 1 assert sequence_2.tasks[task.config_id].id == task.id assert sequence_1.owner_id == scenario.id assert sequence_2.owner_id == scenario.id # auto set & reload on subscribers attribute assert len(sequence_1.subscribers) == 0 assert len(sequence_2.subscribers) == 0 sequence_1.subscribers.append(print) assert len(sequence_1.subscribers) == 1 assert len(sequence_2.subscribers) == 1 sequence_2.subscribers.append(print) assert len(sequence_1.subscribers) == 2 assert len(sequence_2.subscribers) == 2 sequence_1.subscribers.clear() assert len(sequence_1.subscribers) == 0 assert len(sequence_2.subscribers) == 0 sequence_1.subscribers.extend([print, map]) assert len(sequence_1.subscribers) == 2 assert len(sequence_2.subscribers) == 2 sequence_1.subscribers.remove(_Subscriber(print, [])) assert len(sequence_1.subscribers) == 1 assert len(sequence_2.subscribers) == 1 sequence_2.subscribers.clear() assert len(sequence_1.subscribers) == 0 assert len(sequence_2.subscribers) == 0 sequence_1.subscribers + print + len assert len(sequence_1.subscribers) == 2 assert len(sequence_2.subscribers) == 2 sequence_1.subscribers = [] assert len(sequence_1.subscribers) == 0 assert len(sequence_2.subscribers) == 0 with sequence_1 as sequence: assert len(sequence.tasks) == 1 assert sequence.tasks[task.config_id].id == task.id assert len(sequence.subscribers) == 0 assert sequence._is_in_context sequence.tasks = [] sequence.subscribers = [print] assert len(sequence.tasks) == 1 assert sequence.tasks[task.config_id].id == task.id assert len(sequence.subscribers) == 0 assert sequence._is_in_context assert len(sequence_1.tasks) == 0 assert len(sequence_1.subscribers) == 1 assert not sequence_1._is_in_context def test_auto_update_and_reload_properties(): scenario = Scenario("scenario", [], {}, sequences={"foo": {}}) _ScenarioManager._repository._save(scenario) sequence_1 = scenario.sequences["foo"] sequence_2 = _SequenceManager._get(sequence_1) # auto set & reload on properties attribute assert sequence_1.properties == {"name": "foo"} assert sequence_2.properties == {"name": "foo"} sequence_1.properties["qux"] = 4 assert sequence_1.properties["qux"] == 4 assert sequence_2.properties["qux"] == 4 sequence_2.properties["qux"] = 5 assert sequence_1.properties["qux"] == 5 assert sequence_2.properties["qux"] == 5 sequence_1.properties["temp_key_1"] = "temp_value_1" sequence_1.properties["temp_key_2"] = "temp_value_2" assert sequence_1.properties == { "qux": 5, "name": "foo", "temp_key_1": "temp_value_1", "temp_key_2": "temp_value_2", } assert sequence_2.properties == { "qux": 5, "name": "foo", "temp_key_1": "temp_value_1", "temp_key_2": "temp_value_2", } sequence_1.properties.pop("temp_key_1") assert "temp_key_1" not in sequence_1.properties.keys() assert "temp_key_1" not in sequence_1.properties.keys() assert sequence_1.properties == { "qux": 5, "name": "foo", "temp_key_2": "temp_value_2", } assert sequence_2.properties == { "qux": 5, "name": "foo", "temp_key_2": "temp_value_2", } sequence_2.properties.pop("temp_key_2") assert sequence_1.properties == {"name": "foo", "qux": 5} assert sequence_2.properties == {"name": "foo", "qux": 5} assert "temp_key_2" not in sequence_1.properties.keys() assert "temp_key_2" not in sequence_2.properties.keys() sequence_1.properties["temp_key_3"] = 0 assert sequence_1.properties == {"name": "foo", "qux": 5, "temp_key_3": 0} assert sequence_2.properties == {"name": "foo", "qux": 5, "temp_key_3": 0} sequence_1.properties.update({"temp_key_3": 1}) assert sequence_1.properties == {"name": "foo", "qux": 5, "temp_key_3": 1} assert sequence_2.properties == {"name": "foo", "qux": 5, "temp_key_3": 1} sequence_1.properties.update({}) assert sequence_1.properties == {"name": "foo", "qux": 5, "temp_key_3": 1} assert sequence_2.properties == {"name": "foo", "qux": 5, "temp_key_3": 1} sequence_1.properties["temp_key_4"] = 0 sequence_1.properties["temp_key_5"] = 0 with sequence_1 as sequence: assert sequence._is_in_context assert sequence.properties["qux"] == 5 assert sequence.properties["temp_key_3"] == 1 assert sequence.properties["temp_key_4"] == 0 assert sequence.properties["temp_key_5"] == 0 sequence.properties["qux"] = 9 sequence.properties.pop("temp_key_3") sequence.properties.pop("temp_key_4") sequence.properties.update({"temp_key_4": 1}) sequence.properties.update({"temp_key_5": 2}) sequence.properties.pop("temp_key_5") sequence.properties.update({}) assert sequence._is_in_context assert sequence.properties["qux"] == 5 assert sequence.properties["temp_key_3"] == 1 assert sequence.properties["temp_key_4"] == 0 assert sequence.properties["temp_key_5"] == 0 assert not sequence_1._is_in_context assert sequence_1.properties["qux"] == 9 assert "temp_key_3" not in sequence_1.properties.keys() assert sequence_1.properties["temp_key_4"] == 1 assert "temp_key_5" not in sequence_1.properties.keys() def test_get_parents(sequence): with mock.patch("taipy.core.get_parents") as mck: sequence.get_parents() mck.assert_called_once_with(sequence) def test_subscribe_sequence(): with mock.patch("taipy.core.subscribe_sequence") as mck: sequence = Sequence({}, [], "id") sequence.subscribe(None) mck.assert_called_once_with(None, None, sequence) def test_unsubscribe_sequence(): with mock.patch("taipy.core.unsubscribe_sequence") as mck: sequence = Sequence({}, [], "id") sequence.unsubscribe(None) mck.assert_called_once_with(None, None, sequence) def test_submit_sequence(): with mock.patch("taipy.core.sequence._sequence_manager._SequenceManager._submit") as mck: sequence = Sequence({}, [], "id") sequence.submit(None, False) mck.assert_called_once_with(sequence, None, False, False, None)