12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576 |
- # Copyright 2021-2025 Avaiga Private Limited
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations under the License.
- from datetime import datetime, timedelta
- from unittest import mock
- import pytest
- from taipy import Scope
- from taipy.common.config import Config
- from taipy.common.config.exceptions.exceptions import InvalidConfigurationId
- from taipy.core import create_scenario
- from taipy.core.common._utils import _Subscriber
- from taipy.core.common.frequency import Frequency
- from taipy.core.cycle._cycle_manager_factory import _CycleManagerFactory
- from taipy.core.cycle.cycle import Cycle, CycleId
- from taipy.core.data._data_manager_factory import _DataManagerFactory
- from taipy.core.data.in_memory import DataNode, InMemoryDataNode
- from taipy.core.data.pickle import PickleDataNode
- from taipy.core.exceptions.exceptions import (
- AttributeKeyAlreadyExisted,
- SequenceAlreadyExists,
- SequenceTaskDoesNotExistInScenario,
- )
- from taipy.core.scenario._scenario_manager_factory import _ScenarioManagerFactory
- from taipy.core.scenario.scenario import Scenario
- from taipy.core.scenario.scenario_id import ScenarioId
- from taipy.core.sequence.sequence import Sequence
- from taipy.core.sequence.sequence_id import SequenceId
- from taipy.core.task._task_manager_factory import _TaskManagerFactory
- from taipy.core.task.task import Task, TaskId
def test_scenario_equals(scenario):
    """Check that a scenario equals its reloaded copy and nothing of another type."""
    # Build the manager from the factory class itself, like every other test in
    # this module, instead of instantiating the factory first.
    scenario_manager = _ScenarioManagerFactory._build_manager()

    scenario_id = scenario.id
    scenario_manager._repository._save(scenario)

    # A task created with the scenario's id: used to check that equality also
    # takes the type of the compared instance into account.
    task = Task("task", {}, print, [], [], scenario_id)

    scenario_2 = scenario_manager._get(scenario_id)
    assert scenario == scenario_2
    assert scenario != scenario_id
    assert scenario != task
def test_create_primary_scenario(cycle):
    """Check the attributes and labels of a primary scenario attached to a cycle."""

    class MockOwner:
        # Minimal stand-in for the entity returned by the patched `taipy.core.get`.
        label = "owner_label"

        def get_label(self):
            return self.label

    primary = Scenario("foo", set(), {"key": "value"}, is_primary=True, cycle=cycle)

    # Identity.
    assert primary.id is not None
    assert primary.config_id == "foo"

    # Nothing but properties was handed to the constructor.
    assert primary.tasks == {}
    assert primary.additional_data_nodes == {}
    assert primary.data_nodes == {}
    assert primary.sequences == {}
    assert primary.properties == {"key": "value"}
    assert primary.properties["key"] == "value"

    # Remaining metadata.
    assert primary.creation_date is not None
    assert primary.is_primary
    assert primary.cycle == cycle
    assert primary.tags == set()
    assert primary.get_simple_label() == primary.config_id

    # With `taipy.core.get` patched, the full label is prefixed by the owner's label.
    with mock.patch("taipy.core.get") as get_mck:
        get_mck.return_value = MockOwner()
        assert primary.get_label() == "owner_label > " + primary.config_id
def test_create_scenario_at_time(current_datetime):
    """Check a scenario built with an explicit id and an explicit creation date."""
    dated = Scenario("bar", set(), {}, set(), ScenarioId("baz"), creation_date=current_datetime)

    # Values passed explicitly to the constructor.
    assert dated.id == "baz"
    assert dated.config_id == "bar"

    # Collections are all empty.
    assert dated.tasks == {}
    assert dated.additional_data_nodes == {}
    assert dated.data_nodes == {}
    assert dated.sequences == {}
    assert dated.properties == {}

    assert dated.creation_date == current_datetime
    assert not dated.is_primary
    assert dated.cycle is None
    assert dated.tags == set()

    # Both labels are expected to be the bare config id here.
    assert dated.get_simple_label() == dated.config_id
    assert dated.get_label() == dated.config_id
def test_create_scenario_with_task_and_additional_dn_and_sequence():
    """Check a scenario built with one task, one additional data node and one sequence."""
    task_input = PickleDataNode("xyz", Scope.SCENARIO)
    extra_dn = PickleDataNode("abc", Scope.SCENARIO)
    only_task = Task("qux", {}, print, [task_input])

    built = Scenario("quux", {only_task}, {}, {extra_dn}, sequences={"acb": {"tasks": [only_task]}})
    declared_sequence = built.sequences["acb"]

    assert built.id is not None
    assert built.config_id == "quux"

    # One task, one additional data node, plus the task's own input data node.
    assert len(built.tasks) == 1
    assert len(built.additional_data_nodes) == 1
    assert len(built.data_nodes) == 2
    assert len(built.sequences) == 1

    # Each entity is reachable as an attribute named after its config id / name.
    assert built.qux == only_task
    assert built.xyz == task_input
    assert built.abc == extra_dn
    assert built.acb == declared_sequence

    assert built.properties == {}
    assert built.tags == set()
def test_create_scenario_invalid_config_id():
    """Check that building a scenario with the config id "foo bar" is rejected."""
    rejected_config_id = "foo bar"
    with pytest.raises(InvalidConfigurationId):
        Scenario(rejected_config_id, set(), {})
def test_create_scenario_and_add_sequences():
    """Check that assigning `sequences` on a saved one-task scenario creates them."""
    in_dn = PickleDataNode("input_1", Scope.SCENARIO)
    mid_dn = PickleDataNode("output_1", Scope.SCENARIO)
    out_dn = PickleDataNode("output_2", Scope.SCENARIO)
    extra_dn_a = PickleDataNode("additional_1", Scope.SCENARIO)
    extra_dn_b = PickleDataNode("additional_2", Scope.SCENARIO)
    first_task = Task("task_1", {}, print, [in_dn], [mid_dn], TaskId("task_id_1"))
    second_task = Task("task_2", {}, print, [mid_dn], [out_dn], TaskId("task_id_2"))

    # Persist every entity, including the ones the scenario does not reference.
    data_manager = _DataManagerFactory._build_manager()
    task_manager = _TaskManagerFactory._build_manager()
    for data_node in (in_dn, mid_dn, out_dn, extra_dn_a, extra_dn_b):
        data_manager._repository._save(data_node)
    for task in (first_task, second_task):
        task_manager._repository._save(task)

    # Only the first task belongs to the scenario.
    single_task_scenario = Scenario("scenario", {first_task}, {})
    _ScenarioManagerFactory._build_manager()._repository._save(single_task_scenario)

    single_task_scenario.sequences = {"sequence_1": {"tasks": [first_task]}, "sequence_2": {"tasks": []}}

    assert single_task_scenario.id is not None
    assert single_task_scenario.config_id == "scenario"

    assert len(single_task_scenario.tasks) == 1
    assert single_task_scenario.tasks.keys() == {first_task.config_id}

    assert len(single_task_scenario.additional_data_nodes) == 0
    assert single_task_scenario.additional_data_nodes == {}

    # Only the data nodes attached to the scenario's single task are exposed.
    assert len(single_task_scenario.data_nodes) == 2
    assert single_task_scenario.data_nodes == {
        in_dn.config_id: in_dn,
        mid_dn.config_id: mid_dn,
    }

    assert len(single_task_scenario.sequences) == 2
    assert single_task_scenario.sequence_1 == single_task_scenario.sequences["sequence_1"]
    assert single_task_scenario.sequence_2 == single_task_scenario.sequences["sequence_2"]
    assert single_task_scenario.sequences == {
        "sequence_1": single_task_scenario.sequence_1,
        "sequence_2": single_task_scenario.sequence_2,
    }
def test_get_set_attribute():
    """Check setting a new attribute, and the error raised when the name is already used."""
    data_node_config = Config.configure_data_node("bar")
    scenario_config = Config.configure_scenario("foo", additional_data_node_configs=[data_node_config])
    created = create_scenario(scenario_config)

    # A fresh attribute name can be set and read back.
    created.key = "value"
    assert created.key == "value"

    # "bar" is the config id of the additional data node configured above.
    with pytest.raises(AttributeKeyAlreadyExisted):
        created.bar = "KeyAlreadyUsed"
def test_create_scenario_overlapping_sequences():
    """Check adding and removing two sequences that share a task."""
    in_dn = PickleDataNode("input_1", Scope.SCENARIO)
    mid_dn = PickleDataNode("output_1", Scope.SCENARIO)
    out_dn = PickleDataNode("output_2", Scope.SCENARIO)
    extra_dn_a = PickleDataNode("additional_1", Scope.SCENARIO)
    extra_dn_b = PickleDataNode("additional_2", Scope.SCENARIO)
    first_task = Task("task_1", {}, print, [in_dn], [mid_dn], TaskId("task_id_1"))
    second_task = Task("task_2", {}, print, [mid_dn], [out_dn], TaskId("task_id_2"))

    data_manager = _DataManagerFactory._build_manager()
    task_manager = _TaskManagerFactory._build_manager()
    for data_node in (in_dn, mid_dn, out_dn, extra_dn_a, extra_dn_b):
        data_manager._repository._save(data_node)
    for task in (first_task, second_task):
        task_manager._repository._save(task)

    chained = Scenario("scenario", {first_task, second_task}, {})
    _ScenarioManagerFactory._build_manager()._repository._save(chained)

    # The first task appears in both sequences.
    chained.add_sequence("sequence_1", [first_task])
    chained.add_sequence("sequence_2", [first_task, second_task])

    assert chained.id is not None
    assert chained.config_id == "scenario"

    assert len(chained.tasks) == 2
    assert chained.tasks.keys() == {first_task.config_id, second_task.config_id}

    assert len(chained.additional_data_nodes) == 0
    assert chained.additional_data_nodes == {}

    assert len(chained.data_nodes) == 3
    assert chained.data_nodes == {
        in_dn.config_id: in_dn,
        mid_dn.config_id: mid_dn,
        out_dn.config_id: out_dn,
    }

    short_sequence = chained.sequences["sequence_1"]
    long_sequence = chained.sequences["sequence_2"]
    assert chained.sequences == {"sequence_1": short_sequence, "sequence_2": long_sequence}

    # Remove the sequences one at a time.
    chained.remove_sequences(["sequence_2"])
    assert chained.sequences == {"sequence_1": short_sequence}

    chained.remove_sequences(["sequence_1"])
    assert chained.sequences == {}
