- # Copyright 2021-2025 Avaiga Private Limited
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations under the License.
- import os
- from datetime import datetime, timedelta
- from time import sleep
- from unittest import mock
- import freezegun
- import pandas as pd
- import pytest
- import taipy.core as tp
- from taipy import Scope
- from taipy.common.config import Config
- from taipy.common.config.exceptions.exceptions import InvalidConfigurationId
- from taipy.core.data._data_manager import _DataManager
- from taipy.core.data._data_manager_factory import _DataManagerFactory
- from taipy.core.data.data_node import DataNode
- from taipy.core.data.data_node_id import (
- EDIT_COMMENT_KEY,
- EDIT_EDITOR_ID_KEY,
- EDIT_JOB_ID_KEY,
- EDIT_TIMESTAMP_KEY,
- DataNodeId,
- )
- from taipy.core.data.in_memory import InMemoryDataNode
- from taipy.core.exceptions.exceptions import DataNodeIsBeingEdited, NoData
- from taipy.core.job.job_id import JobId
- from taipy.core.task.task import Task
- from .utils import FakeDataNode
def funct_a_b(input: str):
    """Task function used in tests: logs its execution and returns "B"."""
    label = "task_a_b"
    print(label)  # noqa: T201
    return "B"
def funct_b_c(input: str):
    """Task function used in tests: logs its execution and returns "C"."""
    label = "task_b_c"
    print(label)  # noqa: T201
    return "C"
def funct_b_d(input: str):
    """Task function used in tests: logs its execution and returns "D"."""
    label = "task_b_d"
    print(label)  # noqa: T201
    return "D"
