123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307 |
- # Copyright 2021-2025 Avaiga Private Limited
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations under the License.
- import os
- import pathlib
- import pickle
- import re
- from datetime import datetime, timedelta
- from time import sleep
- import freezegun
- import pandas as pd
- import pytest
- from pandas.testing import assert_frame_equal
- from taipy import Scope
- from taipy.common.config import Config
- from taipy.common.config.exceptions.exceptions import InvalidConfigurationId
- from taipy.core.common._utils import _normalize_path
- from taipy.core.data._data_manager import _DataManager
- from taipy.core.data._data_manager_factory import _DataManagerFactory
- from taipy.core.data.pickle import PickleDataNode
- from taipy.core.exceptions.exceptions import NoData
- from taipy.core.reason import NoFileToDownload, NotAFile
@pytest.fixture(scope="function", autouse=True)
def cleanup():
    """Remove ``data_sample/temp.p`` (located next to this module) after each test, if present."""
    yield
    temp_pickle = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/temp.p")
    if not os.path.isfile(temp_pickle):
        # Nothing was written by the test; nothing to clean up.
        return
    os.remove(temp_pickle)
class TestPickleDataNodeEntity:
    """Tests for ``PickleDataNode``: creation, read/write, path handling, download and upload."""

    @pytest.fixture(scope="function", autouse=True)
    def remove_pickle_files(self):
        """Delete every ``*.p`` file found in the current working directory after each test."""
        yield
        import glob

        for f in glob.glob("*.p"):
            os.remove(f)

    def test_create_with_manager(self, pickle_file_path):
        """A pickle data node config is turned into a ``PickleDataNode`` by the data manager."""
        pickle_dn_config = Config.configure_pickle_data_node(id="baz", default_path=pickle_file_path)
        pickle_dn = _DataManagerFactory._build_manager()._create_and_set(pickle_dn_config, None, None)
        assert isinstance(pickle_dn, PickleDataNode)

    def test_create(self):
        """Check the attributes of a freshly created node and that an invalid config id is rejected."""
        pickle_dn_config = Config.configure_pickle_data_node(
            id="foobar_bazxyxea", default_path="Data", default_data="Data"
        )
        dn = _DataManagerFactory._build_manager()._create_and_set(pickle_dn_config, None, None)
        assert isinstance(dn, PickleDataNode)
        assert dn.storage_type() == "pickle"
        assert dn.config_id == "foobar_bazxyxea"
        assert dn.scope == Scope.SCENARIO
        assert dn.id is not None
        assert dn.name is None
        assert dn.owner_id is None
        assert dn.last_edit_date is not None
        assert dn.job_ids == []
        assert dn.is_ready_for_reading
        assert dn.read() == "Data"
        # Re-check these two attributes after the read above.
        assert dn.last_edit_date is not None
        assert dn.job_ids == []

        # A config id containing a space is expected to be refused.
        with pytest.raises(InvalidConfigurationId):
            PickleDataNode("foobar bazxyxea", Scope.SCENARIO, properties={"default_data": "Data"})

    def test_get_user_properties(self, pickle_file_path):
        """``_get_user_properties()`` returns only the properties that are not pickle-specific."""
        dn_1 = PickleDataNode("dn_1", Scope.SCENARIO, properties={"path": pickle_file_path})
        assert dn_1._get_user_properties() == {}

        dn_2 = PickleDataNode(
            "dn_2",
            Scope.SCENARIO,
            properties={
                "default_data": "foo",
                "default_path": pickle_file_path,
                "foo": "bar",
            },
        )
        # default_data, default_path, path, is_generated are filtered out
        assert dn_2._get_user_properties() == {"foo": "bar"}

    def test_new_pickle_data_node_with_existing_file_is_ready_for_reading(self):
        """A node whose file exists is ready for reading; a node whose file is missing is not."""
        not_ready_dn_cfg = Config.configure_data_node("not_ready_data_node_config_id", "pickle", path="NOT_EXISTING.p")
        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.p")
        ready_dn_cfg = Config.configure_data_node("ready_data_node_config_id", "pickle", path=path)

        dns = _DataManager._bulk_get_or_create([not_ready_dn_cfg, ready_dn_cfg])

        assert not dns[not_ready_dn_cfg].is_ready_for_reading
        assert dns[ready_dn_cfg].is_ready_for_reading

    def test_create_with_file_name(self):
        """Creating a node with ``default_data`` and an explicit ``path`` writes the file at that path."""
        dn = PickleDataNode("foo", Scope.SCENARIO, properties={"default_data": "bar", "path": "foo.FILE.p"})
        assert os.path.isfile("foo.FILE.p")
        assert dn.read() == "bar"
        dn.write("qux")
        assert dn.read() == "qux"
        dn.write(1998)
        assert dn.read() == 1998

    def test_read_and_write(self):
        """Round-trip several Python types through ``write()`` / ``read()``, and check the no-data case."""
        no_data_dn = PickleDataNode("foo", Scope.SCENARIO)
        # Only read_or_raise() is expected to raise; keeping read() outside the
        # pytest.raises block ensures the test cannot pass because read() raised instead.
        assert no_data_dn.read() is None
        with pytest.raises(NoData):
            no_data_dn.read_or_raise()

        pickle_str = PickleDataNode("foo", Scope.SCENARIO, properties={"default_data": "bar"})
        assert isinstance(pickle_str.read(), str)
        assert pickle_str.read() == "bar"
        pickle_str.properties["default_data"] = "baz"  # this modifies the default data value but not the data itself
        assert pickle_str.read() == "bar"
        pickle_str.write("qux")
        assert pickle_str.read() == "qux"
        pickle_str.write(1998)
        assert pickle_str.read() == 1998
        assert isinstance(pickle_str.read(), int)

        pickle_int = PickleDataNode("foo", Scope.SCENARIO, properties={"default_data": 197})
        assert isinstance(pickle_int.read(), int)
        assert pickle_int.read() == 197

        pickle_dict = PickleDataNode(
            "foo", Scope.SCENARIO, properties={"default_data": {"bar": 12, "baz": "qux", "quux": [13]}}
        )
        assert isinstance(pickle_dict.read(), dict)
        assert pickle_dict.read() == {"bar": 12, "baz": "qux", "quux": [13]}

    def test_path_overrides_default_path(self):
        """When both ``path`` and ``default_path`` are given, ``path`` is the one used."""
        dn = PickleDataNode(
            "foo",
            Scope.SCENARIO,
            properties={
                "default_data": "bar",
                "default_path": "foo.FILE.p",
                "path": "bar.FILE.p",
            },
        )
        assert dn.path == "bar.FILE.p"

    def test_set_path(self):
        """The ``path`` attribute starts at ``default_path`` and can be reassigned."""
        dn = PickleDataNode("foo", Scope.SCENARIO, properties={"default_path": "foo.p"})
        assert dn.path == "foo.p"
        dn.path = "bar.p"
        assert dn.path == "bar.p"

    def test_is_generated(self):
        """``is_generated`` is truthy without any path property and falsy once a path is assigned."""
        dn = PickleDataNode("foo", Scope.SCENARIO, properties={})
        assert dn.is_generated
        dn.path = "bar.p"
        assert not dn.is_generated

    def test_read_write_after_modify_path(self):
        """After re-pointing the node to a missing file, reads fail until something is written."""
        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.p")
        new_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/temp.p")
        dn = PickleDataNode("foo", Scope.SCENARIO, properties={"default_path": path})
        read_data = dn.read()
        assert read_data is not None

        dn.path = new_path
        with pytest.raises(FileNotFoundError):
            dn.read()
        dn.write({"other": "stuff"})
        assert dn.read() == {"other": "stuff"}

    def test_get_system_modified_date_instead_of_last_edit_date(self, tmpdir_factory):
        """``last_edit_date`` follows the file's modification time when the file is changed directly."""
        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.pickle"))
        pd.DataFrame([]).to_pickle(temp_file_path)
        dn = PickleDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": "pandas"})

        dn.write(pd.DataFrame([1, 2, 3]))
        previous_edit_date = dn.last_edit_date

        sleep(0.1)  # make sure the two timestamps differ

        # Modify the file without going through the data node.
        pd.DataFrame([4, 5, 6]).to_pickle(temp_file_path)
        new_edit_date = datetime.fromtimestamp(os.path.getmtime(temp_file_path))

        assert previous_edit_date < dn.last_edit_date
        assert new_edit_date == dn.last_edit_date

        sleep(0.1)  # make sure the two timestamps differ

        dn.write(pd.DataFrame([7, 8, 9]))
        assert new_edit_date < dn.last_edit_date
        os.unlink(temp_file_path)

    def test_migrate_to_new_path(self, tmp_path):
        """A node created on a file under a ``.data`` directory ends up with an existing path outside it."""
        _base_path = os.path.join(tmp_path, ".data")
        path = os.path.join(_base_path, "test.p")
        # create a file on old path
        os.mkdir(_base_path)
        with open(path, "w"):
            pass

        dn = PickleDataNode("foo", Scope.SCENARIO, properties={"default_data": "bar", "path": path})

        assert ".data" not in dn.path
        assert os.path.exists(dn.path)

    def test_is_downloadable(self):
        """An existing pickle file is downloadable and yields no reasons."""
        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.p")
        dn = PickleDataNode("foo", Scope.SCENARIO, properties={"path": path})
        reasons = dn.is_downloadable()
        assert reasons
        assert reasons.reasons == ""

    def test_is_not_downloadable_no_file(self):
        """A missing file is not downloadable and the single reason is ``NoFileToDownload``."""
        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/wrong_path.p")
        dn = PickleDataNode("foo", Scope.SCENARIO, properties={"path": path})
        reasons = dn.is_downloadable()
        assert not reasons
        assert len(reasons._reasons) == 1
        assert str(NoFileToDownload(_normalize_path(path), dn.id)) in reasons.reasons

    def test_is_not_downloadable_not_a_file(self):
        """A directory path is not downloadable and the single reason is ``NotAFile``."""
        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample")
        dn = PickleDataNode("foo", Scope.SCENARIO, properties={"path": path})
        reasons = dn.is_downloadable()
        assert not reasons
        assert len(reasons._reasons) == 1
        assert str(NotAFile(_normalize_path(path), dn.id)) in reasons.reasons

    def test_get_download_path(self):
        """``_get_downloadable_path()`` matches the configured path, ignoring the separator style."""
        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.p")
        dn = PickleDataNode("foo", Scope.SCENARIO, properties={"path": path})
        # Compare path components so that "/" and "\\" separators are treated alike.
        assert re.split(r"[\\/]", dn._get_downloadable_path()) == re.split(r"[\\/]", path)

    def test_get_download_path_with_not_existed_file(self):
        """``_get_downloadable_path()`` is an empty string when the file does not exist."""
        dn = PickleDataNode("foo", Scope.SCENARIO, properties={"path": "NOT_EXISTED.p"})
        assert dn._get_downloadable_path() == ""

    def test_upload(self, pickle_file_path, tmpdir_factory):
        """Uploading a file replaces the node's content and edit date but keeps its path."""
        old_pickle_path = tmpdir_factory.mktemp("data").join("df.p").strpath
        old_data = pd.DataFrame([{"a": 0, "b": 1, "c": 2}, {"a": 3, "b": 4, "c": 5}])

        dn = PickleDataNode("foo", Scope.SCENARIO, properties={"path": old_pickle_path})
        dn.write(old_data)
        old_last_edit_date = dn.last_edit_date

        upload_content = pd.read_pickle(pickle_file_path)

        # Freeze the clock one second later so the strict date comparison below is meaningful.
        with freezegun.freeze_time(old_last_edit_date + timedelta(seconds=1)):
            dn._upload(pickle_file_path)

        assert_frame_equal(dn.read(), upload_content)  # The content of the dn should change to the uploaded content
        assert dn.last_edit_date > old_last_edit_date
        assert dn.path == _normalize_path(old_pickle_path)  # The path of the dn should not change

    def test_upload_with_upload_check(self, pickle_file_path, tmpdir_factory):
        """``_upload()`` with an ``upload_checker`` rejects unreadable or invalid files and leaves the node intact."""
        old_pickle_path = tmpdir_factory.mktemp("data").join("df.p").strpath
        old_data = pd.DataFrame([{"a": 0, "b": 1, "c": 2}, {"a": 3, "b": 4, "c": 5}])

        dn = PickleDataNode("foo", Scope.SCENARIO, properties={"path": old_pickle_path})
        dn.write(old_data)
        old_last_edit_date = dn.last_edit_date

        def check_data_column(upload_path, upload_data):
            # Accept only ".p" files whose DataFrame columns are exactly a, b, c.
            return upload_path.endswith(".p") and upload_data.columns.tolist() == ["a", "b", "c"]

        # The upload should fail when the file does not exist and therefore cannot be read
        not_exists_json_path = tmpdir_factory.mktemp("data").join("not_exists.json").strpath
        reasons = dn._upload(not_exists_json_path, upload_checker=check_data_column)
        assert bool(reasons) is False
        assert (
            str(list(reasons._reasons[dn.id])[0]) == "The uploaded file not_exists.json can not be read,"
            f' therefore is not a valid data file for data node "{dn.id}"'
        )

        not_pickle_path = tmpdir_factory.mktemp("data").join("wrong_format_df.not_pickle").strpath
        with open(not_pickle_path, "wb") as f:
            pickle.dump(pd.DataFrame([{"a": 1, "b": 2, "d": 3}, {"a": 4, "b": 5, "d": 6}]), f)
        # The upload should fail: the file is a readable pickle, but check_data_column() rejects it
        # (the extension is not ".p" and the columns are a, b, d)
        reasons = dn._upload(not_pickle_path, upload_checker=check_data_column)
        assert bool(reasons) is False
        assert (
            str(list(reasons._reasons[dn.id])[0])
            == f'The uploaded file wrong_format_df.not_pickle has invalid data for data node "{dn.id}"'
        )

        wrong_format_pickle_path = tmpdir_factory.mktemp("data").join("wrong_format_df.p").strpath
        with open(wrong_format_pickle_path, "wb") as f:
            pickle.dump(pd.DataFrame([{"a": 1, "b": 2, "d": 3}, {"a": 4, "b": 5, "d": 6}]), f)
        # The upload should fail when check_data_column() return False
        reasons = dn._upload(wrong_format_pickle_path, upload_checker=check_data_column)
        assert bool(reasons) is False
        assert (
            str(list(reasons._reasons[dn.id])[0])
            == f'The uploaded file wrong_format_df.p has invalid data for data node "{dn.id}"'
        )

        assert_frame_equal(dn.read(), old_data)  # The content of the dn should not change when upload fails
        assert dn.last_edit_date == old_last_edit_date  # The last edit date should not change when upload fails
        assert dn.path == _normalize_path(old_pickle_path)  # The path of the dn should not change

        # The upload should succeed when check_data_column() return True
        assert dn._upload(pickle_file_path, upload_checker=check_data_column)
|