def test_create_scenario_one_additional_dn():
    """Check a task-less scenario holding a single additional data node."""
    in_1 = PickleDataNode("input_1", Scope.SCENARIO)
    in_2 = PickleDataNode("input_2", Scope.SCENARIO)
    out_1 = PickleDataNode("output_1", Scope.SCENARIO)
    out_2 = PickleDataNode("output_2", Scope.SCENARIO)
    extra_1 = PickleDataNode("additional_1", Scope.SCENARIO)
    extra_2 = PickleDataNode("additional_2", Scope.SCENARIO)
    first_task = Task("task_1", {}, print, [in_1], [out_1], TaskId("task_id_1"))
    second_task = Task("task_2", {}, print, [in_2], [out_2], TaskId("task_id_2"))

    # Everything is persisted, although the scenario only references `extra_1`.
    data_manager = _DataManagerFactory._build_manager()
    task_manager = _TaskManagerFactory._build_manager()
    for data_node in (in_1, out_1, in_2, out_2, extra_1, extra_2):
        data_manager._repository._save(data_node)
    for task in (first_task, second_task):
        task_manager._repository._save(task)

    lone_dn_scenario = Scenario("scenario", set(), {}, {extra_1})
    expected_data_nodes = {extra_1.config_id: extra_1}

    assert lone_dn_scenario.id is not None
    assert lone_dn_scenario.config_id == "scenario"
    assert len(lone_dn_scenario.tasks) == 0
    assert len(lone_dn_scenario.additional_data_nodes) == 1
    assert len(lone_dn_scenario.data_nodes) == 1
    assert lone_dn_scenario.tasks == {}
    assert lone_dn_scenario.additional_data_nodes == expected_data_nodes
    assert lone_dn_scenario.data_nodes == expected_data_nodes
def test_create_scenario_wth_additional_dns():
    """Check `data_nodes` for scenarios mixing tasks and additional data nodes."""
    in_1 = PickleDataNode("input_1", Scope.SCENARIO)
    in_2 = PickleDataNode("input_2", Scope.SCENARIO)
    out_1 = PickleDataNode("output_1", Scope.SCENARIO)
    out_2 = PickleDataNode("output_2", Scope.SCENARIO)
    extra_1 = PickleDataNode("additional_1", Scope.SCENARIO)
    extra_2 = PickleDataNode("additional_2", Scope.SCENARIO)
    first_task = Task("task_1", {}, print, [in_1], [out_1], TaskId("task_id_1"))
    second_task = Task("task_2", {}, print, [in_2], [out_2], TaskId("task_id_2"))

    data_manager = _DataManagerFactory._build_manager()
    task_manager = _TaskManagerFactory._build_manager()
    for data_node in (in_1, out_1, in_2, out_2, extra_1, extra_2):
        data_manager._repository._save(data_node)
    for task in (first_task, second_task):
        task_manager._repository._save(task)

    # Case 1: no task, two additional data nodes.
    only_extras = Scenario("scenario", set(), {}, {extra_1, extra_2})
    assert only_extras.id is not None
    assert only_extras.config_id == "scenario"
    assert len(only_extras.tasks) == 0
    assert len(only_extras.additional_data_nodes) == 2
    assert len(only_extras.data_nodes) == 2
    assert only_extras.tasks == {}
    assert only_extras.additional_data_nodes == {
        extra_1.config_id: extra_1,
        extra_2.config_id: extra_2,
    }
    assert only_extras.data_nodes == {
        extra_1.config_id: extra_1,
        extra_2.config_id: extra_2,
    }

    # Case 2: one task and one additional data node.
    one_task_one_extra = Scenario("scenario_1", {first_task}, {}, {extra_1})
    assert one_task_one_extra.id is not None
    assert one_task_one_extra.config_id == "scenario_1"
    assert len(one_task_one_extra.tasks) == 1
    assert len(one_task_one_extra.additional_data_nodes) == 1
    assert len(one_task_one_extra.data_nodes) == 3
    assert one_task_one_extra.tasks.keys() == {first_task.config_id}
    assert one_task_one_extra.additional_data_nodes == {
        extra_1.config_id: extra_1,
    }
    assert one_task_one_extra.data_nodes == {
        in_1.config_id: in_1,
        out_1.config_id: out_1,
        extra_1.config_id: extra_1,
    }

    # Case 3: two tasks and two additional data nodes.
    two_tasks_two_extras = Scenario("scenario_2", {first_task, second_task}, {}, {extra_1, extra_2})
    assert two_tasks_two_extras.id is not None
    assert two_tasks_two_extras.config_id == "scenario_2"
    assert len(two_tasks_two_extras.tasks) == 2
    assert len(two_tasks_two_extras.additional_data_nodes) == 2
    assert len(two_tasks_two_extras.data_nodes) == 6
    assert two_tasks_two_extras.tasks.keys() == {first_task.config_id, second_task.config_id}
    assert two_tasks_two_extras.additional_data_nodes == {
        extra_1.config_id: extra_1,
        extra_2.config_id: extra_2,
    }
    # Compared by id rather than by data node instance.
    actual_ids_by_config_id = {
        dn_config_id: dn.id for dn_config_id, dn in two_tasks_two_extras.data_nodes.items()
    }
    assert actual_ids_by_config_id == {
        in_1.config_id: in_1.id,
        out_1.config_id: out_1.id,
        in_2.config_id: in_2.id,
        out_2.config_id: out_2.id,
        extra_1.config_id: extra_1.id,
        extra_2.config_id: extra_2.id,
    }
def test_raise_sequence_tasks_not_in_scenario(data_node):
    """Check that the constructor rejects sequences using tasks the scenario lacks."""
    producer = Task("task_1", {}, print, output=[data_node])
    consumer = Task("task_2", {}, print, input=[data_node])

    # The scenario has no task at all, so `producer` is reported.
    with pytest.raises(SequenceTaskDoesNotExistInScenario) as err:
        Scenario("scenario", set(), {}, sequences={"sequence": {"tasks": [producer]}}, scenario_id="SCENARIO_scenario")
    assert err.value.args == ([producer.id], "sequence", "SCENARIO_scenario")

    # The scenario only holds `producer`, so only `consumer` is reported.
    with pytest.raises(SequenceTaskDoesNotExistInScenario) as err:
        Scenario(
            "scenario",
            [producer],
            {},
            sequences={"sequence": {"tasks": [producer, consumer]}},
            scenario_id="SCENARIO_scenario",
        )
    assert err.value.args == ([consumer.id], "sequence", "SCENARIO_scenario")

    # These constructions are expected to go through without raising.
    Scenario("scenario", {producer}, {}, sequences={"sequence": {"tasks": [producer]}})
    Scenario(
        "scenario",
        [producer, consumer],
        {},
        sequences={"sequence_1": {"tasks": [producer]}, "sequence_2": {"tasks": [producer, consumer]}},
    )
def test_adding_sequence_raises_tasks_not_in_scenario(data_node):
    """Check that `add_sequence(s)` rejects tasks the scenario does not hold."""
    _DataManagerFactory._build_manager()._repository._save(data_node)
    producer = Task("task_1", {}, print, output=[data_node])
    consumer = Task("task_2", {}, print, input=[data_node])

    # At first only `producer` belongs to the scenario.
    scenario = Scenario("scenario", [producer], {})

    scenario_manager = _ScenarioManagerFactory._build_manager()
    task_manager = _TaskManagerFactory._build_manager()
    scenario_manager._repository._save(scenario)
    for task in (producer, consumer):
        task_manager._repository._save(task)

    scenario.add_sequences({"sequence_1": {}})

    with pytest.raises(SequenceTaskDoesNotExistInScenario) as err:
        scenario.add_sequence("sequence_2", [consumer])
    assert err.value.args == ([consumer.id], "sequence_2", scenario.id)

    scenario.add_sequence("sequence_3", [producer])

    with pytest.raises(SequenceTaskDoesNotExistInScenario) as err:
        scenario.add_sequences({"sequence_4": [consumer]})
    assert err.value.args == ([consumer.id], "sequence_4", scenario.id)

    # Only the missing task is listed in the error arguments.
    with pytest.raises(SequenceTaskDoesNotExistInScenario) as err:
        scenario.add_sequences({"sequence_5": [producer, consumer]})
    assert err.value.args == ([consumer.id], "sequence_5", scenario.id)

    # Once both tasks belong to the scenario, the same sequence can be added.
    scenario.tasks = [producer, consumer]
    scenario.add_sequence("sequence_6", [producer, consumer])
def test_adding_existing_sequence_raises_exception(data_node):
    """Check that adding a sequence under an already used name raises."""
    _DataManagerFactory._build_manager()._repository._save(data_node)

    producer = Task("task_1", {}, print, output=[data_node])
    _TaskManagerFactory._build_manager()._repository._save(producer)
    consumer = Task("task_2", {}, print, input=[data_node])
    _TaskManagerFactory._build_manager()._repository._save(consumer)

    owner = Scenario("scenario", tasks={producer, consumer}, properties={})
    _ScenarioManagerFactory._build_manager()._repository._save(owner)

    owner.add_sequence("sequence_1", [producer])

    # Same name again, with a different task list.
    with pytest.raises(SequenceAlreadyExists):
        owner.add_sequence("sequence_1", [consumer])
def test_renaming_existing_sequence_raises_exception(data_node):
    """Check that renaming a sequence to another existing sequence's name raises."""
    _DataManagerFactory._build_manager()._repository._save(data_node)

    producer = Task("task_1", {}, print, output=[data_node])
    _TaskManagerFactory._build_manager()._repository._save(producer)
    consumer = Task("task_2", {}, print, input=[data_node])
    _TaskManagerFactory._build_manager()._repository._save(consumer)

    owner = Scenario("scenario", {producer, consumer}, {})
    _ScenarioManagerFactory._build_manager()._repository._save(owner)

    owner.add_sequence("sequence_1", [producer])
    owner.add_sequence("sequence_2", [consumer])

    # "sequence_2" is already taken by the second sequence.
    with pytest.raises(SequenceAlreadyExists):
        owner.rename_sequence("sequence_1", "sequence_2")