class TestDataNode:
    def test_dn_equals(self, data_node):
        # Equality is identity-by-id: a re-fetched node compares equal, while
        # the raw id string or another entity type never does.
        data_manager = _DataManagerFactory()._build_manager()
        dn_id = data_node.id
        data_manager._repository._save(data_node)
        # To test if instance is same type
        task = Task("task", {}, print, [], [], dn_id)
        dn_2 = data_manager._get(dn_id)
        assert data_node == dn_2
        assert data_node != dn_id
        assert data_node != task
    def test_create_with_default_values(self):
        # Only a config id is given: scope defaults to SCENARIO, an id is
        # generated, and every other attribute is empty or None.
        dn = DataNode("foo_bar")
        assert dn.config_id == "foo_bar"
        assert dn.scope == Scope.SCENARIO
        assert dn.id is not None
        assert dn.name is None
        assert dn.owner_id is None
        assert dn.parent_ids == set()
        assert dn.last_edit_date is None
        assert dn.job_ids == []
        assert not dn.is_ready_for_reading
        assert len(dn.properties) == 0
    def test_create_with_ranks(self):
        # Test _rank is propagated from the config
        cfg = Config.configure_data_node("foo_bar")
        cfg._ranks = {"A": 1, "B": 2, "C": 0}
        dn = DataNode("foo_bar")
        assert dn.config_id == "foo_bar"
        assert dn.scope == Scope.SCENARIO
        assert dn.id is not None
        assert dn.name is None
        assert dn.owner_id is None
        assert dn.parent_ids == set()
        assert dn.last_edit_date is None
        assert dn.job_ids == []
        assert not dn.is_ready_for_reading
        assert len(dn.properties) == 0
        # Ranks are looked up per scenario-config id on the node itself.
        assert dn._get_rank("A") == 1
        assert dn._get_rank("B") == 2
        assert dn._get_rank("C") == 0
    def test_is_up_to_date_when_not_written(self):
        # An input with default data is up to date; an unwritten output is not
        # until the scenario submission writes it.
        dn_confg_1 = Config.configure_in_memory_data_node("dn_1", default_data="a")
        dn_confg_2 = Config.configure_in_memory_data_node("dn_2")
        task_config_1 = Config.configure_task("t1", funct_a_b, [dn_confg_1], [dn_confg_2])
        scenario_config = Config.configure_scenario("sc", [task_config_1])
        scenario = tp.create_scenario(scenario_config)
        assert scenario.dn_1.is_up_to_date is True
        assert scenario.dn_2.is_up_to_date is False
        tp.submit(scenario)
        assert scenario.dn_1.is_up_to_date is True
        assert scenario.dn_2.is_up_to_date is True
    def test_create(self):
        # Fully-parameterized construction: every attribute is set explicitly
        # and extra kwargs ("prop", "name") land in the properties dict.
        a_date = datetime.now()
        dn = DataNode(
            "foo_bar",
            Scope.SCENARIO,
            DataNodeId("an_id"),
            "a_scenario_id",
            {"a_parent_id"},
            a_date,
            [{"job_id": "a_job_id"}],
            edit_in_progress=False,
            prop="erty",
            name="a name",
        )
        assert dn.config_id == "foo_bar"
        assert dn.scope == Scope.SCENARIO
        assert dn.id == "an_id"
        assert dn.name == "a name"
        assert dn.owner_id == "a_scenario_id"
        assert dn.parent_ids == {"a_parent_id"}
        assert dn.last_edit_date == a_date
        assert dn.job_ids == ["a_job_id"]
        assert dn.is_ready_for_reading
        assert len(dn.properties) == 2
        assert dn.properties == {"prop": "erty", "name": "a name"}
        # A config id containing a space is rejected.
        with pytest.raises(InvalidConfigurationId):
            DataNode("foo bar")
    def test_read_write(self):
        # Exercises read/write bookkeeping on a FakeDataNode: edit tracking,
        # last_edit_date progression, job_ids accumulation, readiness flag.
        dn = FakeDataNode("foo_bar")
        _DataManagerFactory._build_manager()._repository._save(dn)
        assert dn.read() is None
        with pytest.raises(NoData):
            _DataManagerFactory._build_manager()._read(dn)
        with pytest.raises(NoData):
            dn.read_or_raise()
        assert dn.write_has_been_called == 0
        assert dn.read_has_been_called == 0
        assert not dn.is_ready_for_reading
        assert dn.last_edit_date is None
        assert dn.job_ids == []
        assert dn.edits == []
        dn.write("Any data")
        assert dn.write_has_been_called == 1
        assert dn.read_has_been_called == 0
        assert dn.last_edit_date is not None
        first_edition = dn.last_edit_date
        assert dn.is_ready_for_reading
        assert dn.job_ids == []
        assert len(dn.edits) == 1
        assert dn.get_last_edit()["timestamp"] == dn.last_edit_date
        # Ensure the second write gets a strictly later timestamp.
        sleep(0.1)
        dn.write("Any other data", job_id := JobId("a_job_id"))
        assert dn.write_has_been_called == 2
        assert dn.read_has_been_called == 0
        second_edition = dn.last_edit_date
        assert first_edition < second_edition
        assert dn.is_ready_for_reading
        assert dn.job_ids == [job_id]
        assert len(dn.edits) == 2
        assert dn.get_last_edit()["timestamp"] == dn.last_edit_date
        # Reading must not register an edit nor move last_edit_date.
        dn.read()
        assert dn.write_has_been_called == 2
        assert dn.read_has_been_called == 1
        second_edition = dn.last_edit_date
        assert first_edition < second_edition
        assert dn.is_ready_for_reading
        assert dn.job_ids == [job_id]
    def test_lock_initialization(self):
        # A fresh node starts unlocked with no editor nor expiration date.
        dn = InMemoryDataNode("dn", Scope.SCENARIO)
        assert not dn.edit_in_progress
        assert dn._editor_id is None
        assert dn._editor_expiration_date is None
    def test_locked_dn_unlockable_only_by_same_editor(self):
        # While user_1 holds the lock, user_2 can neither re-lock nor unlock;
        # unlocking by user_1 fully clears the lock state.
        dn = InMemoryDataNode("dn", Scope.SCENARIO)
        _DataManagerFactory._build_manager()._repository._save(dn)
        dn.lock_edit("user_1")
        assert dn.edit_in_progress
        assert dn._editor_id == "user_1"
        assert dn._editor_expiration_date is not None
        with pytest.raises(DataNodeIsBeingEdited):
            dn.lock_edit("user_2")
        with pytest.raises(DataNodeIsBeingEdited):
            dn.unlock_edit("user_2")
        dn.unlock_edit("user_1")
        assert not dn.edit_in_progress
        assert dn._editor_id is None
        assert dn._editor_expiration_date is None
    def test_none_editor_can_lock_a_locked_dn(self):
        # Locking with no editor id (the orchestrator) overrides a user lock
        # and clears the editor/expiration metadata.
        dn = InMemoryDataNode("dn", Scope.SCENARIO)
        _DataManagerFactory._build_manager()._repository._save(dn)
        dn.lock_edit("user")
        assert dn.edit_in_progress
        assert dn._editor_id == "user"
        assert dn._editor_expiration_date is not None
        dn.lock_edit()
        assert dn.edit_in_progress
        assert dn._editor_id is None
        assert dn._editor_expiration_date is None
    def test_none_editor_can_unlock_a_locked_dn(self):
        # Unlocking with no editor id releases both a user lock and an
        # orchestrator lock.
        dn = InMemoryDataNode("dn", Scope.SCENARIO)
        _DataManagerFactory._build_manager()._repository._save(dn)
        dn.lock_edit("user")
        assert dn.edit_in_progress
        assert dn._editor_id == "user"
        assert dn._editor_expiration_date is not None
        dn.unlock_edit()
        assert not dn.edit_in_progress
        assert dn._editor_id is None
        assert dn._editor_expiration_date is None
        dn.lock_edit()
        assert dn.edit_in_progress
        assert dn._editor_id is None
        assert dn._editor_expiration_date is None
        dn.unlock_edit()
        assert not dn.edit_in_progress
        assert dn._editor_id is None
        assert dn._editor_expiration_date is None
    def test_ready_for_reading(self):
        # Lock/unlock cycles alone never make a node readable; only an actual
        # write sets last_edit_date and flips is_ready_for_reading.
        dn = InMemoryDataNode("foo_bar", Scope.CYCLE)
        _DataManagerFactory._build_manager()._repository._save(dn)
        assert dn.last_edit_date is None
        assert not dn.is_ready_for_reading
        assert dn.job_ids == []
        dn.lock_edit()
        assert dn.last_edit_date is None
        assert not dn.is_ready_for_reading
        assert dn.job_ids == []
        dn.unlock_edit()
        assert dn.last_edit_date is None
        assert not dn.is_ready_for_reading
        assert dn.job_ids == []
        dn.lock_edit()
        assert dn.last_edit_date is None
        assert not dn.is_ready_for_reading
        assert dn.job_ids == []
        dn.write("toto", job_id := JobId("a_job_id"))
        assert dn.last_edit_date is not None
        assert dn.is_ready_for_reading
        assert dn.job_ids == [job_id]
    def test_is_valid_no_validity_period(self):
        # Without a validity period a node is valid as soon as it is written.
        dn = InMemoryDataNode("foo", Scope.SCENARIO, DataNodeId("id"), "name", "owner_id")
        _DataManagerFactory._build_manager()._repository._save(dn)
        # Test Never been written
        assert not dn.is_valid
        # test has been written
        dn.write("My data")
        assert dn.is_valid
    def test_is_valid_with_30_min_validity_period(self):
        # With a 30-minute validity period, validity expires once the last
        # edit date falls outside the window.
        dn = InMemoryDataNode(
            "foo", Scope.SCENARIO, DataNodeId("id"), "name", "owner_id", validity_period=timedelta(minutes=30)
        )
        _DataManagerFactory._build_manager()._repository._save(dn)
        # Test Never been written
        assert dn.is_valid is False
        # Has been written less than 30 minutes ago
        dn.write("My data")
        assert dn.is_valid is True
        # Has been written more than 30 minutes ago
        dn.last_edit_date = datetime.now() + timedelta(days=-1)
        assert dn.is_valid is False
    def test_is_valid_with_5_days_validity_period(self):
        # Same as above with a 5-day window; the private _last_edit_date is
        # set directly, so the node must be re-saved for the check.
        dn = InMemoryDataNode("foo", Scope.SCENARIO, validity_period=timedelta(days=5))
        _DataManagerFactory._build_manager()._repository._save(dn)
        # Test Never been written
        assert dn.is_valid is False
        # Has been written less than 5 days ago
        dn.write("My data")
        assert dn.is_valid is True
        # Has been written more than 5 days ago
        dn._last_edit_date = datetime.now() - timedelta(days=6)
        _DataManager()._repository._save(dn)
        assert dn.is_valid is False
