123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899 |
- # Copyright 2021-2025 Avaiga Private Limited
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations under the License.
- from typing import Callable, Iterable, Optional
- from unittest import mock
- from unittest.mock import ANY
- import pytest
- from taipy import Scope
- from taipy.common.config import Config
- from taipy.core._orchestrator._orchestrator import _Orchestrator
- from taipy.core._version._version_manager import _VersionManager
- from taipy.core.common import _utils
- from taipy.core.common._utils import _Subscriber
- from taipy.core.data._data_manager import _DataManager
- from taipy.core.data.in_memory import InMemoryDataNode
- from taipy.core.exceptions.exceptions import (
- InvalidSequenceId,
- ModelNotFound,
- NonExistingSequence,
- SequenceAlreadyExists,
- SequenceBelongsToNonExistingScenario,
- )
- from taipy.core.job._job_manager import _JobManager
- from taipy.core.scenario._scenario_manager import _ScenarioManager
- from taipy.core.scenario.scenario import Scenario
- from taipy.core.sequence._sequence_manager import _SequenceManager
- from taipy.core.sequence._sequence_manager_factory import _SequenceManagerFactory
- from taipy.core.sequence.sequence import Sequence
- from taipy.core.sequence.sequence_id import SequenceId
- from taipy.core.task._task_manager import _TaskManager
- from taipy.core.task.task import Task
- from taipy.core.task.task_id import TaskId
- from tests.core.utils.NotifyMock import NotifyMock
- def test_breakdown_sequence_id():
- with pytest.raises(InvalidSequenceId):
- _SequenceManager._breakdown_sequence_id("scenario_id")
- with pytest.raises(InvalidSequenceId):
- _SequenceManager._breakdown_sequence_id("sequence_id")
- with pytest.raises(InvalidSequenceId):
- _SequenceManager._breakdown_sequence_id("SEQUENCE_sequence_id")
- with pytest.raises(InvalidSequenceId):
- _SequenceManager._breakdown_sequence_id("SCENARIO_scenario_id")
- with pytest.raises(InvalidSequenceId):
- _SequenceManager._breakdown_sequence_id("sequence_SCENARIO_scenario_id")
- with pytest.raises(InvalidSequenceId):
- _SequenceManager._breakdown_sequence_id("SEQUENCE_sequence_scenario_id")
- sequence_name, scenario_id = _SequenceManager._breakdown_sequence_id("SEQUENCE_sequence_SCENARIO_scenario")
- assert sequence_name == "sequence" and scenario_id == "SCENARIO_scenario"
- sequence_name, scenario_id = _SequenceManager._breakdown_sequence_id("SEQUENCEsequenceSCENARIO_scenario")
- assert sequence_name == "sequence" and scenario_id == "SCENARIO_scenario"
- def test_raise_sequence_does_not_belong_to_scenario():
- with pytest.raises(SequenceBelongsToNonExistingScenario):
- sequence = Sequence({"name": "sequence_name"}, [], "SEQUENCE_sequence_name_SCENARIO_scenario_id")
- _SequenceManager._update(sequence)
- def __init():
- input_dn = InMemoryDataNode("foo", Scope.SCENARIO)
- output_dn = InMemoryDataNode("foo", Scope.SCENARIO)
- _DataManager._repository._save(input_dn)
- _DataManager._repository._save(output_dn)
- task = Task("task", {}, print, [input_dn], [output_dn], TaskId("Task_task_id"))
- _TaskManager._repository._save(task)
- scenario = Scenario("scenario", {task}, {}, set())
- _ScenarioManager._repository._save(scenario)
- return scenario, task
- def test_save_and_get_sequence_no_existing_sequence():
- scenario, _ = __init()
- sequence_name_1 = "p1"
- sequence_id_1 = SequenceId(f"SEQUENCE_{sequence_name_1}_{scenario.id}")
- sequence_name_2 = "p2"
- sequence_id_2 = SequenceId(f"SEQUENCE_{sequence_name_2}_{scenario.id}")
- assert _SequenceManager._get(sequence_id_1) is None
- assert _SequenceManager._get(sequence_id_2) is None
- assert _SequenceManager._get("sequence") is None
- def test_save_and_get():
- scenario, task = __init()
- sequence_name_1 = "p1"
- sequence_id_1 = SequenceId(f"SEQUENCE_{sequence_name_1}_{scenario.id}")
- sequence_name_2 = "p2"
- sequence_id_2 = SequenceId(f"SEQUENCE_{sequence_name_2}_{scenario.id}")
- scenario.add_sequences({sequence_name_1: []})
- sequence_1 = scenario.sequences[sequence_name_1]
- assert _SequenceManager._get(sequence_id_1).id == sequence_1.id
- assert len(_SequenceManager._get(sequence_id_1).tasks) == 0
- assert _SequenceManager._get(sequence_1).id == sequence_1.id
- assert len(_SequenceManager._get(sequence_1).tasks) == 0
- assert _SequenceManager._get(sequence_id_2) is None
- # Save a second sequence. Now, we expect to have a total of two sequences stored
- scenario.add_sequences({sequence_name_2: [task]})
- sequence_2 = scenario.sequences[sequence_name_2]
- assert _SequenceManager._get(sequence_id_1).id == sequence_1.id
- assert len(_SequenceManager._get(sequence_id_1).tasks) == 0
- assert _SequenceManager._get(sequence_1).id == sequence_1.id
- assert len(_SequenceManager._get(sequence_1).tasks) == 0
- assert _SequenceManager._get(sequence_id_2).id == sequence_2.id
- assert len(_SequenceManager._get(sequence_id_2).tasks) == 1
- assert _SequenceManager._get(sequence_2).id == sequence_2.id
- assert len(_SequenceManager._get(sequence_2).tasks) == 1
- assert _TaskManager._get(task.id).id == task.id
- # We save the first sequence again. We expect an exception and nothing to change
- with pytest.raises(SequenceAlreadyExists):
- scenario.add_sequence(sequence_name_1, [])
- sequence_1 = scenario.sequences[sequence_name_1]
- assert _SequenceManager._get(sequence_id_1).id == sequence_1.id
- assert len(_SequenceManager._get(sequence_id_1).tasks) == 0
- assert _SequenceManager._get(sequence_1).id == sequence_1.id
- assert len(_SequenceManager._get(sequence_1).tasks) == 0
- assert _SequenceManager._get(sequence_id_2).id == sequence_2.id
- assert len(_SequenceManager._get(sequence_id_2).tasks) == 1
- assert _SequenceManager._get(sequence_2).id == sequence_2.id
- assert len(_SequenceManager._get(sequence_2).tasks) == 1
- assert _TaskManager._get(task.id).id == task.id
- def test_task_parent_id_set_only_when_create():
- scenario, task = __init()
- sequence_name_1 = "p1"
- with mock.patch("taipy.core.task._task_manager._TaskManager._update") as mck:
- scenario.add_sequences({sequence_name_1: [task]})
- mck.assert_called_once()
- with mock.patch("taipy.core.task._task_manager._TaskManager._update") as mck:
- scenario.sequences[sequence_name_1]
- mck.assert_not_called()
- def test_get_all_on_multiple_versions_environment():
- # Create 5 sequences from Scenario with 2 versions each
- for version in range(1, 3):
- for i in range(5):
- _ScenarioManager._repository._save(
- Scenario(
- f"config_id_{i+version}",
- [],
- {},
- [],
- f"SCENARIO_id_{i}_v{version}",
- version=f"{version}.0",
- sequences={"sequence": {}},
- )
- )
- _VersionManager._set_experiment_version("1.0")
- assert len(_SequenceManager._get_all()) == 5
- assert (
- len(_SequenceManager._get_all_by(filters=[{"version": "1.0", "id": "SEQUENCE_sequence_SCENARIO_id_1_v1"}])) == 1
- )
- assert (
- len(_SequenceManager._get_all_by(filters=[{"version": "2.0", "id": "SEQUENCE_sequence_SCENARIO_id_1_v1"}])) == 0
- )
- _VersionManager._set_experiment_version("2.0")
- assert len(_SequenceManager._get_all()) == 5
- assert (
- len(_SequenceManager._get_all_by(filters=[{"version": "2.0", "id": "SEQUENCE_sequence_SCENARIO_id_1_v1"}])) == 0
- )
- assert (
- len(_SequenceManager._get_all_by(filters=[{"version": "2.0", "id": "SEQUENCE_sequence_SCENARIO_id_1_v2"}])) == 1
- )
- _VersionManager._set_development_version("1.0")
- assert len(_SequenceManager._get_all()) == 5
- assert (
- len(_SequenceManager._get_all_by(filters=[{"version": "1.0", "id": "SEQUENCE_sequence_SCENARIO_id_1_v1"}])) == 1
- )
- assert (
- len(_SequenceManager._get_all_by(filters=[{"version": "1.0", "id": "SEQUENCE_sequence_SCENARIO_id_1_v2"}])) == 0
- )
- _VersionManager._set_development_version("2.0")
- assert len(_SequenceManager._get_all()) == 5
- assert (
- len(_SequenceManager._get_all_by(filters=[{"version": "2.0", "id": "SEQUENCE_sequence_SCENARIO_id_1_v1"}])) == 0
- )
- assert (
- len(_SequenceManager._get_all_by(filters=[{"version": "2.0", "id": "SEQUENCE_sequence_SCENARIO_id_1_v2"}])) == 1
- )
- def test_is_submittable():
- dn = InMemoryDataNode("dn", Scope.SCENARIO, properties={"default_data": 10})
- _DataManager._repository._save(dn)
- task = Task("task", {}, print, [dn])
- _TaskManager._repository._save(task)
- scenario = Scenario("scenario", {task}, {}, set())
- _ScenarioManager._repository._save(scenario)
- rc = _SequenceManager._is_submittable("some_sequence")
- assert not rc
- assert "Entity 'some_sequence' does not exist in the repository." in rc.reasons
- scenario.add_sequences({"sequence": [task]})
- sequence = scenario.sequences["sequence"]
- assert len(_SequenceManager._get_all()) == 1
- assert _SequenceManager._is_submittable(sequence)
- assert _SequenceManager._is_submittable(sequence.id)
- assert not _SequenceManager._is_submittable("Sequence_temp")
- assert not _SequenceManager._is_submittable("SEQUENCE_temp_SCENARIO_scenario")
- scenario.dn.edit_in_progress = True
- assert not _SequenceManager._is_submittable(sequence)
- assert not _SequenceManager._is_submittable(sequence.id)
- scenario.dn.edit_in_progress = False
- assert _SequenceManager._is_submittable(sequence)
- assert _SequenceManager._is_submittable(sequence.id)
- def test_submit():
- data_node_1 = InMemoryDataNode("foo", Scope.SCENARIO, "s1")
- data_node_2 = InMemoryDataNode("bar", Scope.SCENARIO, "s2")
- data_node_3 = InMemoryDataNode("baz", Scope.SCENARIO, "s3")
- data_node_4 = InMemoryDataNode("qux", Scope.SCENARIO, "s4")
- data_node_5 = InMemoryDataNode("quux", Scope.SCENARIO, "s5")
- data_node_6 = InMemoryDataNode("quuz", Scope.SCENARIO, "s6")
- data_node_7 = InMemoryDataNode("corge", Scope.SCENARIO, "s7")
- _DataManager._repository._save(data_node_1)
- _DataManager._repository._save(data_node_2)
- _DataManager._repository._save(data_node_3)
- _DataManager._repository._save(data_node_4)
- _DataManager._repository._save(data_node_5)
- _DataManager._repository._save(data_node_6)
- _DataManager._repository._save(data_node_7)
- task_1 = Task(
- "grault",
- {},
- print,
- [data_node_1, data_node_2],
- [data_node_3, data_node_4],
- TaskId("t1"),
- )
- task_2 = Task("garply", {}, print, [data_node_3], [data_node_5], TaskId("t2"))
- task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], [data_node_6], TaskId("t3"))
- task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4"))
- _TaskManager._repository._save(task_1)
- _TaskManager._repository._save(task_2)
- _TaskManager._repository._save(task_3)
- _TaskManager._repository._save(task_4)
- scenario = Scenario("sce", {task_1, task_2, task_3, task_4}, {})
- sequence_name = "sequence"
- sequence_id = Sequence._new_id(sequence_name, scenario.id)
- class MockOrchestrator(_Orchestrator):
- submit_calls = []
- @classmethod
- def _lock_dn_output_and_create_job(
- cls,
- task: Task,
- submit_id: str,
- submit_entity_id: str,
- callbacks: Optional[Iterable[Callable]] = None,
- force: bool = False,
- ):
- cls.submit_calls.append(task)
- return super()._lock_dn_output_and_create_job(task, submit_id, submit_entity_id, callbacks, force)
- with mock.patch("taipy.core.task._task_manager._TaskManager._orchestrator", new=MockOrchestrator):
- # sequence does not exist. We expect an exception to be raised
- with pytest.raises(NonExistingSequence):
- _SequenceManager._submit(sequence_id)
- _ScenarioManager._repository._save(scenario)
- scenario.add_sequences({sequence_name: [task_4, task_2, task_1, task_3]})
- # sequence, and tasks does exist. We expect the tasks to be submitted
- # in a specific order
- sequence = scenario.sequences[sequence_name]
- _SequenceManager._submit(sequence.id)
- calls_ids = [t.id for t in _TaskManager._orchestrator().submit_calls]
- tasks_ids = [task_1.id, task_2.id, task_4.id, task_3.id]
- assert calls_ids == tasks_ids
- _SequenceManager._submit(sequence)
- calls_ids = [t.id for t in _TaskManager._orchestrator().submit_calls]
- tasks_ids = tasks_ids * 2
- assert set(calls_ids) == set(tasks_ids)
- def test_assign_sequence_as_parent_of_task():
- dn_config_1 = Config.configure_data_node("dn_1", "in_memory", scope=Scope.SCENARIO)
- dn_config_2 = Config.configure_data_node("dn_2", "in_memory", scope=Scope.SCENARIO)
- dn_config_3 = Config.configure_data_node("dn_3", "in_memory", scope=Scope.SCENARIO)
- task_config_1 = Config.configure_task("task_1", print, [dn_config_1], [dn_config_2])
- task_config_2 = Config.configure_task("task_2", print, [dn_config_2], [dn_config_3])
- task_config_3 = Config.configure_task("task_3", print, [dn_config_2], [dn_config_3])
- tasks = _TaskManager._bulk_get_or_create([task_config_1, task_config_2, task_config_3], "scenario_id")
- sequence_1 = _SequenceManager._create("sequence_1", [tasks[0], tasks[1]], scenario_id="scenario_id")
- sequence_2 = _SequenceManager._create("sequence_2", [tasks[0], tasks[2]], scenario_id="scenario_id")
- tasks_1 = list(sequence_1.tasks.values())
- tasks_2 = list(sequence_2.tasks.values())
- assert len(tasks_1) == 2
- assert len(tasks_2) == 2
- assert tasks_1[0].parent_ids == {sequence_1.id, sequence_2.id}
- assert tasks_2[0].parent_ids == {sequence_1.id, sequence_2.id}
- assert tasks_1[1].parent_ids == {sequence_1.id}
- assert tasks_2[1].parent_ids == {sequence_2.id}
- g = 0
- def mock_function_no_input_no_output():
- global g
- g += 1
- def mock_function_one_input_no_output(inp):
- global g
- g += inp
- def mock_function_no_input_one_output():
- global g
- return g
- def test_submit_sequence_from_tasks_with_one_or_no_input_output():
- # test no input and no output Task
- task_no_input_no_output = Task("task_no_input_no_output", {}, mock_function_no_input_no_output)
- scenario_1 = Scenario("scenario_1", {task_no_input_no_output}, {})
- _TaskManager._repository._save(task_no_input_no_output)
- _ScenarioManager._repository._save(scenario_1)
- scenario_1.add_sequences({"my_sequence_1": [task_no_input_no_output]})
- sequence_1 = scenario_1.sequences["my_sequence_1"]
- assert len(sequence_1._get_sorted_tasks()) == 1
- _SequenceManager._submit(sequence_1)
- assert g == 1
- # test one input and no output Task
- data_node_input = InMemoryDataNode("input_dn", Scope.SCENARIO, properties={"default_data": 2})
- task_one_input_no_output = Task(
- "task_one_input_no_output", {}, mock_function_one_input_no_output, input=[data_node_input]
- )
- scenario_2 = Scenario("scenario_2", {task_one_input_no_output}, {})
- _DataManager._repository._save(data_node_input)
- data_node_input.unlock_edit()
- _TaskManager._repository._save(task_one_input_no_output)
- _ScenarioManager._repository._save(scenario_2)
- scenario_2.add_sequences({"my_sequence_2": [task_one_input_no_output]})
- sequence_2 = scenario_2.sequences["my_sequence_2"]
- assert len(sequence_2._get_sorted_tasks()) == 1
- _SequenceManager._submit(sequence_2)
- assert g == 3
- # test no input and one output Task
- data_node_output = InMemoryDataNode("output_dn", Scope.SCENARIO, properties={"default_data": None})
- task_no_input_one_output = Task(
- "task_no_input_one_output", {}, mock_function_no_input_one_output, output=[data_node_output]
- )
- scenario_3 = Scenario("scenario_3", {task_no_input_one_output}, {})
- _DataManager._repository._save(data_node_output)
- assert data_node_output.read() is None
- _TaskManager._repository._save(task_no_input_one_output)
- _ScenarioManager._repository._save(scenario_3)
- scenario_3.add_sequences({"my_sequence_3": [task_no_input_one_output]})
- sequence_3 = scenario_3.sequences["my_sequence_3"]
- assert len(sequence_2._get_sorted_tasks()) == 1
- _SequenceManager._submit(sequence_3)
- assert data_node_output.read() == 3
- def mult_by_two(nb: int):
- return nb * 2
- def mult_by_3(nb: int):
- return nb * 3
- def test_get_or_create_data():
- # only create intermediate data node once
- dn_config_1 = Config.configure_data_node("foo", "in_memory", Scope.SCENARIO, default_data=1)
- dn_config_2 = Config.configure_data_node("bar", "in_memory", Scope.SCENARIO, default_data=0)
- dn_config_6 = Config.configure_data_node("baz", "in_memory", Scope.SCENARIO, default_data=0)
- task_config_mult_by_two = Config.configure_task("mult_by_two", mult_by_two, [dn_config_1], dn_config_2)
- task_config_mult_by_3 = Config.configure_task("mult_by_3", mult_by_3, [dn_config_2], dn_config_6)
- # dn_1 ---> mult_by_two ---> dn_2 ---> mult_by_3 ---> dn_6
- scenario_config = Config.configure_scenario("scenario", [task_config_mult_by_two, task_config_mult_by_3])
- assert len(_DataManager._get_all()) == 0
- assert len(_TaskManager._get_all()) == 0
- scenario = _ScenarioManager._create(scenario_config)
- scenario.add_sequences({"by_6": list(scenario.tasks.values())})
- sequence = scenario.sequences["by_6"]
- assert sequence.properties["name"] == "by_6"
- assert len(_DataManager._get_all()) == 3
- assert len(_TaskManager._get_all()) == 2
- assert len(sequence._get_sorted_tasks()) == 2
- assert sequence.foo.read() == 1
- assert sequence.bar.read() == 0
- assert sequence.baz.read() == 0
- assert sequence._get_sorted_tasks()[0][0].config_id == task_config_mult_by_two.id
- assert sequence._get_sorted_tasks()[1][0].config_id == task_config_mult_by_3.id
- _SequenceManager._submit(sequence.id)
- assert sequence.foo.read() == 1
- assert sequence.bar.read() == 2
- assert sequence.baz.read() == 6
- sequence.foo.write("new data value")
- assert sequence.foo.read() == "new data value"
- assert sequence.bar.read() == 2
- assert sequence.baz.read() == 6
- sequence.bar.write(7)
- assert sequence.foo.read() == "new data value"
- assert sequence.bar.read() == 7
- assert sequence.baz.read() == 6
- with pytest.raises(AttributeError):
- sequence.WRONG.write(7)
- def notify1(*args, **kwargs): ...
- def notify2(*args, **kwargs): ...
- def notify_multi_param(*args, **kwargs): ...
- def test_sequence_notification_subscribe(mocker):
- mocker.patch("taipy.core._entity._reload._Reloader._reload", side_effect=lambda m, o: o)
- task_configs = [
- Config.configure_task(
- "mult_by_two",
- mult_by_two,
- [Config.configure_data_node("foo", "in_memory", Scope.SCENARIO, default_data=1)],
- Config.configure_data_node("bar", "in_memory", Scope.SCENARIO, default_data=0),
- )
- ]
- tasks = _TaskManager._bulk_get_or_create(task_configs=task_configs)
- scenario = Scenario("scenario", set(tasks), {}, sequences={"by_1": {"tasks": tasks}})
- _ScenarioManager._repository._save(scenario)
- sequence = scenario.sequences["by_1"]
- notify_1 = NotifyMock(sequence)
- notify_1.__name__ = "notify_1"
- notify_1.__module__ = "notify_1"
- notify_2 = NotifyMock(sequence)
- notify_2.__name__ = "notify_2"
- notify_2.__module__ = "notify_2"
- # Mocking this because NotifyMock is a class that does not loads correctly when getting the sequence
- # from the storage.
- mocker.patch.object(
- _utils,
- "_load_fct",
- side_effect=[notify_1, notify_1, notify_2, notify_2, notify_2, notify_2],
- )
- # test subscription
- callback = mock.MagicMock()
- _SequenceManager._submit(sequence.id, [callback])
- callback.assert_called()
- # test sequence subscribe notification
- _SequenceManager._subscribe(callback=notify_1, sequence=sequence)
- _SequenceManager._submit(sequence.id)
- notify_1.assert_called_3_times()
- notify_1.reset()
- # test sequence unsubscribe notification
- # test subscribe notification only on new job
- _SequenceManager._unsubscribe(callback=notify_1, sequence=sequence)
- _SequenceManager._subscribe(callback=notify_2, sequence=sequence)
- _SequenceManager._submit(sequence)
- notify_1.assert_not_called()
- notify_2.assert_called_3_times()
- def test_sequence_notification_subscribe_multi_param(mocker):
- mocker.patch("taipy.core._entity._reload._Reloader._reload", side_effect=lambda m, o: o)
- task_configs = [
- Config.configure_task(
- "mult_by_two",
- mult_by_two,
- [Config.configure_data_node("foo", "in_memory", Scope.SCENARIO, default_data=1)],
- Config.configure_data_node("bar", "in_memory", Scope.SCENARIO, default_data=0),
- )
- ]
- tasks = _TaskManager._bulk_get_or_create(task_configs)
- scenario = Scenario("scenario", set(tasks), {}, sequences={"by_6": {"tasks": tasks}})
- _ScenarioManager._repository._save(scenario)
- sequence = scenario.sequences["by_6"]
- notify = mocker.Mock()
- # test sequence subscribe notification
- _SequenceManager._subscribe(callback=notify, params=["foobar", 123, 1.2], sequence=sequence)
- mocker.patch.object(_SequenceManager, "_get", return_value=sequence)
- _SequenceManager._submit(sequence.id)
- # as the callback is called with Sequence/Scenario and Job objects
- # we can assert that is called with params plus a sequence object that we know
- # of and a job object that is represented by ANY in this case
- notify.assert_called_with("foobar", 123, 1.2, sequence, ANY)
- def test_sequence_notification_unsubscribe(mocker):
- mocker.patch("taipy.core._entity._reload._Reloader._reload", side_effect=lambda m, o: o)
- task_configs = [
- Config.configure_task(
- "mult_by_two",
- mult_by_two,
- [Config.configure_data_node("foo", "in_memory", Scope.SCENARIO, default_data=1)],
- Config.configure_data_node("bar", "in_memory", Scope.SCENARIO, default_data=0),
- )
- ]
- tasks = _TaskManager._bulk_get_or_create(task_configs)
- scenario = Scenario("scenario", set(tasks), {}, sequences={"by_6": {"tasks": tasks}})
- _ScenarioManager._repository._save(scenario)
- sequence = scenario.sequences["by_6"]
- notify_1 = notify1
- notify_2 = notify2
- _SequenceManager._subscribe(callback=notify_1, sequence=sequence)
- _SequenceManager._unsubscribe(callback=notify_1, sequence=sequence)
- _SequenceManager._subscribe(callback=notify_2, sequence=sequence)
- _SequenceManager._submit(sequence.id)
- with pytest.raises(ValueError):
- _SequenceManager._unsubscribe(callback=notify_1, sequence=sequence)
- _SequenceManager._unsubscribe(callback=notify_2, sequence=sequence)
- def test_sequence_notification_unsubscribe_multi_param():
- task_configs = [
- Config.configure_task(
- "mult_by_two",
- mult_by_two,
- [Config.configure_data_node("foo", "in_memory", Scope.SCENARIO, default_data=1)],
- Config.configure_data_node("bar", "in_memory", Scope.SCENARIO, default_data=0),
- )
- ]
- tasks = _TaskManager._bulk_get_or_create(task_configs)
- scenario = Scenario("scenario", tasks, {}, sequences={"by_6": {"tasks": tasks}})
- _ScenarioManager._repository._save(scenario)
- sequence = scenario.sequences["by_6"]
- _SequenceManager._subscribe(callback=notify_multi_param, params=["foobar", 123, 0], sequence=sequence)
- _SequenceManager._subscribe(callback=notify_multi_param, params=["foobar", 123, 1], sequence=sequence)
- _SequenceManager._subscribe(callback=notify_multi_param, params=["foobar", 123, 2], sequence=sequence)
- assert len(sequence.subscribers) == 3
- sequence.unsubscribe(notify_multi_param)
- assert len(sequence.subscribers) == 2
- assert _Subscriber(notify_multi_param, ["foobar", 123, 0]) not in sequence.subscribers
- sequence.unsubscribe(notify_multi_param, ["foobar", 123, 2])
- assert len(sequence.subscribers) == 1
- assert _Subscriber(notify_multi_param, ["foobar", 123, 2]) not in sequence.subscribers
- with pytest.raises(ValueError):
- sequence.unsubscribe(notify_multi_param, ["foobar", 123, 10000])
- def test_sequence_notification_subscribe_all():
- task_configs = [
- Config.configure_task(
- "mult_by_two",
- mult_by_two,
- [Config.configure_data_node("foo", "in_memory", Scope.SCENARIO, default_data=1)],
- Config.configure_data_node("bar", "in_memory", Scope.SCENARIO, default_data=0),
- )
- ]
- tasks = _TaskManager._bulk_get_or_create(task_configs)
- scenario = Scenario("scenario", tasks, {}, sequences={"by_6": {"tasks": tasks}, "other_sequence": {"tasks": tasks}})
- _ScenarioManager._repository._save(scenario)
- sequence = scenario.sequences["by_6"]
- other_sequence = scenario.sequences["other_sequence"]
- notify_1 = NotifyMock(sequence)
- _SequenceManager._subscribe(notify_1)
- assert len(_SequenceManager._get(sequence.id).subscribers) == 1
- assert len(_SequenceManager._get(other_sequence.id).subscribers) == 1
- def test_delete():
- sequence_id = "SEQUENCE_sequence_SCENARIO_scenario_id_1"
- with pytest.raises(ModelNotFound):
- _SequenceManager._delete(sequence_id)
- scenario_1 = Scenario("scenario_1", set(), {}, scenario_id="SCENARIO_scenario_id_1")
- scenario_2 = Scenario("scenario_2", set(), {}, scenario_id="SCENARIO_scenario_id_2")
- _ScenarioManager._repository._save(scenario_1)
- _ScenarioManager._repository._save(scenario_2)
- with pytest.raises(ModelNotFound):
- _SequenceManager._delete(SequenceId(sequence_id))
- scenario_1.add_sequences({"sequence": []})
- assert len(_SequenceManager._get_all()) == 1
- _SequenceManager._delete(SequenceId(sequence_id))
- assert len(_SequenceManager._get_all()) == 0
- scenario_1.add_sequences({"sequence": [], "sequence_1": []})
- assert len(_SequenceManager._get_all()) == 2
- _SequenceManager._delete(SequenceId(sequence_id))
- assert len(_SequenceManager._get_all()) == 1
- with pytest.raises(SequenceAlreadyExists):
- scenario_1.add_sequences({"sequence_1": [], "sequence_2": [], "sequence_3": []})
- scenario_1.add_sequences({"sequence_2": [], "sequence_3": []})
- scenario_2.add_sequences({"sequence_1_2": [], "sequence_2_2": []})
- assert len(_SequenceManager._get_all()) == 5
- _SequenceManager._delete_all()
- assert len(_SequenceManager._get_all()) == 0
- scenario_1.add_sequences({"sequence_1": [], "sequence_2": [], "sequence_3": [], "sequence_4": []})
- scenario_2.add_sequences({"sequence_1_2": [], "sequence_2_2": []})
- assert len(_SequenceManager._get_all()) == 6
- _SequenceManager._delete_many(
- [
- "SEQUENCE_sequence_1_SCENARIO_scenario_id_1",
- "SEQUENCE_sequence_2_SCENARIO_scenario_id_1",
- "SEQUENCE_sequence_1_2_SCENARIO_scenario_id_2",
- ]
- )
- assert len(_SequenceManager._get_all()) == 3
- with pytest.raises(ModelNotFound):
- _SequenceManager._delete_many(
- ["SEQUENCE_sequence_1_SCENARIO_scenario_id_1", "SEQUENCE_sequence_2_SCENARIO_scenario_id_1"]
- )
- def test_delete_version():
- scenario_1_0 = Scenario(
- "scenario_config",
- [],
- {},
- scenario_id="SCENARIO_id_1_v1_0",
- version="1.0",
- sequences={"sequence_1": {}, "sequence_2": {}},
- )
- scenario_1_1 = Scenario(
- "scenario_config",
- [],
- {},
- scenario_id="SCENARIO_id_1_v1_1",
- version="1.1",
- sequences={"sequence_1": {}, "sequence_2": {}},
- )
- _ScenarioManager._repository._save(scenario_1_0)
- _ScenarioManager._repository._save(scenario_1_1)
- _VersionManager._set_experiment_version("1.1")
- assert len(_ScenarioManager._get_all()) == 1
- assert len(_SequenceManager._get_all()) == 2
- _VersionManager._set_experiment_version("1.0")
- assert len(_ScenarioManager._get_all()) == 1
- assert len(_SequenceManager._get_all()) == 2
- _SequenceManager._delete_by_version("1.0")
- assert len(_ScenarioManager._get_all()) == 1
- assert len(_SequenceManager._get_all()) == 0
- assert len(scenario_1_0.sequences) == 0
- assert len(scenario_1_1.sequences) == 2
- _VersionManager._set_experiment_version("1.1")
- assert len(_ScenarioManager._get_all()) == 1
- assert len(_SequenceManager._get_all()) == 2
- assert len(scenario_1_0.sequences) == 0
- assert len(scenario_1_1.sequences) == 2
- _SequenceManager._delete_by_version("1.1")
- assert len(_ScenarioManager._get_all()) == 1
- assert len(_SequenceManager._get_all()) == 0
- def test_exists():
- scenario = Scenario("scenario", [], {}, scenario_id="SCENARIO_scenario", sequences={"sequence": {}})
- _ScenarioManager._repository._save(scenario)
- assert len(_ScenarioManager._get_all()) == 1
- assert len(_SequenceManager._get_all()) == 1
- assert not _SequenceManager._exists("SEQUENCE_sequence_not_exist_SCENARIO_scenario")
- assert not _SequenceManager._exists("SEQUENCE_sequence_SCENARIO_scenario_id")
- assert _SequenceManager._exists("SEQUENCE_sequence_SCENARIO_scenario")
- assert _SequenceManager._exists(scenario.sequences["sequence"])
- def test_hard_delete_one_single_sequence_with_scenario_data_nodes():
- dn_input_config = Config.configure_data_node("my_input", "in_memory", scope=Scope.SCENARIO, default_data="testing")
- dn_output_config = Config.configure_data_node("my_output", "in_memory", scope=Scope.SCENARIO)
- task_config = Config.configure_task("task_config", print, dn_input_config, dn_output_config)
- tasks = _TaskManager._bulk_get_or_create([task_config])
- scenario = Scenario("scenario", tasks, {}, sequences={"sequence": {"tasks": tasks}})
- _ScenarioManager._repository._save(scenario)
- sequence = scenario.sequences["sequence"]
- sequence.submit()
- assert len(_ScenarioManager._get_all()) == 1
- assert len(_SequenceManager._get_all()) == 1
- assert len(_TaskManager._get_all()) == 1
- assert len(_DataManager._get_all()) == 2
- assert len(_JobManager._get_all()) == 1
- _SequenceManager._hard_delete(sequence.id)
- assert len(_ScenarioManager._get_all()) == 1
- assert len(_SequenceManager._get_all()) == 0
- assert len(_TaskManager._get_all()) == 1
- assert len(_DataManager._get_all()) == 2
- assert len(_JobManager._get_all()) == 1
- def test_hard_delete_one_single_sequence_with_cycle_data_nodes():
- dn_input_config = Config.configure_data_node("my_input", "in_memory", scope=Scope.CYCLE, default_data="testing")
- dn_output_config = Config.configure_data_node("my_output", "in_memory", scope=Scope.CYCLE)
- task_config = Config.configure_task("task_config", print, dn_input_config, dn_output_config)
- tasks = _TaskManager._bulk_get_or_create([task_config])
- scenario = Scenario("scenario", tasks, {}, sequences={"sequence": {"tasks": tasks}})
- _ScenarioManager._repository._save(scenario)
- sequence = scenario.sequences["sequence"]
- sequence.submit()
- assert len(_ScenarioManager._get_all()) == 1
- assert len(_SequenceManager._get_all()) == 1
- assert len(_TaskManager._get_all()) == 1
- assert len(_DataManager._get_all()) == 2
- assert len(_JobManager._get_all()) == 1
- _SequenceManager._hard_delete(sequence.id)
- assert len(_ScenarioManager._get_all()) == 1
- assert len(_SequenceManager._get_all()) == 0
- assert len(_TaskManager._get_all()) == 1
- assert len(_DataManager._get_all()) == 2
- assert len(_JobManager._get_all()) == 1
- def test_hard_delete_shared_entities():
- input_dn = Config.configure_data_node("my_input", "in_memory", default_data="testing")
- intermediate_dn = Config.configure_data_node("my_inter", "in_memory")
- output_dn = Config.configure_data_node("my_output", "in_memory")
- task_1 = Config.configure_task("task_1", print, input_dn, intermediate_dn)
- task_2 = Config.configure_task("task_2", print, intermediate_dn, output_dn)
- scenario_config = Config.configure_scenario("sc", [task_1, task_2])
- import taipy as tp
- scenario_1 = tp.create_scenario(scenario_config, name="scenario_1")
- scenario_1.add_sequence("sequence", [scenario_1.task_1, scenario_1.task_2])
- scenario_2 = tp.create_scenario(scenario_config, name="scenario_2")
- scenario_2.add_sequence("sequence", [scenario_2.task_1, scenario_2.task_2])
- sequence_1 = scenario_1.sequences["sequence"]
- sequence_2 = scenario_2.sequences["sequence"]
- _SequenceManager._submit(sequence_1.id)
- _SequenceManager._submit(sequence_2.id)
- assert len(_ScenarioManager._get_all()) == 2
- assert len(_SequenceManager._get_all()) == 2
- assert len(_TaskManager._get_all()) == 4
- assert len(_DataManager._get_all()) == 6
- assert len(_JobManager._get_all()) == 4
- _SequenceManager._hard_delete(sequence_1.id)
- assert len(_ScenarioManager._get_all()) == 2
- assert len(_SequenceManager._get_all()) == 1
- assert len(_TaskManager._get_all()) == 4
- assert len(_DataManager._get_all()) == 6
- assert len(_JobManager._get_all()) == 4
- def my_print(a, b):
- print(a + b) # noqa: T201
- def test_submit_task_with_input_dn_wrong_file_path(caplog):
- csv_dn_cfg = Config.configure_csv_data_node("wrong_csv_file_path", default_path="wrong_path.csv")
- pickle_dn_cfg = Config.configure_pickle_data_node("wrong_pickle_file_path", default_path="wrong_path.pickle")
- parquet_dn_cfg = Config.configure_parquet_data_node("wrong_parquet_file_path", default_path="wrong_path.parquet")
- json_dn_cfg = Config.configure_parquet_data_node("wrong_json_file_path", default_path="wrong_path.json")
- task_cfg = Config.configure_task("task", my_print, [csv_dn_cfg, pickle_dn_cfg], parquet_dn_cfg)
- task_2_cfg = Config.configure_task("task2", my_print, [csv_dn_cfg, parquet_dn_cfg], json_dn_cfg)
- tasks = _TaskManager._bulk_get_or_create([task_cfg, task_2_cfg])
- scenario = Scenario("scenario", tasks, {}, sequences={"sequence": {"tasks": tasks}})
- _ScenarioManager._repository._save(scenario)
- sequence = scenario.sequences["sequence"]
- pip_manager = _SequenceManagerFactory._build_manager()
- pip_manager._submit(sequence)
- stdout = caplog.text
- expected_outputs = [
- f"{input_dn.id} cannot be read because it has never been written. Hint: The data node may refer to a wrong "
- f"path : {input_dn.path} "
- for input_dn in sequence.get_inputs()
- ]
- not_expected_outputs = [
- f"{input_dn.id} cannot be read because it has never been written. Hint: The data node may refer to a wrong "
- f"path : {input_dn.path} "
- for input_dn in sequence.data_nodes.values()
- if input_dn not in sequence.get_inputs()
- ]
- assert all(expected_output in stdout for expected_output in expected_outputs)
- assert all(expected_output not in stdout for expected_output in not_expected_outputs)
- def test_submit_task_with_one_input_dn_wrong_file_path(caplog):
- csv_dn_cfg = Config.configure_csv_data_node("wrong_csv_file_path", default_path="wrong_path.csv")
- pickle_dn_cfg = Config.configure_pickle_data_node("wrong_pickle_file_path", default_data="value")
- parquet_dn_cfg = Config.configure_parquet_data_node("wrong_parquet_file_path", default_path="wrong_path.parquet")
- json_dn_cfg = Config.configure_parquet_data_node("wrong_json_file_path", default_path="wrong_path.json")
- task_cfg = Config.configure_task("task", my_print, [csv_dn_cfg, pickle_dn_cfg], parquet_dn_cfg)
- task_2_cfg = Config.configure_task("task2", my_print, [csv_dn_cfg, parquet_dn_cfg], json_dn_cfg)
- tasks = _TaskManager._bulk_get_or_create([task_cfg, task_2_cfg])
- scenario = Scenario("scenario", tasks, {}, sequences={"sequence": {"tasks": tasks}})
- _ScenarioManager._repository._save(scenario)
- sequence = scenario.sequences["sequence"]
- pip_manager = _SequenceManagerFactory._build_manager()
- pip_manager._submit(sequence)
- stdout = caplog.text
- expected_outputs = [
- f"{input_dn.id} cannot be read because it has never been written. Hint: The data node may refer to a wrong "
- f"path : {input_dn.path} "
- for input_dn in sequence.get_inputs()
- if input_dn.config_id == "wrong_csv_file_path"
- ]
- not_expected_outputs = [
- f"{input_dn.id} cannot be read because it has never been written. Hint: The data node may refer to a wrong "
- f"path : {input_dn.path} "
- for input_dn in sequence.data_nodes.values()
- if input_dn.config_id != "wrong_csv_file_path"
- ]
- assert all(expected_output in stdout for expected_output in expected_outputs)
- assert all(expected_output not in stdout for expected_output in not_expected_outputs)
|