def test_add_rename_and_remove_sequences():
    """Walk through setting, adding, removing and renaming sequences on a scenario."""
    dn_foo = InMemoryDataNode("foo", Scope.SCENARIO, "s1")
    dn_bar = InMemoryDataNode("bar", Scope.SCENARIO, "s2")
    dn_qux = InMemoryDataNode("qux", Scope.SCENARIO, "s3")
    dn_quux = InMemoryDataNode("quux", Scope.SCENARIO, "s4")
    dn_quuz = InMemoryDataNode("quuz", Scope.SCENARIO, "s5")

    grault = Task("grault", {}, print, [dn_foo, dn_bar], [dn_qux], TaskId("t1"))
    garply = Task("garply", {}, print, [dn_qux], id=TaskId("t2"))
    waldo = Task("waldo", {}, print, [dn_qux], None, id=TaskId("t3"))
    fred = Task("fred", {}, print, [dn_qux], [dn_quux], TaskId("t4"))
    bob = Task("bob", {}, print, [dn_quuz], [dn_qux], TaskId("t5"))

    scenario = Scenario("quest", {grault, garply, waldo, fred, bob}, {}, [], scenario_id=ScenarioId("s1"))

    # Expected sequences, built by hand to compare against what the scenario exposes.
    expected_seq_1 = Sequence({"name": "seq_1"}, [grault], SequenceId(f"SEQUENCE_seq_1_{scenario.id}"))
    expected_seq_2 = Sequence({"name": "seq_2"}, [grault, garply], SequenceId(f"SEQUENCE_seq_2_{scenario.id}"))
    expected_new_seq_2 = Sequence(
        {"name": "seq_2"}, [grault, garply], SequenceId(f"SEQUENCE_new_seq_2_{scenario.id}")
    )
    expected_seq_3 = Sequence(
        {"name": "seq 3"}, [grault, bob, waldo], SequenceId(f"SEQUENCE_seqTPSPACE3_{scenario.id}")
    )

    task_manager = _TaskManagerFactory._build_manager()
    data_manager = _DataManagerFactory._build_manager()
    scenario_manager = _ScenarioManagerFactory._build_manager()
    for data_node in (dn_foo, dn_bar, dn_qux, dn_quux, dn_quuz):
        data_manager._repository._save(data_node)
    for task in (grault, garply, waldo, fred, bob):
        task_manager._repository._save(task)
    scenario_manager._repository._save(scenario)

    # Starting point: five tasks, no sequence.
    assert scenario.get_inputs() == {dn_foo, dn_bar, dn_quuz}
    assert scenario._get_set_of_tasks() == {grault, garply, waldo, fred, bob}
    assert len(scenario.sequences) == 0

    # Assign the whole `sequences` attribute.
    scenario.sequences = {"seq_1": {"tasks": [grault]}}
    assert scenario.sequences == {"seq_1": expected_seq_1}

    # Bulk add and bulk remove.
    scenario.add_sequences({"seq_2": [grault, garply]})
    assert scenario.sequences == {"seq_1": expected_seq_1, "seq_2": expected_seq_2}

    scenario.remove_sequences(["seq_1"])
    assert scenario.sequences == {"seq_2": expected_seq_2}

    scenario.add_sequences({"seq_1": [grault], "seq 3": [grault, bob, waldo]})
    assert scenario.sequences == {"seq_2": expected_seq_2, "seq_1": expected_seq_1, "seq 3": expected_seq_3}

    scenario.remove_sequences(["seq_2", "seq 3"])
    assert scenario.sequences == {"seq_1": expected_seq_1}

    # Single add, first with task objects, then with task ids.
    scenario.add_sequence("seq_2", [grault, garply])
    assert scenario.sequences == {"seq_1": expected_seq_1, "seq_2": expected_seq_2}

    scenario.add_sequence("seq 3", [grault.id, bob.id, waldo.id])
    assert scenario.sequences == {"seq_1": expected_seq_1, "seq_2": expected_seq_2, "seq 3": expected_seq_3}

    # Single remove, then rename.
    scenario.remove_sequence("seq_1")
    assert scenario.sequences == {"seq_2": expected_seq_2, "seq 3": expected_seq_3}

    scenario.rename_sequence("seq_2", "new_seq_2")
    assert scenario.sequences == {"new_seq_2": expected_new_seq_2, "seq 3": expected_seq_3}
def test_update_sequence(data_node):
    """Check that `update_sequence` swaps the tasks and adds properties, keeping the name."""
    _DataManagerFactory._build_manager()._repository._save(data_node)

    reader = Task("foo", {}, print, [data_node], [], TaskId("t1"))
    writer = Task("bar", {}, print, [], [data_node], id=TaskId("t2"))
    owner = Scenario("baz", {reader, writer}, {})

    _TaskManagerFactory._build_manager()._repository._save(reader)
    _TaskManagerFactory._build_manager()._repository._save(writer)
    _ScenarioManagerFactory._build_manager()._repository._save(owner)

    # Before the update: the sequence holds `reader` only.
    owner.add_sequence("seq_1", [reader])
    assert len(owner.sequences) == 1
    assert owner.sequences["seq_1"].tasks == {"foo": reader}
    assert owner.sequences["seq_1"].properties["name"] == "seq_1"

    # After the update: same sequence name, new task, one extra property.
    owner.update_sequence("seq_1", [writer], {"new_key": "new_value"}, [])
    assert len(owner.sequences) == 1
    assert owner.sequences["seq_1"].tasks == {"bar": writer}
    assert owner.sequences["seq_1"].properties["name"] == "seq_1"
    assert owner.sequences["seq_1"].properties["new_key"] == "new_value"
def test_add_rename_and_remove_sequences_within_context(data_node):
    """Check sequence edits made through `with scenario as ...`, asserted after each block."""
    _DataManagerFactory._build_manager()._repository._save(data_node)

    producer = Task("task_1", {}, print, output=[data_node])
    consumer = Task("task_2", {}, print, input=[data_node])
    _TaskManagerFactory._build_manager()._repository._save(producer)
    _TaskManagerFactory._build_manager()._repository._save(consumer)

    scenario = Scenario(config_id="scenario", tasks={producer, consumer}, properties={})
    _ScenarioManagerFactory._build_manager()._repository._save(scenario)

    # Add.
    with scenario as editable:
        editable.add_sequence("seq_name", [producer])
    assert len(scenario.sequences) == 1
    assert scenario.sequences["seq_name"].tasks == {"task_1": producer}

    # Update the task list.
    with scenario as editable:
        editable.update_sequence("seq_name", [consumer])
    assert len(scenario.sequences) == 1
    assert scenario.sequences["seq_name"].tasks == {"task_2": consumer}

    # Rename.
    with scenario as editable:
        editable.rename_sequence("seq_name", "seq name")
    assert scenario.sequences["seq name"].tasks == {"task_2": consumer}

    # Remove.
    with scenario as editable:
        editable.remove_sequence("seq name")
    assert len(scenario.sequences) == 0
def test_add_property_to_scenario():
    """Check that a property added to a saved scenario is readable next to the initial one."""
    saved = Scenario("foo", set(), {"key": "value"})
    _ScenarioManagerFactory._build_manager()._repository._save(saved)

    # Initial state: only the constructor-provided property.
    assert saved.properties == {"key": "value"}
    assert saved.properties["key"] == "value"

    saved.properties["new_key"] = "new_value"

    # Both the initial and the new property are visible.
    assert saved.properties == {"key": "value", "new_key": "new_value"}
    assert saved.properties["key"] == "value"
    assert saved.properties["new_key"] == "new_value"
def test_add_cycle_to_scenario(cycle):
    """Check that a cycle can be assigned to a saved scenario created without one."""
    saved = Scenario("foo", set(), {})
    _ScenarioManagerFactory._build_manager()._repository._save(saved)
    assert saved.cycle is None

    # The cycle is saved before being assigned to the scenario.
    _CycleManagerFactory._build_manager()._repository._save(cycle)
    saved.cycle = cycle

    assert saved.cycle == cycle
def test_add_and_remove_subscriber():
    """Check that `_add_subscriber` and `_remove_subscriber` change the subscriber count."""
    saved = Scenario("foo", set(), {})
    _ScenarioManagerFactory._build_manager()._repository._save(saved)

    # `print` serves as the subscriber callback.
    saved._add_subscriber(print)
    assert len(saved.subscribers) == 1

    saved._remove_subscriber(print)
    assert len(saved.subscribers) == 0
def test_add_and_remove_tag():
    """Check that `_add_tag` and `_remove_tag` change the number of tags."""
    tagged = Scenario("foo", set(), {})
    assert len(tagged.tags) == 0

    tagged._add_tag("tag")
    assert len(tagged.tags) == 1

    tagged._remove_tag("tag")
    assert len(tagged.tags) == 0