- def test_is_up_to_date(self, current_datetime):
- dn_confg_1 = Config.configure_in_memory_data_node("dn_1")
- dn_confg_2 = Config.configure_in_memory_data_node("dn_2")
- dn_confg_3 = Config.configure_in_memory_data_node("dn_3", scope=Scope.GLOBAL)
- task_config_1 = Config.configure_task("t1", print, [dn_confg_1], [dn_confg_2])
- task_config_2 = Config.configure_task("t2", print, [dn_confg_2], [dn_confg_3])
- scenario_config = Config.configure_scenario("sc", [task_config_1, task_config_2])
- scenario_1 = tp.create_scenario(scenario_config)
- assert len(_DataManager._get_all()) == 3
- dn_1_1 = scenario_1.data_nodes["dn_1"]
- dn_2_1 = scenario_1.data_nodes["dn_2"]
- dn_3_1 = scenario_1.data_nodes["dn_3"]
- assert dn_1_1.last_edit_date is None
- assert dn_2_1.last_edit_date is None
- assert dn_3_1.last_edit_date is None
- dn_1_1.last_edit_date = current_datetime + timedelta(1)
- dn_2_1.last_edit_date = current_datetime + timedelta(2)
- dn_3_1.last_edit_date = current_datetime + timedelta(3)
- assert dn_1_1.is_up_to_date
- assert dn_2_1.is_up_to_date
- assert dn_3_1.is_up_to_date
- dn_2_1.last_edit_date = current_datetime + timedelta(4)
- assert dn_1_1.is_up_to_date
- assert dn_2_1.is_up_to_date
- assert not dn_3_1.is_up_to_date
- dn_1_1.last_edit_date = current_datetime + timedelta(5)
- assert dn_1_1.is_up_to_date
- assert not dn_2_1.is_up_to_date
- assert not dn_3_1.is_up_to_date
- dn_1_1.last_edit_date = current_datetime + timedelta(1)
- dn_2_1.last_edit_date = current_datetime + timedelta(2)
- dn_3_1.last_edit_date = current_datetime + timedelta(3)
- def test_is_up_to_date_across_scenarios(self, current_datetime):
- dn_confg_1 = Config.configure_in_memory_data_node("dn_1", scope=Scope.SCENARIO)
- dn_confg_2 = Config.configure_in_memory_data_node("dn_2", scope=Scope.SCENARIO)
- dn_confg_3 = Config.configure_in_memory_data_node("dn_3", scope=Scope.GLOBAL)
- task_config_1 = Config.configure_task("t1", print, [dn_confg_1], [dn_confg_2])
- task_config_2 = Config.configure_task("t2", print, [dn_confg_2], [dn_confg_3])
- scenario_config = Config.configure_scenario("sc", [task_config_1, task_config_2])
- scenario_1 = tp.create_scenario(scenario_config)
- scenario_2 = tp.create_scenario(scenario_config)
- assert len(_DataManager._get_all()) == 5
- dn_1_1 = scenario_1.data_nodes["dn_1"]
- dn_2_1 = scenario_1.data_nodes["dn_2"]
- dn_1_2 = scenario_2.data_nodes["dn_1"]
- dn_2_2 = scenario_2.data_nodes["dn_2"]
- dn_3 = scenario_1.data_nodes["dn_3"]
- assert dn_3 == scenario_2.data_nodes["dn_3"]
- assert dn_1_1.last_edit_date is None
- assert dn_2_1.last_edit_date is None
- assert dn_1_2.last_edit_date is None
- assert dn_2_2.last_edit_date is None
- assert dn_3.last_edit_date is None
- dn_1_1.last_edit_date = current_datetime + timedelta(1)
- dn_2_1.last_edit_date = current_datetime + timedelta(2)
- dn_1_2.last_edit_date = current_datetime + timedelta(3)
- dn_2_2.last_edit_date = current_datetime + timedelta(4)
- dn_3.last_edit_date = current_datetime + timedelta(5)
- assert dn_1_1.is_up_to_date
- assert dn_2_1.is_up_to_date
- assert dn_1_2.is_up_to_date
- assert dn_2_2.is_up_to_date
- assert dn_3.is_up_to_date
- dn_2_1.last_edit_date = current_datetime + timedelta(6)
- assert dn_1_1.is_up_to_date
- assert dn_2_1.is_up_to_date
- assert dn_1_2.is_up_to_date
- assert dn_2_2.is_up_to_date
- assert not dn_3.is_up_to_date
- dn_2_1.last_edit_date = current_datetime + timedelta(2)
- dn_2_2.last_edit_date = current_datetime + timedelta(6)
- assert dn_1_1.is_up_to_date
- assert dn_2_1.is_up_to_date
- assert dn_1_2.is_up_to_date
- assert dn_2_2.is_up_to_date
- assert not dn_3.is_up_to_date
- dn_2_2.last_edit_date = current_datetime + timedelta(4)
- dn_1_1.last_edit_date = current_datetime + timedelta(6)
- assert dn_1_1.is_up_to_date
- assert not dn_2_1.is_up_to_date
- assert dn_1_2.is_up_to_date
- assert dn_2_2.is_up_to_date
- assert not dn_3.is_up_to_date
- dn_1_2.last_edit_date = current_datetime + timedelta(6)
- assert dn_1_1.is_up_to_date
- assert not dn_2_1.is_up_to_date
- assert dn_1_2.is_up_to_date
- assert not dn_2_2.is_up_to_date
- assert not dn_3.is_up_to_date
    def test_do_not_recompute_data_node_valid_but_continue_sequence_execution(self):
        # On a second submission the skippable task_a_b is SKIPPED (its output
        # is still valid) while the downstream tasks still run to completion.
        a = Config.configure_data_node("A", "pickle", default_data="A")
        b = Config.configure_data_node("B", "pickle")
        c = Config.configure_data_node("C", "pickle")
        d = Config.configure_data_node("D", "pickle")
        task_a_b = Config.configure_task("task_a_b", funct_a_b, input=a, output=b, skippable=True)
        task_b_c = Config.configure_task("task_b_c", funct_b_c, input=b, output=c)
        task_b_d = Config.configure_task("task_b_d", funct_b_d, input=b, output=d)
        scenario_cfg = Config.configure_scenario("scenario", [task_a_b, task_b_c, task_b_d])
        scenario = tp.create_scenario(scenario_cfg)
        scenario.submit()
        assert scenario.A.read() == "A"
        assert scenario.B.read() == "B"
        assert scenario.C.read() == "C"
        assert scenario.D.read() == "D"
        scenario.submit()
        # 3 jobs per submission.
        assert len(tp.get_jobs()) == 6
        jobs_and_status = [(job.task.config_id, job.status) for job in tp.get_jobs()]
        assert ("task_a_b", tp.Status.COMPLETED) in jobs_and_status
        assert ("task_a_b", tp.Status.SKIPPED) in jobs_and_status
        assert ("task_b_c", tp.Status.COMPLETED) in jobs_and_status
        assert ("task_b_d", tp.Status.COMPLETED) in jobs_and_status
    def test_data_node_update_after_writing(self):
        # Writing persists the node, so a fresh fetch from the manager also
        # reflects the new readiness state.
        dn = FakeDataNode("foo")
        _DataManager._repository._save(dn)
        assert not _DataManager._get(dn.id).is_ready_for_reading
        dn.write("Any data")
        assert dn.is_ready_for_reading
        assert _DataManager._get(dn.id).is_ready_for_reading
    def test_expiration_date_raise_if_never_write(self):
        # expiration_date is undefined before the first write.
        dn = FakeDataNode("foo")
        with pytest.raises(NoData):
            _ = dn.expiration_date
    def test_validity_null_if_never_write(self):
        # No validity period was configured, so the attribute stays None.
        dn = FakeDataNode("foo")
        assert dn.validity_period is None