- def test_auto_update_and_reload(cycle, current_datetime, task, data_node):
- scenario_1 = Scenario(
- "foo",
- set(),
- {"name": "bar"},
- set(),
- creation_date=current_datetime,
- is_primary=True,
- cycle=None,
- )
- additional_dn = InMemoryDataNode("additional_dn", Scope.SCENARIO)
- example_date = datetime.fromisoformat("2021-11-11T11:11:01.000001")
- tmp_cycle = Cycle(
- Frequency.WEEKLY,
- {},
- creation_date=example_date,
- start_date=example_date,
- end_date=example_date,
- name="cc",
- id=CycleId("tmp_cc_id"),
- )
- sequence_1_name = "sequence_1"
- sequence_1 = Sequence({}, [], SequenceId(f"SEQUENCE_{sequence_1_name}_{scenario_1.id}"))
- tmp_sequence_name = "tmp_sequence"
- tmp_sequence = Sequence(
- {},
- [],
- SequenceId(f"SEQUENCE_{tmp_sequence_name}_{scenario_1.id}"),
- )
- _TaskManagerFactory._build_manager()._repository._save(task)
- _DataManagerFactory._build_manager()._repository._save(data_node)
- _DataManagerFactory._build_manager()._repository._save(additional_dn)
- scenario_manager = _ScenarioManagerFactory._build_manager()
- cycle_manager = _CycleManagerFactory._build_manager()
- cycle_manager._repository._save(cycle)
- cycle_manager._repository._save(tmp_cycle)
- scenario_manager._repository._save(scenario_1)
- scenario_2 = scenario_manager._get(scenario_1)
- assert scenario_1.config_id == "foo"
- assert scenario_2.config_id == "foo"
- # auto set & reload on name attribute
- assert scenario_1.name == "bar"
- assert scenario_2.name == "bar"
- scenario_1.name = "zab"
- assert scenario_1.name == "zab"
- assert scenario_2.name == "zab"
- scenario_2.name = "baz"
- assert scenario_1.name == "baz"
- assert scenario_2.name == "baz"
- # auto set & reload on sequences attribute
- assert len(scenario_1.sequences) == 0
- assert len(scenario_2.sequences) == 0
- scenario_1.sequences = {tmp_sequence_name: {}}
- assert len(scenario_1.sequences) == 1
- assert scenario_1.sequences[tmp_sequence_name] == tmp_sequence
- assert len(scenario_2.sequences) == 1
- assert scenario_2.sequences[tmp_sequence_name] == tmp_sequence
- scenario_2.add_sequences({sequence_1_name: []})
- assert len(scenario_1.sequences) == 2
- assert scenario_1.sequences == {sequence_1_name: sequence_1, tmp_sequence_name: tmp_sequence}
- assert len(scenario_2.sequences) == 2
- assert scenario_2.sequences == {sequence_1_name: sequence_1, tmp_sequence_name: tmp_sequence}
- scenario_2.remove_sequences([tmp_sequence_name])
- assert len(scenario_1.sequences) == 1
- assert scenario_1.sequences == {sequence_1_name: sequence_1}
- assert len(scenario_2.sequences) == 1
- assert scenario_2.sequences == {sequence_1_name: sequence_1}
- assert len(scenario_1.tasks) == 0
- assert len(scenario_1.data_nodes) == 0
- scenario_1.tasks = {task}
- assert len(scenario_1.tasks) == 1
- assert scenario_1.tasks[task.config_id] == task
- assert len(scenario_1.data_nodes) == 2
- assert len(scenario_2.tasks) == 1
- assert scenario_2.tasks[task.config_id] == task
- assert len(scenario_2.data_nodes) == 2
- assert len(scenario_1.additional_data_nodes) == 0
- scenario_1.additional_data_nodes = {additional_dn}
- assert len(scenario_1.additional_data_nodes) == 1
- assert scenario_1.additional_data_nodes[additional_dn.config_id] == additional_dn
- assert len(scenario_1.data_nodes) == 3
- assert len(scenario_2.additional_data_nodes) == 1
- assert scenario_2.additional_data_nodes[additional_dn.config_id] == additional_dn
- assert len(scenario_2.data_nodes) == 3
- new_datetime = current_datetime + timedelta(1)
- new_datetime_1 = current_datetime + timedelta(2)
- # auto set & reload on creation_date attribute
- assert scenario_1.creation_date == current_datetime
- assert scenario_2.creation_date == current_datetime
- scenario_1.creation_date = new_datetime_1
- assert scenario_1.creation_date == new_datetime_1
- assert scenario_2.creation_date == new_datetime_1
- scenario_2.creation_date = new_datetime
- assert scenario_1.creation_date == new_datetime
- assert scenario_2.creation_date == new_datetime
- # auto set & reload on cycle attribute
- assert scenario_1.cycle is None
- assert scenario_2.cycle is None
- scenario_1.cycle = tmp_cycle
- assert scenario_1.cycle == tmp_cycle
- assert scenario_2.cycle == tmp_cycle
- scenario_2.cycle = cycle
- assert scenario_1.cycle == cycle
- assert scenario_2.cycle == cycle
- # auto set & reload on is_primary attribute
- assert scenario_1.is_primary
- assert scenario_2.is_primary
- scenario_1.is_primary = False
- assert not scenario_1.is_primary
- assert not scenario_2.is_primary
- scenario_2.is_primary = True
- assert scenario_1.is_primary
- assert scenario_2.is_primary
- # auto set & reload on subscribers attribute
- assert len(scenario_1.subscribers) == 0
- assert len(scenario_2.subscribers) == 0
- scenario_1.subscribers.append(_Subscriber(print, []))
- assert len(scenario_1.subscribers) == 1
- assert len(scenario_2.subscribers) == 1
- scenario_2.subscribers.append(_Subscriber(print, []))
- assert len(scenario_1.subscribers) == 2
- assert len(scenario_2.subscribers) == 2
- scenario_1.subscribers.clear()
- assert len(scenario_1.subscribers) == 0
- assert len(scenario_2.subscribers) == 0
- scenario_1.subscribers.extend([_Subscriber(print, []), _Subscriber(map, [])])
- assert len(scenario_1.subscribers) == 2
- assert len(scenario_2.subscribers) == 2
- scenario_1.subscribers.remove(_Subscriber(print, []))
- assert len(scenario_1.subscribers) == 1
- assert len(scenario_2.subscribers) == 1
- scenario_1.subscribers + print + len
- assert len(scenario_1.subscribers) == 3
- assert len(scenario_2.subscribers) == 3
- scenario_1.subscribers = []
- assert len(scenario_1.subscribers) == 0
- assert len(scenario_2.subscribers) == 0
- assert len(scenario_1.tags) == 0
- scenario_1.tags = {"hi"}
- assert len(scenario_1.tags) == 1
- assert len(scenario_2.tags) == 1
- with scenario_1 as scenario:
- assert scenario.config_id == "foo"
- assert len(scenario.tasks) == 1
- assert len(scenario.sequences) == 1
- assert scenario.sequences["sequence_1"] == sequence_1
- assert scenario.tasks[task.config_id] == task
- assert len(scenario.additional_data_nodes) == 1
- assert scenario.additional_data_nodes[additional_dn.config_id] == additional_dn
- assert scenario.creation_date == new_datetime
- assert scenario.cycle == cycle
- assert scenario.is_primary
- assert len(scenario.subscribers) == 0
- assert len(scenario.tags) == 1
- assert scenario._is_in_context
- assert scenario.name == "baz"
- new_datetime_2 = new_datetime + timedelta(5)
- scenario._config_id = "foo"
- scenario.tasks = set()
- scenario.additional_data_nodes = set()
- scenario.remove_sequences([sequence_1_name])
- scenario.creation_date = new_datetime_2
- scenario.cycle = None
- scenario.is_primary = False
- scenario.subscribers = [print]
- scenario.tags = None
- scenario.name = "qux"
- assert scenario.config_id == "foo"
- assert len(scenario.sequences) == 1
- assert scenario.sequences[sequence_1_name] == sequence_1
- assert len(scenario.tasks) == 1
- assert scenario.tasks[task.config_id] == task
- assert len(scenario.additional_data_nodes) == 1
- assert scenario.additional_data_nodes[additional_dn.config_id] == additional_dn
- assert scenario.creation_date == new_datetime
- assert scenario.cycle == cycle
- assert scenario.is_primary
- assert len(scenario.subscribers) == 0
- assert len(scenario.tags) == 1
- assert scenario._is_in_context
- assert scenario.name == "baz"
- assert scenario_1.config_id == "foo"
- assert len(scenario_1.sequences) == 0
- assert len(scenario_1.tasks) == 0
- assert len(scenario_1.additional_data_nodes) == 0
- assert scenario_1.tasks == {}
- assert scenario_1.additional_data_nodes == {}
- assert scenario_1.creation_date == new_datetime_2
- assert scenario_1.cycle is None
- assert not scenario_1.is_primary
- assert len(scenario_1.subscribers) == 1
- assert len(scenario_1.tags) == 0
- assert not scenario_1._is_in_context
def test_auto_update_and_reload_properties():
    """Check that property edits made through one scenario object are seen through the other.

    ``scenario_1`` is saved via the scenario manager's repository and
    ``scenario_2`` is obtained from ``scenario_manager._get(scenario_1)``.
    Each mutation (item assignment, ``pop``, ``update``) is applied to one of
    them and then asserted on both. The final section checks that edits made
    inside a ``with scenario_1`` block are not observable until the block exits.
    """
    scenario_1 = Scenario(
        "foo",
        set(),
        {"name": "baz"},
    )

    scenario_manager = _ScenarioManagerFactory._build_manager()
    scenario_manager._repository._save(scenario_1)
    scenario_2 = scenario_manager._get(scenario_1)

    # auto set & reload on properties attribute
    assert scenario_1.properties == {"name": "baz"}
    assert scenario_2.properties == {"name": "baz"}

    # Item assignment through the private ``_properties`` attribute.
    scenario_1._properties["qux"] = 4
    assert scenario_1.properties["qux"] == 4
    assert scenario_2.properties["qux"] == 4
    assert scenario_1.properties == {"name": "baz", "qux": 4}
    assert scenario_2.properties == {"name": "baz", "qux": 4}

    scenario_2._properties["qux"] = 5
    assert scenario_1.properties["qux"] == 5
    assert scenario_2.properties["qux"] == 5

    # Item assignment through the public ``properties`` attribute.
    scenario_1.properties["temp_key_1"] = "temp_value_1"
    scenario_1.properties["temp_key_2"] = "temp_value_2"
    assert scenario_1.properties == {
        "name": "baz",
        "qux": 5,
        "temp_key_1": "temp_value_1",
        "temp_key_2": "temp_value_2",
    }
    assert scenario_2.properties == {
        "name": "baz",
        "qux": 5,
        "temp_key_1": "temp_value_1",
        "temp_key_2": "temp_value_2",
    }

    # pop() on scenario_1 must be reflected on both objects.
    scenario_1.properties.pop("temp_key_1")
    assert "temp_key_1" not in scenario_1.properties.keys()
    # The original repeated the scenario_1 check here; the reloaded object is the one left to verify.
    assert "temp_key_1" not in scenario_2.properties.keys()
    assert scenario_1.properties == {
        "name": "baz",
        "qux": 5,
        "temp_key_2": "temp_value_2",
    }
    assert scenario_2.properties == {
        "name": "baz",
        "qux": 5,
        "temp_key_2": "temp_value_2",
    }

    # pop() on scenario_2 must be reflected on both objects.
    scenario_2.properties.pop("temp_key_2")
    assert scenario_1.properties == {"name": "baz", "qux": 5}
    assert scenario_2.properties == {"name": "baz", "qux": 5}
    assert "temp_key_2" not in scenario_1.properties.keys()
    assert "temp_key_2" not in scenario_2.properties.keys()

    # update() with an existing key, then with an empty mapping (expected to change nothing).
    scenario_1.properties["temp_key_3"] = 0
    assert scenario_1.properties == {"name": "baz", "qux": 5, "temp_key_3": 0}
    assert scenario_2.properties == {"name": "baz", "qux": 5, "temp_key_3": 0}
    scenario_1.properties.update({"temp_key_3": 1})
    assert scenario_1.properties == {"name": "baz", "qux": 5, "temp_key_3": 1}
    assert scenario_2.properties == {"name": "baz", "qux": 5, "temp_key_3": 1}
    scenario_1.properties.update({})
    assert scenario_1.properties == {"name": "baz", "qux": 5, "temp_key_3": 1}
    assert scenario_2.properties == {"name": "baz", "qux": 5, "temp_key_3": 1}
    scenario_1.properties["temp_key_4"] = 0
    scenario_1.properties["temp_key_5"] = 0

    with scenario_1 as scenario:
        assert scenario.properties["qux"] == 5
        assert scenario.properties["temp_key_3"] == 1
        assert scenario.properties["temp_key_4"] == 0
        assert scenario.properties["temp_key_5"] == 0

        scenario.properties["qux"] = 9
        scenario.properties.pop("temp_key_3")
        scenario.properties.pop("temp_key_4")
        scenario.properties.update({"temp_key_4": 1})
        scenario.properties.update({"temp_key_5": 2})
        scenario.properties.pop("temp_key_5")
        scenario.properties.update({})

        # Still inside the context: reads are expected to return the values from before the edits.
        assert scenario._is_in_context
        assert scenario.properties["qux"] == 5
        assert scenario.properties["temp_key_3"] == 1
        assert scenario.properties["temp_key_4"] == 0
        assert scenario.properties["temp_key_5"] == 0

    # After leaving the context the edits above are expected to be visible.
    assert not scenario_1._is_in_context
    assert scenario_1.properties["qux"] == 9
    assert "temp_key_3" not in scenario_1.properties.keys()
    assert scenario_1.properties["temp_key_4"] == 1
    assert "temp_key_5" not in scenario_1.properties.keys()
def test_is_deletable():
    """``Scenario.is_deletable`` must call ``_ScenarioManager._is_deletable`` once with the scenario."""
    patch_target = "taipy.core.scenario._scenario_manager._ScenarioManager._is_deletable"
    with mock.patch(patch_target) as mock_is_deletable:
        scenario = Scenario("foo", set(), {})
        scenario.is_deletable()
        mock_is_deletable.assert_called_once_with(scenario)
def test_submit_scenario():
    """``Scenario.submit(force=False)`` must call ``_ScenarioManager._submit`` once with the expected arguments."""
    patch_target = "taipy.core.scenario._scenario_manager._ScenarioManager._submit"
    with mock.patch(patch_target) as mocked_submit:
        scenario = Scenario("foo", set(), {})
        scenario.submit(force=False)
        # Expected positional call recorded by the mock: the scenario followed by four further arguments.
        mocked_submit.assert_called_once_with(scenario, None, False, False, None)
def test_subscribe_scenario():
    """``Scenario.subscribe(None)`` must call ``taipy.core.subscribe_scenario`` once with ``(None, None, scenario)``."""
    patch_target = "taipy.core.subscribe_scenario"
    with mock.patch(patch_target) as mocked_subscribe:
        scenario = Scenario("foo", set(), {})
        scenario.subscribe(None)
        mocked_subscribe.assert_called_once_with(None, None, scenario)
def test_unsubscribe_scenario():
    """``Scenario.unsubscribe(None)`` must call ``taipy.core.unsubscribe_scenario`` once with ``(None, None, scenario)``."""
    patch_target = "taipy.core.unsubscribe_scenario"
    with mock.patch(patch_target) as mocked_unsubscribe:
        scenario = Scenario("foo", set(), {})
        scenario.unsubscribe(None)
        mocked_unsubscribe.assert_called_once_with(None, None, scenario)
def test_add_tag_scenario():
    """``Scenario.add_tag`` must call ``taipy.core.tag`` once with the scenario and the tag."""
    patch_target = "taipy.core.tag"
    with mock.patch(patch_target) as mocked_tag:
        scenario = Scenario("foo", set(), {})
        scenario.add_tag("tag")
        mocked_tag.assert_called_once_with(scenario, "tag")
def test_remove_tag_scenario():
    """``Scenario.remove_tag`` must call ``taipy.core.untag`` once with the scenario and the tag."""
    patch_target = "taipy.core.untag"
    with mock.patch(patch_target) as mocked_untag:
        scenario = Scenario("foo", set(), {})
        scenario.remove_tag("tag")
        mocked_untag.assert_called_once_with(scenario, "tag")
def test_get_inputs_outputs_intermediate_data_nodes():
    """Check ``get_inputs``, ``get_outputs`` and ``get_intermediate`` on four task graphs.

    Each section builds fresh data nodes and tasks, wraps them in a scenario and
    compares the three accessors against the sets read off the diagram.
    """
    # --- Graph 1 ---------------------------------------------------------
    # s1 ---             ---> s3 ---> t2 ---> s5 ----
    #       |           |                           |
    #       |---> t1 ---|      -------------------------> t3 ---> s6
    #       |           |      |
    # s2 ---             ---> s4 ---> t4 ---> s7
    dn_1 = DataNode("foo", Scope.SCENARIO, "s1")
    dn_2 = DataNode("bar", Scope.SCENARIO, "s2")
    dn_3 = DataNode("baz", Scope.SCENARIO, "s3")
    dn_4 = DataNode("qux", Scope.SCENARIO, "s4")
    dn_5 = DataNode("quux", Scope.SCENARIO, "s5")
    dn_6 = DataNode("quuz", Scope.SCENARIO, "s6")
    dn_7 = DataNode("corge", Scope.SCENARIO, "s7")
    t_1 = Task("grault", {}, print, [dn_1, dn_2], [dn_3, dn_4], TaskId("t1"))
    t_2 = Task("garply", {}, print, [dn_3], [dn_5], TaskId("t2"))
    t_3 = Task("waldo", {}, print, [dn_5, dn_4], [dn_6], TaskId("t3"))
    t_4 = Task("fred", {}, print, [dn_4], [dn_7], TaskId("t4"))
    scenario = Scenario("scenario", {t_1, t_2, t_3, t_4}, {}, set(), ScenarioId("s1"))

    assert scenario.get_inputs() == {dn_1, dn_2}
    assert scenario.get_outputs() == {dn_6, dn_7}
    assert scenario.get_intermediate() == {dn_3, dn_4, dn_5}

    # --- Graph 2: t2 has no input ---------------------------------------
    # s1 ---      t2 ---> s5 ------
    #       |                     |
    #       |---> t1 ---|      -----> t3 ---> s6
    #       |           |      |
    # s2 ---             ---> s4 ---> t4 ---> s7
    dn_1 = DataNode("foo", Scope.SCENARIO, "s1")
    dn_2 = DataNode("bar", Scope.SCENARIO, "s2")
    dn_4 = DataNode("qux", Scope.SCENARIO, "s4")
    dn_5 = DataNode("quux", Scope.SCENARIO, "s5")
    dn_6 = DataNode("quuz", Scope.SCENARIO, "s6")
    dn_7 = DataNode("corge", Scope.SCENARIO, "s7")
    t_1 = Task("grault", {}, print, [dn_1, dn_2], [dn_4], TaskId("t1"))
    t_2 = Task("garply", {}, print, None, [dn_5], TaskId("t2"))
    t_3 = Task("waldo", {}, print, [dn_5, dn_4], [dn_6], TaskId("t3"))
    t_4 = Task("fred", {}, print, [dn_4], [dn_7], TaskId("t4"))
    scenario = Scenario("scenario", {t_1, t_2, t_3, t_4}, {}, set(), ScenarioId("s1"))

    assert scenario.get_inputs() == {dn_1, dn_2}
    assert scenario.get_outputs() == {dn_6, dn_7}
    assert scenario.get_intermediate() == {dn_4, dn_5}

    # --- Graph 3: t3 and t6 have no output ------------------------------
    # s1 ---      s6 ---> t2 ---> s5
    #       |                     |
    #       |---> t1 ---|      -----> t3
    #       |           |      |
    # s2 ---             ---> s4 ---> t4 ---> s7 ---> t6
    #                                              |
    # s8 -------> t5 -------> s9 ------------------
    dn_1 = DataNode("foo", Scope.SCENARIO, "s1")
    dn_2 = DataNode("bar", Scope.SCENARIO, "s2")
    dn_4 = DataNode("qux", Scope.SCENARIO, "s4")
    dn_5 = DataNode("quux", Scope.SCENARIO, "s5")
    dn_6 = DataNode("quuz", Scope.SCENARIO, "s6")
    dn_7 = DataNode("corge", Scope.SCENARIO, "s7")
    dn_8 = DataNode("d8", Scope.SCENARIO, "s8")
    dn_9 = DataNode("d9", Scope.SCENARIO, "s9")
    t_1 = Task("grault", {}, print, [dn_1, dn_2], [dn_4], TaskId("t1"))
    t_2 = Task("garply", {}, print, [dn_6], [dn_5], TaskId("t2"))
    t_3 = Task("waldo", {}, print, [dn_5, dn_4], id=TaskId("t3"))
    t_4 = Task("fred", {}, print, [dn_4], [dn_7], TaskId("t4"))
    t_5 = Task("t5", {}, print, [dn_8], [dn_9], TaskId("t5"))
    t_6 = Task("t6", {}, print, [dn_7, dn_9], id=TaskId("t6"))
    scenario = Scenario("scenario", {t_1, t_2, t_3, t_4, t_5, t_6}, {}, set(), ScenarioId("s1"))

    assert scenario.get_inputs() == {dn_1, dn_2, dn_6, dn_8}
    assert scenario.get_outputs() == set()
    assert scenario.get_intermediate() == {dn_5, dn_4, dn_7, dn_9}

    # --- Graph 4: disconnected pieces -----------------------------------
    # s1 ---
    #       |
    #       |---> t1 ---|      -----> t3
    #       |           |      |
    # s2 ---             ---> s4 ---> t4 ---> s7
    # t2 ---> s5                   |
    # s8 ---> t5              s6 --|
    dn_1 = DataNode("foo", Scope.SCENARIO, "s1")
    dn_2 = DataNode("bar", Scope.SCENARIO, "s2")
    dn_4 = DataNode("qux", Scope.SCENARIO, "s4")
    dn_5 = DataNode("quux", Scope.SCENARIO, "s5")
    dn_6 = DataNode("quuz", Scope.SCENARIO, "s6")
    dn_7 = DataNode("corge", Scope.SCENARIO, "s7")
    dn_8 = DataNode("hugh", Scope.SCENARIO, "s8")
    t_1 = Task("grault", {}, print, [dn_1, dn_2], [dn_4], TaskId("t1"))
    t_2 = Task("garply", {}, print, output=[dn_5], id=TaskId("t2"))
    t_3 = Task("waldo", {}, print, [dn_4], None, id=TaskId("t3"))
    t_4 = Task("fred", {}, print, [dn_4, dn_6], [dn_7], TaskId("t4"))
    t_5 = Task("bob", {}, print, [dn_8], None, TaskId("t5"))
    scenario = Scenario("scenario", {t_1, t_2, t_3, t_4, t_5}, {}, set(), ScenarioId("sc1"))

    assert scenario.get_inputs() == {dn_1, dn_2, dn_8, dn_6}
    assert scenario.get_outputs() == {dn_5, dn_7}
    assert scenario.get_intermediate() == {dn_4}