- def test_auto_update_and_reload(self, current_datetime):
- dn_1 = InMemoryDataNode(
- "foo",
- scope=Scope.GLOBAL,
- id=DataNodeId("an_id"),
- owner_id=None,
- parent_ids=None,
- last_edit_date=current_datetime,
- edits=[{"job_id": "a_job_id"}],
- edit_in_progress=False,
- validity_period=None,
- properties={
- "name": "foo",
- },
- )
- dm = _DataManager()
- dm._repository._save(dn_1)
- dn_2 = dm._get(dn_1)
- # auto set & reload on scope attribute
- assert dn_1.scope == Scope.GLOBAL
- assert dn_2.scope == Scope.GLOBAL
- dn_1.scope = Scope.CYCLE
- assert dn_1.scope == Scope.CYCLE
- assert dn_2.scope == Scope.CYCLE
- dn_2.scope = Scope.SCENARIO
- assert dn_1.scope == Scope.SCENARIO
- assert dn_2.scope == Scope.SCENARIO
- new_datetime = current_datetime + timedelta(1)
- new_datetime_1 = current_datetime + timedelta(3)
- # auto set & reload on last_edit_date attribute
- assert dn_1.last_edit_date == current_datetime
- assert dn_2.last_edit_date == current_datetime
- dn_1.last_edit_date = new_datetime_1
- assert dn_1.last_edit_date == new_datetime_1
- assert dn_2.last_edit_date == new_datetime_1
- dn_2.last_edit_date = new_datetime
- assert dn_1.last_edit_date == new_datetime
- assert dn_2.last_edit_date == new_datetime
- # auto set & reload on name attribute
- assert dn_1.name == "foo"
- assert dn_2.name == "foo"
- dn_1.name = "fed"
- assert dn_1.name == "fed"
- assert dn_2.name == "fed"
- dn_2.name = "def"
- assert dn_1.name == "def"
- assert dn_2.name == "def"
- # auto set & reload on parent_ids attribute (set() object does not have auto set yet)
- assert dn_1.parent_ids == set()
- assert dn_2.parent_ids == set()
- dn_1._parent_ids.update(["sc2"])
- _DataManager._update(dn_1)
- assert dn_1.parent_ids == {"sc2"}
- assert dn_2.parent_ids == {"sc2"}
- dn_2._parent_ids.clear()
- dn_2._parent_ids.update(["sc1"])
- _DataManager._update(dn_2)
- assert dn_1.parent_ids == {"sc1"}
- assert dn_2.parent_ids == {"sc1"}
- dn_2._parent_ids.clear()
- _DataManager._update(dn_2)
- # auto set & reload on edit_in_progress attribute
- assert not dn_2.edit_in_progress
- assert not dn_1.edit_in_progress
- dn_1.edit_in_progress = True
- assert dn_1.edit_in_progress
- assert dn_2.edit_in_progress
- dn_2.unlock_edit()
- assert not dn_1.edit_in_progress
- assert not dn_2.edit_in_progress
- dn_1.lock_edit()
- assert dn_1.edit_in_progress
- assert dn_2.edit_in_progress
- # auto set & reload on validity_period attribute
- time_period_1 = timedelta(1)
- time_period_2 = timedelta(5)
- assert dn_1.validity_period is None
- assert dn_2.validity_period is None
- dn_1.validity_period = time_period_1
- assert dn_1.validity_period == time_period_1
- assert dn_2.validity_period == time_period_1
- dn_2.validity_period = time_period_2
- assert dn_1.validity_period == time_period_2
- assert dn_2.validity_period == time_period_2
- dn_1.last_edit_date = new_datetime
- assert len(dn_1.job_ids) == 1
- assert len(dn_2.job_ids) == 1
- with dn_1 as dn:
- assert dn.config_id == "foo"
- assert dn.owner_id is None
- assert dn.scope == Scope.SCENARIO
- assert dn.last_edit_date == new_datetime
- assert dn.name == "def"
- assert dn.edit_in_progress
- assert dn.validity_period == time_period_2
- assert len(dn.job_ids) == 1
- assert dn._is_in_context
- new_datetime_2 = new_datetime + timedelta(5)
- dn.scope = Scope.CYCLE
- dn.last_edit_date = new_datetime_2
- dn.name = "abc"
- dn.edit_in_progress = False
- dn.validity_period = None
- assert dn.config_id == "foo"
- assert dn.owner_id is None
- assert dn.scope == Scope.SCENARIO
- assert dn.last_edit_date == new_datetime
- assert dn.name == "def"
- assert dn.edit_in_progress
- assert dn.validity_period == time_period_2
- assert len(dn.job_ids) == 1
- assert dn_1.config_id == "foo"
- assert dn_1.owner_id is None
- assert dn_1.scope == Scope.CYCLE
- assert dn_1.last_edit_date == new_datetime_2
- assert dn_1.name == "abc"
- assert not dn_1.edit_in_progress
- assert dn_1.validity_period is None
- assert not dn_1._is_in_context
- assert len(dn_1.job_ids) == 1
- def test_auto_update_and_reload_properties(self):
- dn_1 = InMemoryDataNode("foo", scope=Scope.GLOBAL, properties={"name": "def"})
- dm = _DataManager()
- dm._repository._save(dn_1)
- dn_2 = dm._get(dn_1)
- # auto set & reload on properties attribute
- assert dn_1.properties == {"name": "def"}
- assert dn_2.properties == {"name": "def"}
- dn_1._properties["qux"] = 4
- assert dn_1.properties["qux"] == 4
- assert dn_2.properties["qux"] == 4
- assert dn_1.properties == {"qux": 4, "name": "def"}
- assert dn_2.properties == {"qux": 4, "name": "def"}
- dn_2._properties["qux"] = 5
- assert dn_1.properties["qux"] == 5
- assert dn_2.properties["qux"] == 5
- dn_1.properties["temp_key_1"] = "temp_value_1"
- dn_1.properties["temp_key_2"] = "temp_value_2"
- assert dn_1.properties == {
- "name": "def",
- "qux": 5,
- "temp_key_1": "temp_value_1",
- "temp_key_2": "temp_value_2",
- }
- assert dn_2.properties == {"name": "def", "qux": 5, "temp_key_1": "temp_value_1", "temp_key_2": "temp_value_2"}
- dn_1.properties.pop("temp_key_1")
- assert "temp_key_1" not in dn_1.properties.keys()
- assert "temp_key_1" not in dn_1.properties.keys()
- assert dn_1.properties == {"name": "def", "qux": 5, "temp_key_2": "temp_value_2"}
- assert dn_2.properties == {"name": "def", "qux": 5, "temp_key_2": "temp_value_2"}
- dn_2.properties.pop("temp_key_2")
- assert dn_1.properties == {"qux": 5, "name": "def"}
- assert dn_2.properties == {"qux": 5, "name": "def"}
- assert "temp_key_2" not in dn_1.properties.keys()
- assert "temp_key_2" not in dn_2.properties.keys()
- dn_1.properties["temp_key_3"] = 0
- assert dn_1.properties == {"qux": 5, "temp_key_3": 0, "name": "def"}
- assert dn_2.properties == {"qux": 5, "temp_key_3": 0, "name": "def"}
- dn_1.properties.update({"temp_key_3": 1})
- assert dn_1.properties == {"qux": 5, "temp_key_3": 1, "name": "def"}
- assert dn_2.properties == {"qux": 5, "temp_key_3": 1, "name": "def"}
- dn_1.properties.update({})
- assert dn_1.properties == {"qux": 5, "temp_key_3": 1, "name": "def"}
- assert dn_2.properties == {"qux": 5, "temp_key_3": 1, "name": "def"}
- dn_1.properties["temp_key_4"] = 0
- dn_1.properties["temp_key_5"] = 0
- with dn_1 as dn:
- assert dn._is_in_context
- assert dn.properties["qux"] == 5
- assert dn.properties["temp_key_3"] == 1
- assert dn.properties["temp_key_4"] == 0
- assert dn.properties["temp_key_5"] == 0
- dn.properties["qux"] = 9
- dn.properties.pop("temp_key_3")
- dn.properties.pop("temp_key_4")
- dn.properties.update({"temp_key_4": 1})
- dn.properties.update({"temp_key_5": 2})
- dn.properties.pop("temp_key_5")
- dn.properties.update({})
- assert dn.properties["qux"] == 5
- assert dn.properties["temp_key_3"] == 1
- assert dn.properties["temp_key_4"] == 0
- assert dn.properties["temp_key_5"] == 0
- assert not dn_1._is_in_context
- assert dn_1.properties["qux"] == 9
- assert "temp_key_3" not in dn_1.properties.keys()
- assert dn_1.properties["temp_key_4"] == 1
- assert "temp_key_5" not in dn_1.properties.keys()
    def test_get_parents(self, data_node):
        # DataNode.get_parents delegates to the top-level taipy.core.get_parents.
        with mock.patch("taipy.core.get_parents") as mck:
            data_node.get_parents()
            mck.assert_called_once_with(data_node)
    def test_data_node_with_env_variable_value_not_stored(self):
        # The raw "ENV[FOO]" template is what gets stored; the environment
        # variable is resolved only on property access.
        dn_config = Config.configure_data_node("A", prop="ENV[FOO]")
        with mock.patch.dict(os.environ, {"FOO": "bar"}):
            dn = _DataManager._bulk_get_or_create([dn_config])[dn_config]
            assert dn._properties.data["prop"] == "ENV[FOO]"
            assert dn.properties["prop"] == "bar"
    def test_path_populated_with_config_default_path(self):
        # default_path from the config seeds the node's path, which remains
        # settable afterwards.
        dn_config = Config.configure_data_node("data_node", "pickle", default_path="foo.p")
        assert dn_config.default_path == "foo.p"
        data_node = _DataManager._bulk_get_or_create([dn_config])[dn_config]
        assert data_node.path == "foo.p"
        data_node.path = "baz.p"
        assert data_node.path == "baz.p"
    def test_edit_edit_tracking(self):
        # Each write appends an edit; job_ids counts only job-tagged edits,
        # and arbitrary kwargs (message, env) are stored on the edit.
        dn_config = Config.configure_data_node("A")
        data_node = _DataManager._bulk_get_or_create([dn_config])[dn_config]
        data_node.write(data="1", job_id="job_1")
        data_node.write(data="2", job_id="job_1")
        data_node.write(data="3", job_id="job_1")
        assert len(data_node.edits) == 3
        assert len(data_node.job_ids) == 3
        assert data_node.edits[-1] == data_node.get_last_edit()
        assert data_node.last_edit_date == data_node.get_last_edit().get("timestamp")
        date = datetime(2050, 1, 1, 12, 12)
        data_node.write(data="4", timestamp=date, message="This is a comment on this edit", env="staging")
        assert len(data_node.edits) == 4
        # No job_id on the fourth write, so job_ids is unchanged.
        assert len(data_node.job_ids) == 3
        assert data_node.edits[-1] == data_node.get_last_edit()
        last_edit = data_node.get_last_edit()
        assert last_edit["message"] == "This is a comment on this edit"
        assert last_edit["env"] == "staging"
        assert last_edit["timestamp"] == date
    def test_label(self):
        # The label is "<owner label> > <node name>"; the owner is resolved
        # through taipy.core.get, mocked here.
        a_date = datetime.now()
        dn = DataNode(
            "foo_bar",
            Scope.SCENARIO,
            DataNodeId("an_id"),
            "a_scenario_id",
            {"a_parent_id"},
            a_date,
            [{"job_id": "a_job_id"}],
            edit_in_progress=False,
            prop="erty",
            name="a name",
        )
        with mock.patch("taipy.core.get") as get_mck:

            class MockOwner:
                label = "owner_label"

                def get_label(self):
                    return self.label

            get_mck.return_value = MockOwner()
            assert dn.get_label() == "owner_label > " + dn.name
            assert dn.get_simple_label() == dn.name
    def test_explicit_label(self):
        # An explicit "label" property takes precedence over the computed
        # owner-based label for both label accessors.
        a_date = datetime.now()
        dn = DataNode(
            "foo_bar",
            Scope.SCENARIO,
            DataNodeId("an_id"),
            "a_scenario_id",
            {"a_parent_id"},
            a_date,
            [{"job_id": "a_job_id"}],
            edit_in_progress=False,
            label="a label",
            name="a name",
        )
        assert dn.get_label() == "a label"
        assert dn.get_simple_label() == "a label"
    def test_change_data_node_name(self):
        # The name can be set via the attribute or the properties dict.
        cgf = Config.configure_data_node("foo", scope=Scope.GLOBAL)
        dn = tp.create_global_data_node(cgf)
        dn.name = "bar"
        assert dn.name == "bar"
        # This new syntax will be the only one allowed: https://github.com/Avaiga/taipy-core/issues/806
        dn.properties["name"] = "baz"
        assert dn.name == "baz"
    def test_locked_data_node_write_should_fail_with_wrong_editor(self):
        # A user lock rejects writes from any other editor id.
        dn_config = Config.configure_data_node("A")
        dn = _DataManager._bulk_get_or_create([dn_config])[dn_config]
        dn.lock_edit("editor_1")
        # Should raise exception for wrong editor
        with pytest.raises(DataNodeIsBeingEdited):
            dn.write("data", editor_id="editor_2")
        # Should succeed with correct editor
        dn.write("data", editor_id="editor_1")
        assert dn.read() == "data"
    def test_locked_data_node_write_should_fail_before_expiration_date_and_succeed_after(self):
        # An editor lock expires after 30 minutes; freezegun drives the clock
        # to just before and just after the expiration.
        dn_config = Config.configure_data_node("A")
        dn = _DataManager._bulk_get_or_create([dn_config])[dn_config]
        lock_time = datetime.now()
        with freezegun.freeze_time(lock_time):
            dn.lock_edit("editor_1")
        with freezegun.freeze_time(lock_time + timedelta(minutes=29)):
            # Should raise exception for wrong editor and expiration date NOT passed
            with pytest.raises(DataNodeIsBeingEdited):
                dn.write("data", editor_id="editor_2")
        with freezegun.freeze_time(lock_time + timedelta(minutes=31)):
            # Should succeed with wrong editor but expiration date passed
            dn.write("data", editor_id="editor_2")
            assert dn.read() == "data"
    def test_locked_data_node_append_should_fail_with_wrong_editor(self):
        # Same editor-lock contract as write(), applied to append() on a CSV node.
        dn_config = Config.configure_csv_data_node("A")
        dn = _DataManager._bulk_get_or_create([dn_config])[dn_config]
        first_line = pd.DataFrame(data={"col1": [1], "col2": [3]})
        second_line = pd.DataFrame(data={"col1": [2], "col2": [4]})
        data = pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]})
        dn.write(first_line)
        assert first_line.equals(dn.read())
        dn.lock_edit("editor_1")
        with pytest.raises(DataNodeIsBeingEdited):
            dn.append(second_line, editor_id="editor_2")
        dn.append(second_line, editor_id="editor_1")
        assert dn.read().equals(data)
    def test_locked_data_node_append_should_fail_before_expiration_date_and_succeed_after(self):
        # Lock expiration (30 minutes) also applies to append() on a CSV node.
        dn_config = Config.configure_csv_data_node("A")
        dn = _DataManager._bulk_get_or_create([dn_config])[dn_config]
        first_line = pd.DataFrame(data={"col1": [1], "col2": [3]})
        second_line = pd.DataFrame(data={"col1": [2], "col2": [4]})
        data = pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]})
        dn.write(first_line)
        assert first_line.equals(dn.read())
        lock_time = datetime.now()
        with freezegun.freeze_time(lock_time):
            dn.lock_edit("editor_1")
        with freezegun.freeze_time(lock_time + timedelta(minutes=29)):
            # Should raise exception for wrong editor and expiration date NOT passed
            with pytest.raises(DataNodeIsBeingEdited):
                dn.append(second_line, editor_id="editor_2")
        with freezegun.freeze_time(lock_time + timedelta(minutes=31)):
            # Should succeed with wrong editor but expiration date passed
            dn.append(second_line, editor_id="editor_2")
            assert dn.read().equals(data)
    def test_orchestrator_write_without_editor_id(self):
        # A write without an editor_id (the orchestrator) bypasses a user lock.
        dn_config = Config.configure_data_node("A")
        dn = _DataManager._bulk_get_or_create([dn_config])[dn_config]
        dn.lock_edit("editor_1")
        # Orchestrator write without editor_id should succeed
        dn.write("orchestrator_data")
        assert dn.read() == "orchestrator_data"
    def test_editor_fails_writing_a_data_node_locked_by_orchestrator(self):
        # An orchestrator lock (no editor id) rejects all editor writes but
        # still accepts orchestrator/job writes.
        dn_config = Config.configure_data_node("A")
        dn = _DataManager._bulk_get_or_create([dn_config])[dn_config]
        dn.lock_edit()  # Locked by orchestrator
        with pytest.raises(DataNodeIsBeingEdited):
            dn.write("data", editor_id="editor_1")
        # Orchestrator write without editor_id should succeed
        dn.write("orchestrator_data", job_id=JobId("job_1"))
        assert dn.read() == "orchestrator_data"
    def test_editor_fails_appending_a_data_node_locked_by_orchestrator(self):
        # Same orchestrator-lock contract for append() on a CSV node; a
        # rejected append must leave the data untouched.
        dn_config = Config.configure_csv_data_node("A")
        dn = _DataManager._bulk_get_or_create([dn_config])[dn_config]
        first_line = pd.DataFrame(data={"col1": [1], "col2": [3]})
        second_line = pd.DataFrame(data={"col1": [2], "col2": [4]})
        data = pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]})
        dn.write(first_line)
        assert first_line.equals(dn.read())
        dn = _DataManager._bulk_get_or_create([dn_config])[dn_config]
        dn.lock_edit()  # Locked by orchestrator
        with pytest.raises(DataNodeIsBeingEdited):
            dn.append(second_line, editor_id="editor_1")
        assert dn.read().equals(first_line)
        dn.append(second_line, job_id=JobId("job_1"))
        assert dn.read().equals(data)
    def test_track_edit(self):
        # track_edit records an edit dict containing a timestamp plus whatever
        # keyword arguments were passed (job_id, editor_id, comment, extras),
        # without persisting the node by itself.
        dn_config = Config.configure_data_node("A")
        data_node = _DataManager._bulk_get_or_create([dn_config])[dn_config]
        before = datetime.now()
        data_node.track_edit(job_id="job_1")
        data_node.track_edit(editor_id="editor_1")
        data_node.track_edit(comment="This is a comment on this edit")
        data_node.track_edit(editor_id="editor_2", comment="This is another comment on this edit")
        data_node.track_edit(editor_id="editor_3", foo="bar")
        after = datetime.now()
        timestamp = datetime.now()
        data_node.track_edit(timestamp=timestamp)
        _DataManagerFactory._build_manager()._update(data_node)
        # To save the edits because track edit does not save the data node
        assert len(data_node.edits) == 6
        assert data_node.edits[-1] == data_node.get_last_edit()
        assert data_node.last_edit_date == data_node.get_last_edit().get(EDIT_TIMESTAMP_KEY)
        edit_0 = data_node.edits[0]
        assert len(edit_0) == 2
        assert edit_0[EDIT_JOB_ID_KEY] == "job_1"
        assert edit_0[EDIT_TIMESTAMP_KEY] >= before
        assert edit_0[EDIT_TIMESTAMP_KEY] <= after
        edit_1 = data_node.edits[1]
        assert len(edit_1) == 2
        assert edit_1[EDIT_EDITOR_ID_KEY] == "editor_1"
        assert edit_1[EDIT_TIMESTAMP_KEY] >= before
        assert edit_1[EDIT_TIMESTAMP_KEY] <= after
        edit_2 = data_node.edits[2]
        assert len(edit_2) == 2
        assert edit_2[EDIT_COMMENT_KEY] == "This is a comment on this edit"
        assert edit_2[EDIT_TIMESTAMP_KEY] >= before
        assert edit_2[EDIT_TIMESTAMP_KEY] <= after
        edit_3 = data_node.edits[3]
        assert len(edit_3) == 3
        assert edit_3[EDIT_EDITOR_ID_KEY] == "editor_2"
        assert edit_3[EDIT_COMMENT_KEY] == "This is another comment on this edit"
        assert edit_3[EDIT_TIMESTAMP_KEY] >= before
        assert edit_3[EDIT_TIMESTAMP_KEY] <= after
        edit_4 = data_node.edits[4]
        assert len(edit_4) == 3
        assert edit_4[EDIT_EDITOR_ID_KEY] == "editor_3"
        assert edit_4["foo"] == "bar"
        assert edit_4[EDIT_TIMESTAMP_KEY] >= before
        assert edit_4[EDIT_TIMESTAMP_KEY] <= after
        # An explicit timestamp is stored verbatim and is the edit's only key.
        edit_5 = data_node.edits[5]
        assert len(edit_5) == 1
        assert edit_5[EDIT_TIMESTAMP_KEY] == timestamp
    def test_normalize_path(self):
        # Windows-style backslashes in the "path" property are normalized to
        # forward slashes.
        dn = DataNode(
            config_id="foo_bar",
            scope=Scope.SCENARIO,
            id=DataNodeId("an_id"),
            path=r"data\foo\bar.csv",
        )
        assert dn.config_id == "foo_bar"
        assert dn.scope == Scope.SCENARIO
        assert dn.id == "an_id"
        assert dn.properties["path"] == "data/foo/bar.csv"