def test_is_ready_to_run():
    """Check that ``Scenario.is_ready_to_run`` follows the ``edit_in_progress`` flag of the input data nodes.

    The scenario, its tasks and its data nodes are saved through their
    repositories; the flag is then toggled on the four input data nodes
    (d1, d2, d6, d8) and the readiness is asserted after each step.
    """
    task_1_id = TaskId("TASK_t1")
    task_2_id = TaskId("TASK_t2")
    task_3_id = TaskId("TASK_t3")
    task_4_id = TaskId("TASK_t4")
    task_5_id = TaskId("TASK_t5")
    task_6_id = TaskId("TASK_t6")
    sc_id = ScenarioId("SCENARIO_s1")

    dn_1 = PickleDataNode("foo", Scope.SCENARIO, "d1", parent_ids={task_1_id}, properties={"default_data": 1})
    dn_2 = PickleDataNode("bar", Scope.SCENARIO, "d2", parent_ids={task_1_id}, properties={"default_data": 2})
    dn_4 = PickleDataNode(
        "qux",
        Scope.SCENARIO,
        "d4",
        parent_ids={task_1_id, task_4_id, task_3_id},
        properties={"default_data": 4},
    )
    dn_5 = PickleDataNode(
        "quux",
        Scope.SCENARIO,
        "d5",
        parent_ids={task_2_id, task_3_id},
        properties={"default_data": 5},
    )
    dn_6 = PickleDataNode("quuz", Scope.SCENARIO, "d6", parent_ids={task_2_id}, properties={"default_data": 6})
    dn_7 = PickleDataNode(
        "corge",
        Scope.SCENARIO,
        "d7",
        parent_ids={task_4_id, task_6_id},
        properties={"default_data": 7},
    )
    dn_8 = PickleDataNode("d8", Scope.SCENARIO, "d8", parent_ids={task_5_id}, properties={"default_data": 8})
    dn_9 = PickleDataNode(
        "d9",
        Scope.SCENARIO,
        "d9",
        parent_ids={task_5_id, task_6_id},
        properties={"default_data": 9},
    )

    t_1 = Task("grault", {}, print, [dn_1, dn_2], [dn_4], id=task_1_id, parent_ids={sc_id})
    t_2 = Task("garply", {}, print, [dn_6], [dn_5], id=task_2_id, parent_ids={sc_id})
    t_3 = Task("waldo", {}, print, [dn_5, dn_4], id=task_3_id, parent_ids={sc_id})
    t_4 = Task("fred", {}, print, [dn_4], [dn_7], id=task_4_id, parent_ids={sc_id})
    t_5 = Task("t5", {}, print, [dn_8], [dn_9], id=task_5_id, parent_ids={sc_id})
    t_6 = Task("t6", {}, print, [dn_7, dn_9], id=task_6_id, parent_ids={sc_id})
    scenario = Scenario("scenario", {t_1, t_2, t_3, t_4, t_5, t_6}, {}, set(), scenario_id=sc_id)
    # d1 ---      d6 ---> t2 ---> d5
    #       |                     |
    #       |---> t1 ---|      -----> t3
    #       |           |      |
    # d2 ---             ---> d4 ---> t4 ---> d7 ---> t6
    #                                              |
    # d8 -------> t5 -------> d9 ------------------
    assert scenario.get_inputs() == {dn_1, dn_2, dn_6, dn_8}

    # Store every entity so the scenario can be evaluated against saved state.
    data_manager = _DataManagerFactory._build_manager()
    data_manager._delete_all()
    for dn in [dn_1, dn_2, dn_4, dn_5, dn_6, dn_7, dn_8, dn_9]:
        data_manager._repository._save(dn)
    task_manager = _TaskManagerFactory._build_manager()
    for task in [t_1, t_2, t_3, t_4, t_5, t_6]:
        task_manager._repository._save(task)
    _ScenarioManagerFactory._build_manager()._repository._save(scenario)

    assert scenario.is_ready_to_run()

    # Marking inputs as being edited one at a time: not ready after each step.
    for dn in (dn_1, dn_2):
        dn.edit_in_progress = True
        assert not scenario.is_ready_to_run()

    # All four inputs being edited: still not ready.
    for dn in (dn_6, dn_8):
        dn.edit_in_progress = True
    assert not scenario.is_ready_to_run()

    # Once every input is released the scenario is ready again.
    for dn in (dn_1, dn_2, dn_6, dn_8):
        dn.edit_in_progress = False
    assert scenario.is_ready_to_run()
def test_data_nodes_being_edited():
    """Check that ``Scenario.data_nodes_being_edited`` returns exactly the data nodes flagged ``edit_in_progress``.

    The flag is switched on and off for several groups of data nodes and both
    the size and the content of the returned set are asserted after each step.
    """
    dn_1 = PickleDataNode("foo", Scope.SCENARIO, "s1", properties={"default_data": 1})
    dn_2 = PickleDataNode("bar", Scope.SCENARIO, "s2", properties={"default_data": 2})
    dn_4 = PickleDataNode("qux", Scope.SCENARIO, "s4", properties={"default_data": 4})
    dn_5 = PickleDataNode("quux", Scope.SCENARIO, "s5", properties={"default_data": 5})
    dn_6 = PickleDataNode("quuz", Scope.SCENARIO, "s6", properties={"default_data": 6})
    dn_7 = PickleDataNode("corge", Scope.SCENARIO, "s7", properties={"default_data": 7})
    dn_8 = PickleDataNode("d8", Scope.SCENARIO, "s8", properties={"default_data": 8})
    dn_9 = PickleDataNode("d9", Scope.SCENARIO, "s9", properties={"default_data": 9})

    t_1 = Task("grault", {}, print, [dn_1, dn_2], [dn_4], TaskId("t1"))
    t_2 = Task("garply", {}, print, [dn_6], [dn_5], TaskId("t2"))
    t_3 = Task("waldo", {}, print, [dn_5, dn_4], id=TaskId("t3"))
    t_4 = Task("fred", {}, print, [dn_4], [dn_7], TaskId("t4"))
    t_5 = Task("t5", {}, print, [dn_8], [dn_9], TaskId("t5"))
    t_6 = Task("t6", {}, print, [dn_7, dn_9], id=TaskId("t6"))
    scenario = Scenario("scenario", {t_1, t_2, t_3, t_4, t_5, t_6}, {}, set(), ScenarioId("s1"))
    # s1 ---      s6 ---> t2 ---> s5
    #       |                     |
    #       |---> t1 ---|      -----> t3
    #       |           |      |
    # s2 ---             ---> s4 ---> t4 ---> s7 ---> t6
    #                                              |
    # s8 -------> t5 -------> s9 ------------------
    data_manager = _DataManagerFactory._build_manager()
    for dn in [dn_1, dn_2, dn_4, dn_5, dn_6, dn_7, dn_8, dn_9]:
        data_manager._repository._save(dn)

    # Nothing is being edited initially.
    assert len(scenario.data_nodes_being_edited()) == 0
    assert scenario.data_nodes_being_edited() == set()

    dn_1.edit_in_progress = True
    assert len(scenario.data_nodes_being_edited()) == 1
    assert scenario.data_nodes_being_edited() == {dn_1}

    for dn in (dn_2, dn_6, dn_8):
        dn.edit_in_progress = True
    assert len(scenario.data_nodes_being_edited()) == 4
    assert scenario.data_nodes_being_edited() == {dn_1, dn_2, dn_6, dn_8}

    for dn in (dn_4, dn_5, dn_9):
        dn.edit_in_progress = True
    assert len(scenario.data_nodes_being_edited()) == 7
    assert scenario.data_nodes_being_edited() == {
        dn_1,
        dn_2,
        dn_4,
        dn_5,
        dn_6,
        dn_8,
        dn_9,
    }

    for dn in (dn_1, dn_2, dn_6, dn_8):
        dn.edit_in_progress = False
    assert len(scenario.data_nodes_being_edited()) == 3
    assert scenario.data_nodes_being_edited() == {dn_4, dn_5, dn_9}

    # dn_9 is left flagged here, so it is reported together with dn_7.
    dn_4.edit_in_progress = False
    dn_5.edit_in_progress = False
    dn_7.edit_in_progress = True
    assert len(scenario.data_nodes_being_edited()) == 2
    assert scenario.data_nodes_being_edited() == {dn_7, dn_9}

    dn_7.edit_in_progress = False
    dn_9.edit_in_progress = False
    assert len(scenario.data_nodes_being_edited()) == 0
    assert scenario.data_nodes_being_edited() == set()
def test_get_tasks():
    """``Scenario.tasks`` must equal a dict mapping each task's first constructor argument to the task."""
    grault = Task("grault", {}, print, id=TaskId("t1"))
    garply = Task("garply", {}, print, id=TaskId("t2"))
    waldo = Task("waldo", {}, print, id=TaskId("t3"))
    fred = Task("fred", {}, print, id=TaskId("t4"))

    four_task_scenario = Scenario("scenario_1", {grault, garply, waldo, fred}, {}, set(), ScenarioId("s1"))
    expected_four = {"grault": grault, "garply": garply, "waldo": waldo, "fred": fred}
    assert four_task_scenario.tasks == expected_four

    wallo = Task("wallo", {}, print, id=TaskId("t5"))
    five_task_scenario = Scenario("scenario_2", {grault, garply, waldo, fred, wallo}, {}, set(), ScenarioId("s2"))
    expected_five = {"grault": grault, "garply": garply, "waldo": waldo, "fred": fred, "wallo": wallo}
    assert five_task_scenario.tasks == expected_five
def test_get_set_of_tasks():
    """``Scenario._get_set_of_tasks`` must return the set of tasks the scenario was built with."""
    grault = Task("grault", {}, print, id=TaskId("t1"))
    garply = Task("garply", {}, print, id=TaskId("t2"))
    waldo = Task("waldo", {}, print, id=TaskId("t3"))
    fred = Task("fred", {}, print, id=TaskId("t4"))

    four_task_scenario = Scenario("scenario_1", {grault, garply, waldo, fred}, {}, set(), ScenarioId("s1"))
    assert four_task_scenario._get_set_of_tasks() == {grault, garply, waldo, fred}

    wallo = Task("wallo", {}, print, id=TaskId("t5"))
    five_task_scenario = Scenario("scenario_2", {grault, garply, waldo, fred, wallo}, {}, set(), ScenarioId("s2"))
    assert five_task_scenario._get_set_of_tasks() == {grault, garply, waldo, fred, wallo}
def test_get_sorted_tasks():
    """Check ``Scenario._get_sorted_tasks`` on eight task graphs.

    For each graph the test asserts the scenario inputs, the set of tasks, and
    that the sorted tasks match the expected list of levels. Every comparison
    made through ``_assert_equal`` is asserted; a bare call would discard the
    boolean and let a wrong ordering pass unnoticed.
    """

    def _assert_equal(tasks_a, tasks_b) -> bool:
        """Return True when two (possibly nested) task lists match.

        Nested lists are compared position by position, recursively. Plain
        items only need to be present in ``tasks_b`` (``list.index`` raises
        ``ValueError`` otherwise), so the order inside one level is ignored.
        """
        if len(tasks_a) != len(tasks_b):
            return False
        for i, (task_a, task_b) in enumerate(zip(tasks_a, tasks_b)):
            a_is_list = isinstance(task_a, list)
            b_is_list = isinstance(task_b, list)
            if a_is_list and b_is_list:
                if not _assert_equal(task_a, task_b):
                    return False
            elif a_is_list or b_is_list:
                return False
            else:
                index_task_b = tasks_b.index(task_a)
                # No nested list may sit between position i and the matching item.
                if any(isinstance(item, list) for item in tasks_b[i : index_task_b + 1]):
                    return False
        return True

    # s1 ---             ---> s3 ---> t2 ---> s5 ----
    #       |           |                           |
    #       |---> t1 ---|      -------------------------> t3 ---> s6
    #       |           |      |
    # s2 ---             ---> s4 ---> t4 ---> s7
    data_node_1 = InMemoryDataNode("foo", Scope.SCENARIO, "s1")
    data_node_2 = InMemoryDataNode("bar", Scope.SCENARIO, "s2")
    data_node_3 = InMemoryDataNode("baz", Scope.SCENARIO, "s3")
    data_node_4 = InMemoryDataNode("qux", Scope.SCENARIO, "s4")
    data_node_5 = InMemoryDataNode("quux", Scope.SCENARIO, "s5")
    data_node_6 = InMemoryDataNode("quuz", Scope.SCENARIO, "s6")
    data_node_7 = InMemoryDataNode("corge", Scope.SCENARIO, "s7")
    task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_3, data_node_4], TaskId("t1"))
    task_2 = Task("garply", {}, print, [data_node_3], [data_node_5], TaskId("t2"))
    task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], [data_node_6], TaskId("t3"))
    task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4"))
    scenario_1 = Scenario("scenario_1", {task_1, task_2, task_3, task_4}, {}, [], ScenarioId("s1"))

    assert scenario_1.get_inputs() == {data_node_1, data_node_2}
    assert scenario_1._get_set_of_tasks() == {task_1, task_2, task_3, task_4}
    assert _assert_equal(scenario_1._get_sorted_tasks(), [[task_1], [task_2, task_4], [task_3]])

    # s1 ---      t2 ---> s5
    #       |                  |
    #       |---> t1 ---|      -----> t3 ---> s6
    #       |           |      |
    # s2 ---             ---> s4 ---> t4 ---> s7
    data_node_1 = InMemoryDataNode("foo", Scope.SCENARIO, "s1")
    data_node_2 = InMemoryDataNode("bar", Scope.SCENARIO, "s2")
    data_node_4 = InMemoryDataNode("qux", Scope.SCENARIO, "s4")
    data_node_5 = InMemoryDataNode("quux", Scope.SCENARIO, "s5")
    data_node_6 = InMemoryDataNode("quuz", Scope.SCENARIO, "s6")
    data_node_7 = InMemoryDataNode("corge", Scope.SCENARIO, "s7")
    task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_4], TaskId("t1"))
    task_2 = Task("garply", {}, print, None, [data_node_5], TaskId("t2"))
    task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], [data_node_6], TaskId("t3"))
    task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4"))
    scenario_2 = Scenario("scenario_2", {task_1, task_2, task_3, task_4}, {}, [], ScenarioId("s2"))

    assert scenario_2.get_inputs() == {data_node_1, data_node_2}
    assert scenario_2._get_set_of_tasks() == {task_1, task_2, task_3, task_4}
    assert _assert_equal(scenario_2._get_sorted_tasks(), [[task_1, task_2], [task_3, task_4]])

    # s1 ---      s6 ---> t2 ---> s5
    #       |                     |
    #       |---> t1 ---|      -----> t3
    #       |           |      |
    # s2 ---             ---> s4 ---> t4 ---> s7
    data_node_1 = DataNode("foo", Scope.SCENARIO, "s1")
    data_node_2 = DataNode("bar", Scope.SCENARIO, "s2")
    data_node_4 = DataNode("qux", Scope.SCENARIO, "s4")
    data_node_5 = DataNode("quux", Scope.SCENARIO, "s5")
    data_node_6 = DataNode("quuz", Scope.SCENARIO, "s6")
    data_node_7 = DataNode("corge", Scope.SCENARIO, "s7")
    task_1 = Task(
        "grault",
        {},
        print,
        [data_node_1, data_node_2],
        [data_node_4],
        TaskId("t1"),
    )
    task_2 = Task("garply", {}, print, [data_node_6], [data_node_5], TaskId("t2"))
    task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], id=TaskId("t3"))
    task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4"))
    scenario_3 = Scenario("quest", [task_4, task_2, task_1, task_3], {}, [], scenario_id=ScenarioId("s3"))

    assert scenario_3.get_inputs() == {data_node_1, data_node_2, data_node_6}
    assert scenario_3._get_set_of_tasks() == {task_1, task_2, task_3, task_4}
    assert _assert_equal(scenario_3._get_sorted_tasks(), [[task_2, task_1], [task_4, task_3]])

    # s1 ---      s6 ---> t2 ---> s5
    #       |                     |
    #       |---> t1 ---|      -----> t3
    #       |           |      |
    # s2 ---             ---> s4 ---> t4 ---> s7 ---> t6
    #                                              |
    # s8 -------> t5 -------> s9 ------------------
    data_node_1 = InMemoryDataNode("foo", Scope.SCENARIO, "s1")
    data_node_2 = InMemoryDataNode("bar", Scope.SCENARIO, "s2")
    data_node_4 = InMemoryDataNode("qux", Scope.SCENARIO, "s4")
    data_node_5 = InMemoryDataNode("quux", Scope.SCENARIO, "s5")
    data_node_6 = InMemoryDataNode("quuz", Scope.SCENARIO, "s6")
    data_node_7 = InMemoryDataNode("corge", Scope.SCENARIO, "s7")
    data_node_8 = InMemoryDataNode("d8", Scope.SCENARIO, "s8")
    data_node_9 = InMemoryDataNode("d9", Scope.SCENARIO, "s9")
    task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_4], TaskId("t1"))
    task_2 = Task("garply", {}, print, [data_node_6], [data_node_5], TaskId("t2"))
    task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], id=TaskId("t3"))
    task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4"))
    task_5 = Task("t5", {}, print, [data_node_8], [data_node_9], TaskId("t5"))
    task_6 = Task("t6", {}, print, [data_node_7, data_node_9], id=TaskId("t6"))
    scenario_4 = Scenario("scenario_3", [task_1, task_2, task_3, task_4, task_5, task_6], {}, [], ScenarioId("s4"))

    assert scenario_4.get_inputs() == {data_node_1, data_node_2, data_node_6, data_node_8}
    assert scenario_4._get_set_of_tasks() == {task_1, task_2, task_3, task_4, task_5, task_6}
    assert _assert_equal(scenario_4._get_sorted_tasks(), [[task_1, task_2, task_5], [task_3, task_4], [task_6]])

    # s1 ---
    #       |
    #       |---> t1 ---|      -----> t3
    #       |           |      |
    # s2 ---             ---> s4 ---> t4 ---> s7
    # t2 ---> s5                   |
    # s8 ---> t5              s6 --|
    data_node_1 = InMemoryDataNode("foo", Scope.SCENARIO, "s1")
    data_node_2 = InMemoryDataNode("bar", Scope.SCENARIO, "s2")
    data_node_4 = InMemoryDataNode("qux", Scope.SCENARIO, "s4")
    data_node_5 = InMemoryDataNode("quux", Scope.SCENARIO, "s5")
    data_node_6 = InMemoryDataNode("quuz", Scope.SCENARIO, "s6")
    data_node_7 = InMemoryDataNode("corge", Scope.SCENARIO, "s7")
    data_node_8 = InMemoryDataNode("hugh", Scope.SCENARIO, "s8")
    task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_4], TaskId("t1"))
    task_2 = Task("garply", {}, print, output=[data_node_5], id=TaskId("t2"))
    task_3 = Task("waldo", {}, print, [data_node_4], None, id=TaskId("t3"))
    task_4 = Task("fred", {}, print, [data_node_4, data_node_6], [data_node_7], TaskId("t4"))
    task_5 = Task("bob", {}, print, [data_node_8], None, TaskId("t5"))
    scenario_5 = Scenario("scenario_4", [task_1, task_2, task_3, task_4, task_5], {}, [], ScenarioId("s5"))

    assert scenario_5.get_inputs() == {data_node_1, data_node_2, data_node_8, data_node_6}
    assert scenario_5._get_set_of_tasks() == {task_1, task_2, task_3, task_4, task_5}
    assert _assert_equal(scenario_5._get_sorted_tasks(), [[task_1, task_2, task_5], [task_3, task_4]])

    # p1  s1 ---
    #           |
    #           |---> t1 ---|      -----> t3
    #           |           |      |
    #     s2 ---             ---> s4 ---> t4 ---> s5
    # p2  t2 ---> s4 ---> t3
    # p3  s6 ---> t5
    data_node_1 = DataNode("foo", Scope.SCENARIO, "s1")
    data_node_2 = DataNode("bar", Scope.SCENARIO, "s2")
    data_node_4 = DataNode("qux", Scope.SCENARIO, "s4")
    data_node_5 = DataNode("quux", Scope.SCENARIO, "s5")
    data_node_6 = DataNode("quuz", Scope.SCENARIO, "s6")
    task_1 = Task(
        "grault",
        {},
        print,
        [data_node_1, data_node_2],
        [data_node_4],
        TaskId("t1"),
    )
    task_2 = Task("garply", {}, print, output=[data_node_4], id=TaskId("t2"))
    task_3 = Task("waldo", {}, print, [data_node_4], None, id=TaskId("t3"))
    task_4 = Task("fred", {}, print, [data_node_4], [data_node_5], TaskId("t4"))
    task_5 = Task("bob", {}, print, [data_node_6], None, TaskId("t5"))
    scenario_6 = Scenario("quest", [task_1, task_2, task_3, task_4, task_5], {}, [], ScenarioId("s6"))

    assert scenario_6.get_inputs() == {data_node_1, data_node_2, data_node_6}
    assert scenario_6._get_set_of_tasks() == {task_1, task_2, task_3, task_4, task_5}
    assert _assert_equal(scenario_6._get_sorted_tasks(), [[task_5, task_2, task_1], [task_4, task_3]])

    # p1  s1 ---
    #           |
    #           |---> t1 ---|      -----> t3
    #           |           |      |
    #     s2 ---             ---> s4 ---> t4 ---> s5
    # p2  t2 ---> s4 ---> t3
    # p3  s6 ---> t5 ---> s4 ---> t4 ---> s5
    data_node_1 = DataNode("foo", Scope.SCENARIO, "s1")
    data_node_2 = DataNode("bar", Scope.SCENARIO, "s2")
    data_node_4 = DataNode("qux", Scope.SCENARIO, "s4")
    data_node_5 = DataNode("quux", Scope.SCENARIO, "s5")
    data_node_6 = DataNode("quuz", Scope.SCENARIO, "s6")
    task_1 = Task(
        "grault",
        {},
        print,
        [data_node_1, data_node_2],
        [data_node_4],
        TaskId("t1"),
    )
    task_2 = Task("garply", {}, print, output=[data_node_4], id=TaskId("t2"))
    task_3 = Task("waldo", {}, print, [data_node_4], None, id=TaskId("t3"))
    task_4 = Task("fred", {}, print, [data_node_4], [data_node_5], TaskId("t4"))
    # The task id is the 6th positional argument, as in every other Task(...) call in this
    # test; the original passed ``None, TaskId("t5")`` and so shifted the id by one slot.
    task_5 = Task("bob", {}, print, [data_node_6], [data_node_4], TaskId("t5"))
    scenario_7 = Scenario("quest", [task_4, task_1, task_2, task_3, task_5], {}, [], scenario_id=ScenarioId("s7"))

    assert scenario_7.get_inputs() == {data_node_1, data_node_2, data_node_6}
    assert scenario_7._get_set_of_tasks() == {task_1, task_2, task_3, task_4, task_5}
    assert _assert_equal(scenario_7._get_sorted_tasks(), [[task_5, task_2, task_1], [task_4, task_3]])

    # p1  s1 ---
    #           |
    #           |---> t1 ---|      -----> t3
    #           |           |      |
    #     s2 ---             ---> s3 ---> t4 ---> s4
    # p2  t2 ---> s3 ---> t3
    # p3  s5 ---> t5 ---> s3 ---> t4 ---> s4
    # p4  s3 ---> t4 ---> s4
    data_node_1 = DataNode("foo", Scope.SCENARIO, "s1")
    data_node_2 = DataNode("bar", Scope.SCENARIO, "s2")
    data_node_3 = DataNode("qux", Scope.SCENARIO, "s3")
    data_node_4 = DataNode("quux", Scope.SCENARIO, "s4")
    data_node_5 = DataNode("quuz", Scope.SCENARIO, "s5")
    task_1 = Task(
        "grault",
        {},
        print,
        [data_node_1, data_node_2],
        [data_node_3],
        TaskId("t1"),
    )
    task_2 = Task("garply", {}, print, output=[data_node_3], id=TaskId("t2"))
    task_3 = Task("waldo", {}, print, [data_node_3], None, id=TaskId("t3"))
    task_4 = Task("fred", {}, print, [data_node_3], [data_node_4], TaskId("t4"))
    task_5 = Task("bob", {}, print, [data_node_5], [data_node_3], TaskId("t5"))
    scenario_8 = Scenario("quest", [task_1, task_2, task_3, task_4, task_5], {}, [], scenario_id=ScenarioId("s8"))

    assert scenario_8.get_inputs() == {data_node_1, data_node_2, data_node_5}
    assert scenario_8._get_set_of_tasks() == {task_1, task_2, task_3, task_4, task_5}
    assert _assert_equal(scenario_8._get_sorted_tasks(), [[task_5, task_2, task_1], [task_3, task_4]])
def test_check_consistency():
    """Check that ``Scenario._is_consistent()`` accepts a range of valid task graphs.

    Covers an empty scenario, a single-task scenario, and several multi-task
    layouts: fan-out/fan-in, tasks without inputs or without outputs,
    sub-graphs that are disconnected from each other, and data nodes that are
    written by more than one task.
    """
    data_node_1 = InMemoryDataNode("foo", Scope.SCENARIO, "s1")
    data_node_2 = InMemoryDataNode("bar", Scope.SCENARIO, "s2")
    data_node_3 = InMemoryDataNode("bar", Scope.SCENARIO, "s3")
    data_node_4 = InMemoryDataNode("qux", Scope.SCENARIO, "s4")
    data_node_5 = InMemoryDataNode("quux", Scope.SCENARIO, "s5")
    data_node_6 = InMemoryDataNode("quuz", Scope.SCENARIO, "s6")
    data_node_7 = InMemoryDataNode("corge", Scope.SCENARIO, "s7")
    data_node_8 = InMemoryDataNode("d8", Scope.SCENARIO, "s8")
    data_node_9 = InMemoryDataNode("d9", Scope.SCENARIO, "s9")

    # A scenario without any task.
    scenario_0 = Scenario("scenario_0", [], {})
    assert scenario_0._is_consistent()

    # A scenario with a single task: s1 ---> t1 ---> s2
    task_1 = Task("foo", {}, print, [data_node_1], [data_node_2], TaskId("t1"))
    scenario_1 = Scenario("scenario_1", [task_1], {})
    assert scenario_1._is_consistent()

    # s1 ---             ---> s3 ---> t2 ---> s5 ----
    #       |           |                           |
    #       |---> t1 ---|      -------------------------> t3 ---> s6
    #       |           |      |
    # s2 ---             ---> s4 ---> t4 ---> s7
    task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_3, data_node_4], TaskId("t1"))
    task_2 = Task("garply", {}, print, [data_node_3], [data_node_5], TaskId("t2"))
    task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], [data_node_6], TaskId("t3"))
    task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4"))
    scenario_2 = Scenario("scenario_2", {task_1, task_2, task_3, task_4}, {}, [], ScenarioId("s1"))
    assert scenario_2._is_consistent()

    # s1 ---      t2 ---> s5 --
    #       |                  |
    #       |---> t1 ---|      -----> t3 ---> s6
    #       |           |      |
    # s2 ---             ---> s4 ---> t4 ---> s7
    task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_4], TaskId("t1"))
    task_2 = Task("garply", {}, print, None, [data_node_5], TaskId("t2"))
    task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], [data_node_6], TaskId("t3"))
    task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4"))
    scenario_3 = Scenario("scenario_3", {task_1, task_2, task_3, task_4}, {}, [], ScenarioId("s2"))
    assert scenario_3._is_consistent()

    # s1 ---      s6 ---> t2 ---> s5
    #       |                      |
    #       |---> t1 ---|      -----> t3
    #       |           |      |
    # s2 ---             ---> s4 ---> t4 ---> s7
    task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_4], TaskId("t1"))
    task_2 = Task("garply", {}, print, [data_node_6], [data_node_5], TaskId("t2"))
    task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], id=TaskId("t3"))
    task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4"))
    scenario_4 = Scenario("scenario_4", [task_4, task_2, task_1, task_3], {}, [], scenario_id=ScenarioId("s3"))
    assert scenario_4._is_consistent()

    # s1 ---      s6 ---> t2 ---> s5
    #       |                      |
    #       |---> t1 ---|      -----> t3
    #       |           |      |
    # s2 ---             ---> s4 ---> t4 ---> s7 ---> t6
    #                                              |
    # s8 -------> t5 -------> s9 ------------------
    task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_4], TaskId("t1"))
    task_2 = Task("garply", {}, print, [data_node_6], [data_node_5], TaskId("t2"))
    task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], id=TaskId("t3"))
    task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4"))
    task_5 = Task("t5", {}, print, [data_node_8], [data_node_9], TaskId("t5"))
    task_6 = Task("t6", {}, print, [data_node_7, data_node_9], id=TaskId("t6"))
    scenario_5 = Scenario("scenario_5", [task_1, task_2, task_3, task_4, task_5, task_6], {}, [], ScenarioId("s4"))
    assert scenario_5._is_consistent()

    # s1 ---
    #       |
    #       |---> t1 ---|      -----> t3
    #       |           |      |
    # s2 ---             ---> s4 ---> t4 ---> s7
    # t2 ---> s5                   |
    # s8 ---> t5            s6 ----|
    task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_4], TaskId("t1"))
    task_2 = Task("garply", {}, print, output=[data_node_5], id=TaskId("t2"))
    task_3 = Task("waldo", {}, print, [data_node_4], None, id=TaskId("t3"))
    task_4 = Task("fred", {}, print, [data_node_4, data_node_6], [data_node_7], TaskId("t4"))
    task_5 = Task("bob", {}, print, [data_node_8], None, TaskId("t5"))
    scenario_6 = Scenario("scenario_6", [task_1, task_2, task_3, task_4, task_5], {}, [], ScenarioId("s5"))
    assert scenario_6._is_consistent()

    # p1  s1 ---
    #           |
    #           |---> t1 ---|      -----> t3
    #           |           |      |
    #     s2 ---             ---> s4 ---> t4 ---> s5
    # p2  t2 ---> s4 ---> t3
    # p3  s6 ---> t5
    task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_4], TaskId("t1"))
    task_2 = Task("garply", {}, print, output=[data_node_4], id=TaskId("t2"))
    task_3 = Task("waldo", {}, print, [data_node_4], None, id=TaskId("t3"))
    task_4 = Task("fred", {}, print, [data_node_4], [data_node_5], TaskId("t4"))
    task_5 = Task("bob", {}, print, [data_node_6], None, TaskId("t5"))
    scenario_7 = Scenario("scenario_7", [task_1, task_2, task_3, task_4, task_5], {}, [], ScenarioId("s6"))
    assert scenario_7._is_consistent()

    # p1  s1 ---
    #           |
    #           |---> t1 ---|      -----> t3
    #           |           |      |
    #     s2 ---             ---> s4 ---> t4 ---> s5
    # p2  t2 ---> s4 ---> t3
    # p3  s6 ---> t5 ---> s4 ---> t4 ---> s5
    task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_4], TaskId("t1"))
    task_2 = Task("garply", {}, print, output=[data_node_4], id=TaskId("t2"))
    task_3 = Task("waldo", {}, print, [data_node_4], None, id=TaskId("t3"))
    task_4 = Task("fred", {}, print, [data_node_4], [data_node_5], TaskId("t4"))
    # The task id goes right after the output list, like every other Task built in this test.
    task_5 = Task("bob", {}, print, [data_node_6], [data_node_4], TaskId("t5"))
    scenario_8 = Scenario("scenario_8", [task_4, task_1, task_2, task_3, task_5], {}, [], scenario_id=ScenarioId("s7"))
    assert scenario_8._is_consistent()

    # p1  s1 ---
    #           |
    #           |---> t1 ---|      -----> t3
    #           |           |      |
    #     s2 ---             ---> s3 ---> t4 ---> s4
    # p2  t2 ---> s3 ---> t3
    # p3  s5 ---> t5 ---> s3 ---> t4 ---> s4
    # p4  s3 ---> t4 ---> s4
    task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_3], TaskId("t1"))
    task_2 = Task("garply", {}, print, output=[data_node_3], id=TaskId("t2"))
    task_3 = Task("waldo", {}, print, [data_node_3], None, id=TaskId("t3"))
    task_4 = Task("fred", {}, print, [data_node_3], [data_node_4], TaskId("t4"))
    task_5 = Task("bob", {}, print, [data_node_5], [data_node_3], TaskId("t5"))
    scenario_9 = Scenario("scenario_9", [task_1, task_2, task_3, task_4, task_5], {}, [], scenario_id=ScenarioId("s8"))
    assert scenario_9._is_consistent()
def test_check_inconsistency(caplog):
    """Check that ``Scenario._is_consistent()`` rejects malformed graphs and logs why.

    Two cases are exercised: a task whose output is not a real data node, and
    two tasks whose inputs and outputs form a cycle. For each one the expected
    message is looked up in the text captured by ``caplog``.
    """

    class FakeDataNode:
        config_id = "config_id_of_a_fake_dn"

    dn_foo = InMemoryDataNode("foo", Scope.SCENARIO, "s1")
    dn_bar = InMemoryDataNode("bar", Scope.SCENARIO, "s2")

    # Case 1: "grault" declares an output that is not a data node.
    fake_output_task = Task("grault", {}, print, [dn_foo, dn_bar], [FakeDataNode()], TaskId("t1"))
    foo_to_bar_task = Task("garply", {}, print, [dn_foo], [dn_bar], id=TaskId("t2"))
    invalid_edge_scenario = Scenario(
        "scenario_1",
        [fake_output_task, foo_to_bar_task],
        {},
        [],
        scenario_id=ScenarioId("s1"),
    )
    assert not invalid_edge_scenario._is_consistent()
    invalid_edge_message = (
        'Invalid edge detected in scenario "s1": left node Task "grault" and right node FakeDataNode'
        " must connect a Task and a DataNode"
    )
    assert invalid_edge_message in caplog.text

    # Case 2: "garply" goes foo -> bar and "waldo" goes bar -> foo, closing a loop.
    bar_to_foo_task = Task("waldo", {}, print, [dn_bar], [dn_foo], id=TaskId("t3"))
    cyclic_scenario = Scenario(
        "scenario_2",
        [foo_to_bar_task, bar_to_foo_task],
        {},
        [],
        scenario_id=ScenarioId("s2"),
    )
    assert not cyclic_scenario._is_consistent()
    not_a_dag_message = 'The DAG of scenario "s2" is not a directed acyclic graph'
    assert not_a_dag_message in caplog.text
|