12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395 |
- # Copyright 2021-2024 Avaiga Private Limited
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations under the License.
- import os
- import pathlib
- from datetime import datetime
- from time import sleep
- from typing import Dict
- import numpy as np
- import pandas as pd
- import pytest
- from pandas.testing import assert_frame_equal
- from taipy.config.common.scope import Scope
- from taipy.config.config import Config
- from taipy.core.data._data_manager import _DataManager
- from taipy.core.data.data_node_id import DataNodeId
- from taipy.core.data.excel import ExcelDataNode
- from taipy.core.data.operator import JoinOperator, Operator
- from taipy.core.exceptions.exceptions import (
- ExposedTypeLengthMismatch,
- InvalidExposedType,
- NoData,
- NonExistingExcelSheet,
- SheetNameLengthMismatch,
- )
@pytest.fixture(scope="function", autouse=True)
def cleanup():
    """Delete ``data_sample/temp.xlsx`` next to this module once each test has run."""
    yield
    # Teardown phase: everything below executes after the test body.
    temp_workbook = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/temp.xlsx")
    if not os.path.exists(temp_workbook):
        return
    os.remove(temp_workbook)
class MyCustomObject:
    """Plain three-field record passed as a custom ``exposed_type`` by the tests in this module."""

    def __init__(self, id, integer, text):
        # Parameter names are part of the interface and are stored unchanged under the same attribute names.
        self.id, self.integer, self.text = id, integer, text
class MyCustomObject1:
    """Plain three-field record passed as a custom ``exposed_type`` by the multi-sheet tests."""

    def __init__(self, id, integer, text):
        # Parameter names are part of the interface and are stored unchanged under the same attribute names.
        self.id, self.integer, self.text = id, integer, text
class MyCustomObject2:
    """Second plain three-field record, used where each sheet gets its own ``exposed_type``."""

    def __init__(self, id, integer, text):
        # Parameter names are part of the interface and are stored unchanged under the same attribute names.
        self.id, self.integer, self.text = id, integer, text
- class TestExcelDataNode:
def test_new_excel_data_node_with_existing_file_is_ready_for_reading(self):
    """A node configured on a missing file is not ready for reading; one on the sample workbook is."""
    existing_workbook = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.xlsx")
    missing_cfg = Config.configure_data_node("not_ready_data_node_config_id", "excel", path="NOT_EXISTING.csv")
    existing_cfg = Config.configure_data_node("ready_data_node_config_id", "excel", path=existing_workbook)

    # Both nodes are created in one call; the result is indexed by config below.
    created = _DataManager._bulk_get_or_create([missing_cfg, existing_cfg])

    assert not created[missing_cfg].is_ready_for_reading
    assert created[existing_cfg].is_ready_for_reading
def test_create(self):
    """Check the attributes exposed by a freshly constructed ``ExcelDataNode``."""
    expected_path = "data/node/path"
    expected_sheets = ["sheet_name_1", "sheet_name_2"]
    node_properties = {
        "path": expected_path,
        "has_header": False,
        "sheet_name": expected_sheets,
        "name": "super name",
    }

    dn = ExcelDataNode("foo_bar", Scope.SCENARIO, properties=node_properties)

    # Identity and configuration.
    assert isinstance(dn, ExcelDataNode)
    assert dn.storage_type() == "excel"
    assert dn.config_id == "foo_bar"
    assert dn.name == "super name"
    assert dn.scope == Scope.SCENARIO
    assert dn.id is not None

    # Bookkeeping of a node that has never been written nor attached to anything.
    assert dn.owner_id is None
    assert dn.parent_ids == set()
    assert dn.last_edit_date is None
    assert dn.job_ids == []
    assert not dn.is_ready_for_reading

    # Excel-specific properties are surfaced as attributes.
    assert dn.path == expected_path
    assert dn.has_header is False
    assert dn.sheet_name == expected_sheets
def test_get_user_properties(self, excel_file):
    """``_get_user_properties`` is expected to return only the keys the test added on its own."""
    bare_node = ExcelDataNode("dn_1", Scope.SCENARIO, properties={"path": "data/node/path"})
    assert bare_node._get_user_properties() == {}

    mixed_properties = {
        "exposed_type": "numpy",
        "default_data": "foo",
        "default_path": excel_file,
        "has_header": False,
        "sheet_name": ["sheet_name_1", "sheet_name_2"],
        "foo": "bar",
    }
    node_with_extra = ExcelDataNode("dn_2", Scope.SCENARIO, properties=mixed_properties)

    # Every key except the custom "foo" entry is expected to be filtered out.
    assert node_with_extra._get_user_properties() == {"foo": "bar"}
def test_modin_deprecated_in_favor_of_pandas(self):
    """A node declared with the "modin" exposed type reports "pandas" and reads a pandas DataFrame."""
    workbook = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.xlsx")

    # Request the modin exposed type explicitly.
    modin_dn = ExcelDataNode(
        "bar",
        Scope.SCENARIO,
        properties={"path": workbook, "sheet_name": "Sheet1", "exposed_type": "modin"},
    )

    assert modin_dn.properties["exposed_type"] == "pandas"
    assert isinstance(modin_dn.read(), pd.DataFrame)
def test_read_with_header(self):
    """Read a single sheet that has a header row as pandas, numpy and custom objects.

    Also covers a missing file, an empty workbook and a sheet name that does not exist.
    """
    # Build the node outside the ``pytest.raises`` block so that only the read calls can satisfy the
    # expected ``NoData``; an error raised by the constructor must not make this test pass.
    not_existing_excel = ExcelDataNode("foo", Scope.SCENARIO, properties={"path": "WRONG.xlsx"})
    with pytest.raises(NoData):
        assert not_existing_excel.read() is None
        not_existing_excel.read_or_raise()

    empty_excel_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/empty.xlsx")
    empty_excel = ExcelDataNode(
        "foo",
        Scope.SCENARIO,
        properties={"path": empty_excel_path, "exposed_type": MyCustomObject, "has_header": True},
    )
    assert len(empty_excel.read()) == 0

    path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.xlsx")

    # Create ExcelDataNode without exposed_type (Default is pandas.DataFrame)
    excel_data_node_as_pandas = ExcelDataNode(
        "bar", Scope.SCENARIO, properties={"path": path, "sheet_name": "Sheet1"}
    )
    data_pandas = excel_data_node_as_pandas.read()
    assert isinstance(data_pandas, pd.DataFrame)
    assert len(data_pandas) == 5
    assert np.array_equal(data_pandas.to_numpy(), pd.read_excel(path).to_numpy())

    # Create ExcelDataNode with numpy exposed_type
    excel_data_node_as_numpy = ExcelDataNode(
        "bar", Scope.SCENARIO, properties={"path": path, "exposed_type": "numpy", "sheet_name": "Sheet1"}
    )
    data_numpy = excel_data_node_as_numpy.read()
    assert isinstance(data_numpy, np.ndarray)
    assert len(data_numpy) == 5
    assert np.array_equal(data_numpy, pd.read_excel(path).to_numpy())

    # A custom exposed_type combined with an unknown sheet name must fail on read.
    non_existing_sheet_name_custom = ExcelDataNode(
        "bar",
        Scope.SCENARIO,
        properties={"path": path, "sheet_name": "abc", "exposed_type": MyCustomObject},
    )
    with pytest.raises(NonExistingExcelSheet):
        non_existing_sheet_name_custom.read()

    # Create the same ExcelDataNode but with custom exposed_type
    excel_data_node_as_custom_object = ExcelDataNode(
        "bar",
        Scope.SCENARIO,
        properties={"path": path, "exposed_type": MyCustomObject, "sheet_name": "Sheet1"},
    )
    data_custom = excel_data_node_as_custom_object.read()
    assert isinstance(data_custom, list)
    assert len(data_custom) == 5

    # Each custom object must mirror the matching pandas row, column by column.
    for (_, row_pandas), row_custom in zip(data_pandas.iterrows(), data_custom):
        assert isinstance(row_custom, MyCustomObject)
        assert row_pandas["id"] == row_custom.id
        assert row_pandas["integer"] == row_custom.integer
        assert row_pandas["text"] == row_custom.text
def test_read_without_header(self):
    """Read a single sheet with ``has_header`` disabled as pandas, numpy and custom objects."""
    missing_file_node = ExcelDataNode(
        "foo", Scope.SCENARIO, properties={"path": "WRONG.xlsx", "has_header": False}
    )
    with pytest.raises(NoData):
        assert missing_file_node.read() is None
        missing_file_node.read_or_raise()

    path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.xlsx")

    # Default exposed type: a pandas DataFrame.
    pandas_node = ExcelDataNode(
        "bar", Scope.SCENARIO, properties={"path": path, "has_header": False, "sheet_name": "Sheet1"}
    )
    data_pandas = pandas_node.read()
    assert isinstance(data_pandas, pd.DataFrame)
    assert len(data_pandas) == 6
    assert np.array_equal(data_pandas.to_numpy(), pd.read_excel(path, header=None).to_numpy())

    # "numpy" exposed type: a numpy array.
    numpy_properties = {"path": path, "has_header": False, "exposed_type": "numpy", "sheet_name": "Sheet1"}
    numpy_node = ExcelDataNode("bar", Scope.SCENARIO, properties=numpy_properties)
    data_numpy = numpy_node.read()
    assert isinstance(data_numpy, np.ndarray)
    assert len(data_numpy) == 6
    assert np.array_equal(data_numpy, pd.read_excel(path, header=None).to_numpy())

    # Custom exposed type on a sheet that does not exist: reading must raise.
    unknown_sheet_properties = {
        "path": path,
        "has_header": False,
        "sheet_name": "abc",
        "exposed_type": MyCustomObject,
    }
    unknown_sheet_node = ExcelDataNode("bar", Scope.SCENARIO, properties=unknown_sheet_properties)
    with pytest.raises(NonExistingExcelSheet):
        unknown_sheet_node.read()

    # Custom exposed type on the real sheet: a list of objects.
    custom_properties = {
        "path": path,
        "has_header": False,
        "exposed_type": MyCustomObject,
        "sheet_name": "Sheet1",
    }
    custom_node = ExcelDataNode("bar", Scope.SCENARIO, properties=custom_properties)
    data_custom = custom_node.read()
    assert isinstance(data_custom, list)
    assert len(data_custom) == 6

    # The frame's columns are addressed by the integer labels 0, 1, 2 here.
    for (_, frame_row), custom_row in zip(data_pandas.iterrows(), data_custom):
        assert isinstance(custom_row, MyCustomObject)
        assert frame_row[0] == custom_row.id
        assert frame_row[1] == custom_row.integer
        assert frame_row[2] == custom_row.text
@pytest.mark.parametrize(
    "content,columns",
    [
        ([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}], None),
        ([[11, 22, 33], [44, 55, 66]], None),
        ([[11, 22, 33], [44, 55, 66]], ["e", "f", "g"]),
    ],
)
def test_write(self, excel_file, default_data_frame, content, columns):
    """Write ``content`` to a single-sheet node, with or without column names, then write ``None``."""
    excel_dn = ExcelDataNode("foo", Scope.SCENARIO, properties={"path": excel_file, "sheet_name": "Sheet1"})

    # Before any write the node exposes the fixture's default frame.
    assert np.array_equal(excel_dn.read().values, default_data_frame.values)

    if columns:
        excel_dn.write_with_column_names(content, columns)
        expected = pd.DataFrame(content, columns=columns)
    else:
        excel_dn.write(content)
        expected = pd.DataFrame(content)
    assert np.array_equal(excel_dn.read().values, expected.values)

    # After writing None the node is expected to read back as empty.
    excel_dn.write(None)
    assert len(excel_dn.read()) == 0
@pytest.mark.parametrize(
    "content,sheet_name",
    [
        ([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}], "sheet_name"),
        ([[11, 22, 33], [44, 55, 66]], ["sheet_name"]),
    ],
)
def test_write_with_sheet_name(self, excel_file_with_sheet_name, default_data_frame, content, sheet_name):
    """Write to a node whose ``sheet_name`` is either a string or a one-element list."""
    excel_dn = ExcelDataNode(
        "foo", Scope.SCENARIO, properties={"path": excel_file_with_sheet_name, "sheet_name": sheet_name}
    )
    single_sheet = isinstance(sheet_name, str)
    expected = pd.DataFrame(content)

    def read_values():
        # A string sheet name reads back as one frame, a list as a dict keyed by sheet name.
        data = excel_dn.read()
        return data.values if single_sheet else data["sheet_name"].values

    assert np.array_equal(read_values(), default_data_frame.values)

    excel_dn.write(content)
    assert np.array_equal(read_values(), expected.values)

    # The first sheet of the written workbook must carry the configured name.
    written_sheets = pd.ExcelFile(excel_file_with_sheet_name).sheet_names
    assert written_sheets[0] == (sheet_name[0] if isinstance(sheet_name, list) else sheet_name)

    excel_dn.write(None)
    # Empty frame for a string sheet name, one-entry dict for a list.
    assert len(excel_dn.read()) == (0 if single_sheet else 1)
@pytest.mark.parametrize(
    "content,sheet_name",
    [
        ([[11, 22, 33], [44, 55, 66]], ["sheet_name_1", "sheet_name_2"]),
    ],
)
def test_raise_write_with_sheet_name_length_mismatch(
    self, excel_file_with_sheet_name, default_data_frame, content, sheet_name
):
    """Writing plain content to a node configured with two sheet names raises ``SheetNameLengthMismatch``."""
    node_properties = {"path": excel_file_with_sheet_name, "sheet_name": sheet_name}
    excel_dn = ExcelDataNode("foo", Scope.SCENARIO, properties=node_properties)

    with pytest.raises(SheetNameLengthMismatch):
        excel_dn.write(content)
@pytest.mark.parametrize(
    "content",
    [
        ([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]),
    ],
)
def test_write_without_sheet_name(self, excel_file_with_sheet_name, default_data_frame, content):
    """Write to a node that has no ``sheet_name`` property and check the data lands in "Sheet1"."""
    excel_dn = ExcelDataNode("foo", Scope.SCENARIO, properties={"path": excel_file_with_sheet_name})
    expected = pd.DataFrame(content)

    # Without a sheet_name the node reads a dict keyed by the workbook's own sheet names.
    assert np.array_equal(excel_dn.read()["sheet_name"].values, default_data_frame.values)

    excel_dn.write(content)
    assert np.array_equal(excel_dn.read()["Sheet1"].values, expected.values)

    written_sheets = pd.ExcelFile(excel_file_with_sheet_name).sheet_names
    assert written_sheets[0] == "Sheet1"

    excel_dn.write(None)
    assert len(excel_dn.read()) == 1
@pytest.mark.parametrize(
    "content,columns,sheet_name",
    [
        ([[11, 22, 33], [44, 55, 66]], ["e", "f", "g"], "sheet_name"),
        ([[11, 22, 33], [44, 55, 66]], ["e", "f", "g"], ["sheet_name"]),
    ],
)
def test_write_with_column_and_sheet_name(
    self, excel_file_with_sheet_name, default_data_frame, content, columns, sheet_name
):
    """Write with explicit column names to a node whose ``sheet_name`` is a string or a one-element list."""
    excel_dn = ExcelDataNode(
        "foo", Scope.SCENARIO, properties={"path": excel_file_with_sheet_name, "sheet_name": sheet_name}
    )
    single_sheet = isinstance(sheet_name, str)
    # Only the cell values are compared, so the expected frame is built without column names.
    expected = pd.DataFrame(content)

    def read_values():
        # A string sheet name reads back as one frame, a list as a dict keyed by sheet name.
        data = excel_dn.read()
        return data.values if single_sheet else data["sheet_name"].values

    assert np.array_equal(read_values(), default_data_frame.values)

    excel_dn.write_with_column_names(content, columns)
    assert np.array_equal(read_values(), expected.values)

    # The first sheet of the written workbook must carry the configured name.
    written_sheets = pd.ExcelFile(excel_file_with_sheet_name).sheet_names
    assert written_sheets[0] == (sheet_name[0] if isinstance(sheet_name, list) else sheet_name)

    excel_dn.write(None)
    # Empty frame for a string sheet name, one-entry dict for a list.
    assert len(excel_dn.read()) == (0 if single_sheet else 1)
def test_read_multi_sheet_with_header(self):
    """Read a two-sheet workbook that has header rows through every exposed type used in this module.

    Covers the pandas default, "numpy", one custom class for all sheets, and per-sheet custom classes
    given as a list or as a dict, each with and without an explicit ``sheet_name`` property.
    """
    not_existing_excel = ExcelDataNode(
        "foo",
        Scope.SCENARIO,
        properties={"path": "WRONG.xlsx", "sheet_name": ["sheet_name_1", "sheet_name_2"]},
    )
    with pytest.raises(NoData):
        assert not_existing_excel.read() is None
        not_existing_excel.read_or_raise()

    path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.xlsx")
    sheet_names = ["Sheet1", "Sheet2"]

    # Create ExcelDataNode without exposed_type (Default is pandas.DataFrame)
    excel_data_node_as_pandas = ExcelDataNode(
        "bar", Scope.SCENARIO, properties={"path": path, "sheet_name": sheet_names}
    )

    data_pandas = excel_data_node_as_pandas.read()
    assert isinstance(data_pandas, Dict)
    assert len(data_pandas) == 2
    # The row count is compared after taking len(); comparing inside len() would only check non-emptiness.
    assert all(
        len(data_pandas[sheet_name]) == 5 and isinstance(data_pandas[sheet_name], pd.DataFrame)
        for sheet_name in sheet_names
    )
    assert list(data_pandas.keys()) == sheet_names
    for sheet_name in sheet_names:
        assert data_pandas[sheet_name].equals(pd.read_excel(path, sheet_name=sheet_name))

    excel_data_node_as_pandas_no_sheet_name = ExcelDataNode("bar", Scope.SCENARIO, properties={"path": path})

    data_pandas_no_sheet_name = excel_data_node_as_pandas_no_sheet_name.read()
    assert isinstance(data_pandas_no_sheet_name, Dict)
    for key in data_pandas_no_sheet_name.keys():
        assert isinstance(data_pandas_no_sheet_name[key], pd.DataFrame)
        assert data_pandas[key].equals(data_pandas_no_sheet_name[key])

    # Create ExcelDataNode with numpy exposed_type
    excel_data_node_as_numpy = ExcelDataNode(
        "bar",
        Scope.SCENARIO,
        properties={"path": path, "sheet_name": sheet_names, "exposed_type": "numpy"},
    )

    data_numpy = excel_data_node_as_numpy.read()
    assert isinstance(data_numpy, Dict)
    assert len(data_numpy) == 2
    assert all(
        len(data_numpy[sheet_name]) == 5 and isinstance(data_numpy[sheet_name], np.ndarray)
        for sheet_name in sheet_names
    )
    assert list(data_numpy.keys()) == sheet_names
    for sheet_name in sheet_names:
        # Check the numpy read itself (not the pandas one) against the reference workbook content.
        assert np.array_equal(data_numpy[sheet_name], pd.read_excel(path, sheet_name=sheet_name).to_numpy())

    excel_data_node_as_numpy_no_sheet_name = ExcelDataNode(
        "bar",
        Scope.SCENARIO,
        properties={"path": path, "exposed_type": "numpy"},
    )

    data_numpy_no_sheet_name = excel_data_node_as_numpy_no_sheet_name.read()
    assert isinstance(data_numpy_no_sheet_name, Dict)
    for key in data_numpy_no_sheet_name.keys():
        assert isinstance(data_numpy_no_sheet_name[key], np.ndarray)
        assert np.array_equal(data_numpy[key], data_numpy_no_sheet_name[key])

    # A custom exposed_type combined with an unknown sheet name must fail on read.
    non_existing_sheet_name_custom = ExcelDataNode(
        "bar",
        Scope.SCENARIO,
        properties={
            "path": path,
            "sheet_name": ["Sheet1", "xyz"],
            "exposed_type": MyCustomObject1,
        },
    )
    with pytest.raises(NonExistingExcelSheet):
        non_existing_sheet_name_custom.read()

    # Create the same ExcelDataNode but with custom exposed_type
    excel_data_node_as_custom_object = ExcelDataNode(
        "bar",
        Scope.SCENARIO,
        properties={"path": path, "sheet_name": sheet_names, "exposed_type": MyCustomObject1},
    )

    data_custom = excel_data_node_as_custom_object.read()
    assert isinstance(data_custom, Dict)
    assert len(data_custom) == 2
    assert all(len(data_custom[sheet_name]) == 5 for sheet_name in sheet_names)
    assert list(data_custom.keys()) == sheet_names

    for sheet_name in sheet_names:
        sheet_data_pandas, sheet_data_custom = data_pandas[sheet_name], data_custom[sheet_name]
        for (_, row_pandas), row_custom in zip(sheet_data_pandas.iterrows(), sheet_data_custom):
            assert isinstance(row_custom, MyCustomObject1)
            assert row_pandas["id"] == row_custom.id
            assert row_pandas["integer"] == row_custom.integer
            assert row_pandas["text"] == row_custom.text

    excel_data_node_as_custom_object_no_sheet_name = ExcelDataNode(
        "bar",
        Scope.SCENARIO,
        properties={"path": path, "exposed_type": MyCustomObject1},
    )

    data_custom_no_sheet_name = excel_data_node_as_custom_object_no_sheet_name.read()
    assert isinstance(data_custom_no_sheet_name, Dict)
    assert len(data_custom_no_sheet_name) == 2
    assert data_custom.keys() == data_custom_no_sheet_name.keys()

    for sheet_name in sheet_names:
        sheet_data_custom_no_sheet_name, sheet_data_custom = (
            data_custom_no_sheet_name[sheet_name],
            data_custom[sheet_name],
        )
        for row_custom_no_sheet_name, row_custom in zip(sheet_data_custom_no_sheet_name, sheet_data_custom):
            assert isinstance(row_custom_no_sheet_name, MyCustomObject1)
            assert row_custom_no_sheet_name.id == row_custom.id
            assert row_custom_no_sheet_name.integer == row_custom.integer
            assert row_custom_no_sheet_name.text == row_custom.text

    # Two exposed types for a single sheet name. Construction stays inside the block because it is not
    # visible from this test whether the constructor or the read performs the length check.
    with pytest.raises(ExposedTypeLengthMismatch):
        dn = ExcelDataNode(
            "bar",
            Scope.SCENARIO,
            properties={
                "path": path,
                "sheet_name": ["Sheet1"],
                "exposed_type": [MyCustomObject1, MyCustomObject2],
            },
        )
        dn.read()

    # Per-sheet exposed types given as a dict keyed by sheet name.
    custom_class_dict = {"Sheet1": MyCustomObject1, "Sheet2": MyCustomObject2}

    excel_data_node_as_multi_custom_object = ExcelDataNode(
        "bar",
        Scope.SCENARIO,
        properties={"path": path, "sheet_name": sheet_names, "exposed_type": custom_class_dict},
    )
    assert excel_data_node_as_multi_custom_object.properties["exposed_type"] == custom_class_dict

    # Per-sheet exposed types given as a list aligned with sheet_names.
    excel_data_node_as_multi_custom_object = ExcelDataNode(
        "bar",
        Scope.SCENARIO,
        properties={"path": path, "sheet_name": sheet_names, "exposed_type": [MyCustomObject1, MyCustomObject2]},
    )
    assert excel_data_node_as_multi_custom_object.properties["exposed_type"] == [MyCustomObject1, MyCustomObject2]

    multi_data_custom = excel_data_node_as_multi_custom_object.read()
    assert isinstance(multi_data_custom, Dict)
    assert len(multi_data_custom) == 2
    assert all(len(multi_data_custom[sheet_name]) == 5 for sheet_name in sheet_names)
    assert list(multi_data_custom.keys()) == sheet_names

    for sheet_name, custom_class in custom_class_dict.items():
        sheet_data_pandas, sheet_data_custom = data_pandas[sheet_name], multi_data_custom[sheet_name]
        for (_, row_pandas), row_custom in zip(sheet_data_pandas.iterrows(), sheet_data_custom):
            assert isinstance(row_custom, custom_class)
            assert row_pandas["id"] == row_custom.id
            assert row_pandas["integer"] == row_custom.integer
            assert row_pandas["text"] == row_custom.text

    excel_data_node_as_multi_custom_object_no_sheet_name = ExcelDataNode(
        "bar",
        Scope.SCENARIO,
        properties={"path": path, "exposed_type": custom_class_dict},
    )
    assert excel_data_node_as_multi_custom_object_no_sheet_name.properties["exposed_type"] == custom_class_dict

    multi_data_custom_no_sheet_name = excel_data_node_as_multi_custom_object_no_sheet_name.read()
    assert isinstance(multi_data_custom_no_sheet_name, Dict)
    assert len(multi_data_custom_no_sheet_name) == 2
    assert multi_data_custom.keys() == multi_data_custom_no_sheet_name.keys()

    for sheet_name, custom_class in custom_class_dict.items():
        sheet_data_custom_no_sheet_name, sheet_data_custom = (
            multi_data_custom_no_sheet_name[sheet_name],
            multi_data_custom[sheet_name],
        )
        for row_custom_no_sheet_name, row_custom in zip(sheet_data_custom_no_sheet_name, sheet_data_custom):
            assert isinstance(row_custom_no_sheet_name, custom_class)
            assert row_custom_no_sheet_name.id == row_custom.id
            assert row_custom_no_sheet_name.integer == row_custom.integer
            assert row_custom_no_sheet_name.text == row_custom.text
def test_read_multi_sheet_without_header(self):
    """Read a two-sheet workbook with ``has_header`` disabled through every exposed type used here.

    Covers the pandas default, "numpy", one custom class for all sheets, and per-sheet custom classes
    given as a list or as a dict, each with and without an explicit ``sheet_name`` property.
    """
    not_existing_excel = ExcelDataNode(
        "foo",
        Scope.SCENARIO,
        properties={"path": "WRONG.xlsx", "has_header": False, "sheet_name": ["sheet_name_1", "sheet_name_2"]},
    )
    with pytest.raises(NoData):
        assert not_existing_excel.read() is None
        not_existing_excel.read_or_raise()

    path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.xlsx")
    sheet_names = ["Sheet1", "Sheet2"]

    # Create ExcelDataNode without exposed_type (Default is pandas.DataFrame)
    excel_data_node_as_pandas = ExcelDataNode(
        "bar", Scope.SCENARIO, properties={"path": path, "has_header": False, "sheet_name": sheet_names}
    )
    data_pandas = excel_data_node_as_pandas.read()
    assert isinstance(data_pandas, Dict)
    assert len(data_pandas) == 2
    assert all(len(data_pandas[sheet_name]) == 6 for sheet_name in sheet_names)
    assert list(data_pandas.keys()) == sheet_names
    for sheet_name in sheet_names:
        assert isinstance(data_pandas[sheet_name], pd.DataFrame)
        assert data_pandas[sheet_name].equals(pd.read_excel(path, header=None, sheet_name=sheet_name))

    excel_data_node_as_pandas_no_sheet_name = ExcelDataNode(
        "bar", Scope.SCENARIO, properties={"path": path, "has_header": False}
    )
    data_pandas_no_sheet_name = excel_data_node_as_pandas_no_sheet_name.read()
    assert isinstance(data_pandas_no_sheet_name, Dict)
    for key in data_pandas_no_sheet_name.keys():
        assert isinstance(data_pandas_no_sheet_name[key], pd.DataFrame)
        assert data_pandas[key].equals(data_pandas_no_sheet_name[key])

    # Create ExcelDataNode with numpy exposed_type
    excel_data_node_as_numpy = ExcelDataNode(
        "bar",
        Scope.SCENARIO,
        properties={"path": path, "has_header": False, "sheet_name": sheet_names, "exposed_type": "numpy"},
    )

    data_numpy = excel_data_node_as_numpy.read()
    assert isinstance(data_numpy, Dict)
    assert len(data_numpy) == 2
    # The row count is compared after taking len(); comparing inside len() would only check non-emptiness.
    assert all(
        len(data_numpy[sheet_name]) == 6 and isinstance(data_numpy[sheet_name], np.ndarray)
        for sheet_name in sheet_names
    )
    assert list(data_numpy.keys()) == sheet_names
    for sheet_name in sheet_names:
        # Check the numpy read itself (not the pandas one) against the reference workbook content.
        assert np.array_equal(
            data_numpy[sheet_name], pd.read_excel(path, header=None, sheet_name=sheet_name).to_numpy()
        )

    excel_data_node_as_numpy_no_sheet_name = ExcelDataNode(
        "bar",
        Scope.SCENARIO,
        properties={"path": path, "has_header": False, "exposed_type": "numpy"},
    )

    data_numpy_no_sheet_name = excel_data_node_as_numpy_no_sheet_name.read()
    assert isinstance(data_numpy_no_sheet_name, Dict)
    for key in data_numpy_no_sheet_name.keys():
        assert isinstance(data_numpy_no_sheet_name[key], np.ndarray)
        assert np.array_equal(data_numpy[key], data_numpy_no_sheet_name[key])

    # A custom exposed_type combined with an unknown sheet name must fail on read.
    non_existing_sheet_name_custom = ExcelDataNode(
        "bar",
        Scope.SCENARIO,
        properties={
            "path": path,
            "has_header": False,
            "sheet_name": ["Sheet1", "xyz"],
            "exposed_type": MyCustomObject1,
        },
    )
    with pytest.raises(NonExistingExcelSheet):
        non_existing_sheet_name_custom.read()

    # Create the same ExcelDataNode but with custom exposed_type
    excel_data_node_as_custom_object = ExcelDataNode(
        "bar",
        Scope.SCENARIO,
        properties={
            "path": path,
            "has_header": False,
            "sheet_name": sheet_names,
            "exposed_type": MyCustomObject1,
        },
    )

    data_custom = excel_data_node_as_custom_object.read()
    assert excel_data_node_as_custom_object.exposed_type == MyCustomObject1
    assert isinstance(data_custom, Dict)
    assert len(data_custom) == 2
    assert all(len(data_custom[sheet_name]) == 6 for sheet_name in sheet_names)
    assert list(data_custom.keys()) == sheet_names

    # The frames' columns are addressed by the integer labels 0, 1, 2 here.
    for sheet_name in sheet_names:
        sheet_data_pandas, sheet_data_custom = data_pandas[sheet_name], data_custom[sheet_name]
        for (_, row_pandas), row_custom in zip(sheet_data_pandas.iterrows(), sheet_data_custom):
            assert isinstance(row_custom, MyCustomObject1)
            assert row_pandas[0] == row_custom.id
            assert row_pandas[1] == row_custom.integer
            assert row_pandas[2] == row_custom.text

    excel_data_node_as_custom_object_no_sheet_name = ExcelDataNode(
        "bar",
        Scope.SCENARIO,
        properties={"path": path, "has_header": False, "exposed_type": MyCustomObject1},
    )

    data_custom_no_sheet_name = excel_data_node_as_custom_object_no_sheet_name.read()
    assert isinstance(data_custom_no_sheet_name, Dict)
    assert len(data_custom_no_sheet_name) == 2
    assert data_custom.keys() == data_custom_no_sheet_name.keys()

    for sheet_name in sheet_names:
        sheet_data_custom_no_sheet_name, sheet_data_custom = (
            data_custom_no_sheet_name[sheet_name],
            data_custom[sheet_name],
        )
        for row_custom_no_sheet_name, row_custom in zip(sheet_data_custom_no_sheet_name, sheet_data_custom):
            assert isinstance(row_custom_no_sheet_name, MyCustomObject1)
            assert row_custom_no_sheet_name.id == row_custom.id
            assert row_custom_no_sheet_name.integer == row_custom.integer
            assert row_custom_no_sheet_name.text == row_custom.text

    # Two exposed types for a single sheet name. Construction stays inside the block because it is not
    # visible from this test whether the constructor or the read performs the length check.
    with pytest.raises(ExposedTypeLengthMismatch):
        dn = ExcelDataNode(
            "bar",
            Scope.SCENARIO,
            properties={
                "path": path,
                "sheet_name": ["Sheet1"],
                "exposed_type": [MyCustomObject1, MyCustomObject2],
                "has_header": False,
            },
        )
        dn.read()

    # Per-sheet exposed types given as a dict keyed by sheet name.
    custom_class_dict = {"Sheet1": MyCustomObject1, "Sheet2": MyCustomObject2}

    excel_data_node_as_multi_custom_object = ExcelDataNode(
        "bar",
        Scope.SCENARIO,
        properties={
            "path": path,
            "sheet_name": sheet_names,
            "exposed_type": custom_class_dict,
            "has_header": False,
        },
    )
    assert excel_data_node_as_multi_custom_object.properties["exposed_type"] == custom_class_dict

    # Per-sheet exposed types given as a list aligned with sheet_names.
    excel_data_node_as_multi_custom_object = ExcelDataNode(
        "bar",
        Scope.SCENARIO,
        properties={
            "path": path,
            "sheet_name": sheet_names,
            "exposed_type": [MyCustomObject1, MyCustomObject2],
            "has_header": False,
        },
    )
    assert excel_data_node_as_multi_custom_object.properties["exposed_type"] == [MyCustomObject1, MyCustomObject2]

    multi_data_custom = excel_data_node_as_multi_custom_object.read()
    assert isinstance(multi_data_custom, Dict)
    assert len(multi_data_custom) == 2
    assert all(len(multi_data_custom[sheet_name]) == 6 for sheet_name in sheet_names)
    assert list(multi_data_custom.keys()) == sheet_names

    for sheet_name, custom_class in custom_class_dict.items():
        sheet_data_pandas, sheet_data_custom = data_pandas[sheet_name], multi_data_custom[sheet_name]
        for (_, row_pandas), row_custom in zip(sheet_data_pandas.iterrows(), sheet_data_custom):
            assert isinstance(row_custom, custom_class)
            assert row_pandas[0] == row_custom.id
            assert row_pandas[1] == row_custom.integer
            assert row_pandas[2] == row_custom.text

    excel_data_node_as_multi_custom_object_no_sheet_name = ExcelDataNode(
        "bar",
        Scope.SCENARIO,
        properties={"path": path, "has_header": False, "exposed_type": custom_class_dict},
    )

    multi_data_custom_no_sheet_name = excel_data_node_as_multi_custom_object_no_sheet_name.read()
    assert isinstance(multi_data_custom_no_sheet_name, Dict)
    assert len(multi_data_custom_no_sheet_name) == 2
    assert multi_data_custom.keys() == multi_data_custom_no_sheet_name.keys()

    for sheet_name, custom_class in custom_class_dict.items():
        sheet_data_custom_no_sheet_name, sheet_data_custom = (
            multi_data_custom_no_sheet_name[sheet_name],
            multi_data_custom[sheet_name],
        )
        for row_custom_no_sheet_name, row_custom in zip(sheet_data_custom_no_sheet_name, sheet_data_custom):
            assert isinstance(row_custom_no_sheet_name, custom_class)
            assert row_custom_no_sheet_name.id == row_custom.id
            assert row_custom_no_sheet_name.integer == row_custom.integer
            assert row_custom_no_sheet_name.text == row_custom.text
@pytest.mark.parametrize(
    "content,columns",
    [
        ([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}], None),
        ([[11, 22, 33], [44, 55, 66]], None),
        ([[11, 22, 33], [44, 55, 66]], ["e", "f", "g"]),
    ],
)
def test_write_multi_sheet(self, excel_file_with_multi_sheet, default_multi_sheet_data_frame, content, columns):
    """Write one DataFrame per sheet and check every sheet reads back with the same values.

    Args:
        excel_file_with_multi_sheet: fixture giving the path of the workbook under test.
        default_multi_sheet_data_frame: fixture mapping each sheet name to its initial DataFrame.
        content: row data used to build the DataFrame written to every sheet.
        columns: optional column labels for that DataFrame (``None`` keeps the default labels).
    """
    sheet_names = ["Sheet1", "Sheet2"]

    excel_dn = ExcelDataNode(
        "foo",
        Scope.SCENARIO,
        properties={"path": excel_file_with_multi_sheet, "sheet_name": sheet_names},
    )

    # Nothing is written between these checks, so a single read is enough.
    initial_data = excel_dn.read()
    for sheet_name in sheet_names:
        assert np.array_equal(initial_data[sheet_name].values, default_multi_sheet_data_frame[sheet_name].values)

    # Forward ``columns`` so the explicit-label case is really exercised;
    # previously the argument was ignored and that case duplicated the one before it.
    multi_sheet_content = {sheet_name: pd.DataFrame(content, columns=columns) for sheet_name in sheet_names}

    excel_dn.write(multi_sheet_content)

    written_data = excel_dn.read()
    for sheet_name in sheet_names:
        assert np.array_equal(written_data[sheet_name].values, multi_sheet_content[sheet_name].values)
def test_write_multi_sheet_numpy(self, excel_file_with_multi_sheet):
    """Write a numpy array to each sheet and check the arrays read back unchanged."""
    names = ["Sheet1", "Sheet2"]
    node = ExcelDataNode(
        "foo",
        Scope.SCENARIO,
        properties={"path": excel_file_with_multi_sheet, "sheet_name": names, "exposed_type": "numpy"},
    )

    # One flat list of values per sheet, converted to an array through a DataFrame.
    values_per_sheet = [[11, 22, 33], [44, 55, 66]]
    payload = {}
    for name, values in zip(names, values_per_sheet):
        payload[name] = pd.DataFrame(values).to_numpy()

    node.write(payload)
    round_trip = node.read()

    for name in names:
        assert np.array_equal(payload[name], round_trip[name])
@pytest.mark.parametrize(
    "content",
    [
        ([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]),
        (pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}])),
        ([[11, 22, 33], [44, 55, 66]]),
    ],
)
def test_append_pandas_with_sheetname(self, excel_file, default_data_frame, content):
    """Append rows to a node bound to "Sheet1" and expect them after the initial rows."""
    node = ExcelDataNode("foo", Scope.SCENARIO, properties={"path": excel_file, "sheet_name": "Sheet1"})
    assert_frame_equal(node.read(), default_data_frame)

    node.append(content)

    # Expected result: the initial frame followed by the new rows, re-indexed from zero.
    appended_rows = pd.DataFrame(content, columns=["a", "b", "c"])
    expected = pd.concat([default_data_frame, appended_rows]).reset_index(drop=True)
    assert_frame_equal(node.read(), expected)
@pytest.mark.parametrize(
    "content",
    [
        ([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]),
        (pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}])),
        ([[11, 22, 33], [44, 55, 66]]),
    ],
)
def test_append_pandas_without_sheetname(self, excel_file, default_data_frame, content):
    """Append rows to a node created without ``sheet_name`` and check the "Sheet1" entry."""
    node = ExcelDataNode("foo", Scope.SCENARIO, properties={"path": excel_file})
    assert_frame_equal(node.read()["Sheet1"], default_data_frame)

    node.append(content)

    # Expected result: the initial frame followed by the new rows, re-indexed from zero.
    appended_rows = pd.DataFrame(content, columns=["a", "b", "c"])
    expected = pd.concat([default_data_frame, appended_rows]).reset_index(drop=True)
    assert_frame_equal(node.read()["Sheet1"], expected)
@pytest.mark.parametrize(
    "content",
    [
        (
            {
                "Sheet1": pd.DataFrame([{"a": 11, "b": 22, "c": 33}]),
                "Sheet2": pd.DataFrame([{"a": 44, "b": 55, "c": 66}]),
            }
        ),
        (
            {
                "Sheet1": pd.DataFrame({"a": [11, 44], "b": [22, 55], "c": [33, 66]}),
                "Sheet2": pd.DataFrame([{"a": 77, "b": 88, "c": 99}]),
            }
        ),
        ({"Sheet1": np.array([[11, 22, 33], [44, 55, 66]]), "Sheet2": np.array([[77, 88, 99]])}),
    ],
)
def test_append_pandas_multisheet(self, excel_file_with_multi_sheet, default_multi_sheet_data_frame, content):
    """Append a per-sheet mapping and expect each sheet to gain its own rows."""
    sheets = ("Sheet1", "Sheet2")
    node = ExcelDataNode(
        "foo", Scope.SCENARIO, properties={"path": excel_file_with_multi_sheet, "sheet_name": ["Sheet1", "Sheet2"]}
    )

    # Both sheets start out equal to the fixture data.
    for sheet in sheets:
        assert_frame_equal(node.read()[sheet], default_multi_sheet_data_frame[sheet])

    node.append(content)

    # Each sheet should now be its initial frame followed by the rows appended to it.
    for sheet in sheets:
        appended_rows = pd.DataFrame(content[sheet], columns=["a", "b", "c"])
        expected = pd.concat([default_multi_sheet_data_frame[sheet], appended_rows]).reset_index(drop=True)
        assert_frame_equal(node.read()[sheet], expected)
@pytest.mark.parametrize(
    "content",
    [
        ({"Sheet1": pd.DataFrame([{"a": 11, "b": 22, "c": 33}])}),
        (pd.DataFrame({"a": [11, 44], "b": [22, 55], "c": [33, 66]})),
        ([[11, 22, 33], [44, 55, 66]]),
    ],
)
def test_append_only_first_sheet_of_a_multisheet_file(
    self, excel_file_with_multi_sheet, default_multi_sheet_data_frame, content
):
    """Append data that targets only "Sheet1" and expect "Sheet2" to stay untouched."""
    node = ExcelDataNode(
        "foo", Scope.SCENARIO, properties={"path": excel_file_with_multi_sheet, "sheet_name": ["Sheet1", "Sheet2"]}
    )
    for sheet in ("Sheet1", "Sheet2"):
        assert_frame_equal(node.read()[sheet], default_multi_sheet_data_frame[sheet])

    node.append(content)

    # A mapping carries its rows under the "Sheet1" key; anything else is the rows themselves.
    if isinstance(content, dict):
        rows_for_first_sheet = content["Sheet1"]
    else:
        rows_for_first_sheet = content

    appended_rows = pd.DataFrame(rows_for_first_sheet, columns=["a", "b", "c"])
    expected_first_sheet = pd.concat([default_multi_sheet_data_frame["Sheet1"], appended_rows]).reset_index(
        drop=True
    )
    assert_frame_equal(node.read()["Sheet1"], expected_first_sheet)
    assert_frame_equal(node.read()["Sheet2"], default_multi_sheet_data_frame["Sheet2"])
def test_filter_pandas_exposed_type_with_sheetname(self, excel_file):
    """Check indexing, slicing and ``filter`` on a pandas-exposed node bound to "Sheet1"."""
    node = ExcelDataNode(
        "foo", Scope.SCENARIO, properties={"path": excel_file, "sheet_name": "Sheet1", "exposed_type": "pandas"}
    )
    rows = [
        {"foo": 1, "bar": 1},
        {"foo": 1, "bar": 2},
        {"foo": 1},
        {"foo": 2, "bar": 2},
        {"bar": 2},
    ]
    node.write(rows)

    # Test datanode indexing and slicing
    assert node["foo"].equals(pd.Series([1, 1, 1, 2, None]))
    assert node["bar"].equals(pd.Series([1, 2, None, 2, 2]))
    assert node[:2].equals(pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}]))

    def check_same_rows(by_filter, by_mask, expected_rows):
        # Both selection styles must produce the same frame once re-indexed from zero.
        expected = pd.DataFrame(expected_rows)
        assert_frame_equal(by_filter.reset_index(drop=True), expected)
        assert_frame_equal(by_mask.reset_index(drop=True), expected)

    # Test filter data
    check_same_rows(
        node.filter(("foo", 1, Operator.EQUAL)),
        node[node["foo"] == 1],
        [{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}, {"foo": 1.0}],
    )
    check_same_rows(
        node.filter(("foo", 1, Operator.NOT_EQUAL)),
        node[node["foo"] != 1],
        [{"foo": 2.0, "bar": 2.0}, {"bar": 2.0}],
    )
    check_same_rows(
        node.filter(("bar", 2, Operator.EQUAL)),
        node[node["bar"] == 2],
        [{"foo": 1.0, "bar": 2.0}, {"foo": 2.0, "bar": 2.0}, {"bar": 2.0}],
    )
    check_same_rows(
        node.filter([("bar", 1, Operator.EQUAL), ("bar", 2, Operator.EQUAL)], JoinOperator.OR),
        node[(node["bar"] == 1) | (node["bar"] == 2)],
        [
            {"foo": 1.0, "bar": 1.0},
            {"foo": 1.0, "bar": 2.0},
            {"foo": 2.0, "bar": 2.0},
            {"bar": 2.0},
        ],
    )
def test_filter_pandas_exposed_type_without_sheetname(self, excel_file):
    """Check ``filter`` and indexing on a pandas-exposed node created without ``sheet_name``."""
    node = ExcelDataNode("foo", Scope.SCENARIO, properties={"path": excel_file, "exposed_type": "pandas"})
    rows = [
        {"foo": 1, "bar": 1},
        {"foo": 1, "bar": 2},
        {"foo": 1},
        {"foo": 2, "bar": 2},
        {"bar": 2},
    ]
    node.write(rows)

    # (positional arguments for ``filter``, expected number of rows under "Sheet1")
    expected_counts = [
        ((("foo", 1, Operator.EQUAL),), 3),
        ((("foo", 1, Operator.NOT_EQUAL),), 2),
        ((("bar", 2, Operator.EQUAL),), 3),
        (([("bar", 1, Operator.EQUAL), ("bar", 2, Operator.EQUAL)], JoinOperator.OR), 4),
    ]
    for filter_args, row_count in expected_counts:
        assert len(node.filter(*filter_args)["Sheet1"]) == row_count

    # Indexing by sheet name first, then by column or row slice.
    assert node["Sheet1"]["foo"].equals(pd.Series([1, 1, 1, 2, None]))
    assert node["Sheet1"]["bar"].equals(pd.Series([1, 2, None, 2, 2]))
    assert node["Sheet1"][:2].equals(pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}]))
def test_filter_pandas_exposed_type_multisheet(self, excel_file):
    """Check ``filter`` and indexing on a pandas-exposed node with two named sheets."""
    node = ExcelDataNode(
        "foo",
        Scope.SCENARIO,
        properties={"path": excel_file, "sheet_name": ["sheet_1", "sheet_2"], "exposed_type": "pandas"},
    )
    first_sheet = pd.DataFrame(
        [
            {"foo": 1, "bar": 1},
            {"foo": 1, "bar": 2},
            {"foo": 1},
            {"foo": 2, "bar": 2},
            {"bar": 2},
        ]
    )
    second_sheet = pd.DataFrame(
        [
            {"foo": 1, "bar": 3},
            {"foo": 1, "bar": 4},
            {"foo": 1},
            {"foo": 2, "bar": 4},
            {"bar": 4},
        ]
    )
    node.write({"sheet_1": first_sheet, "sheet_2": second_sheet})

    # (positional arguments for ``filter``, expected row count per sheet)
    cases = [
        ((("foo", 1, Operator.EQUAL),), {"sheet_1": 3, "sheet_2": 3}),
        ((("foo", 1, Operator.NOT_EQUAL),), {"sheet_1": 2, "sheet_2": 2}),
        ((("bar", 2, Operator.EQUAL),), {"sheet_1": 3, "sheet_2": 0}),
        (
            ([("bar", 1, Operator.EQUAL), ("bar", 2, Operator.EQUAL)], JoinOperator.OR),
            {"sheet_1": 4, "sheet_2": 0},
        ),
    ]
    for filter_args, rows_per_sheet in cases:
        # The filtered result always holds one entry per sheet.
        assert len(node.filter(*filter_args)) == 2
        for sheet, row_count in rows_per_sheet.items():
            assert len(node.filter(*filter_args)[sheet]) == row_count

    # Indexing by sheet name first, then by column or row slice.
    assert node["sheet_1"]["foo"].equals(pd.Series([1, 1, 1, 2, None]))
    assert node["sheet_2"]["foo"].equals(pd.Series([1, 1, 1, 2, None]))
    assert node["sheet_1"]["bar"].equals(pd.Series([1, 2, None, 2, 2]))
    assert node["sheet_2"]["bar"].equals(pd.Series([3, 4, None, 4, 4]))
    assert node["sheet_1"][:2].equals(pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}]))
    assert node["sheet_2"][:2].equals(pd.DataFrame([{"foo": 1.0, "bar": 3.0}, {"foo": 1.0, "bar": 4.0}]))
def test_filter_numpy_exposed_type_with_sheetname(self, excel_file):
    """Check indexing, slicing and ``filter`` on a numpy-exposed node bound to "Sheet1"."""
    node = ExcelDataNode(
        "foo", Scope.SCENARIO, properties={"path": excel_file, "sheet_name": "Sheet1", "exposed_type": "numpy"}
    )
    rows = [
        [1, 1],
        [1, 2],
        [1, 3],
        [2, 1],
        [2, 2],
        [2, 3],
    ]
    node.write(rows)

    # Test datanode indexing and slicing
    assert np.array_equal(node[0], np.array([1, 1]))
    assert np.array_equal(node[1], np.array([1, 2]))
    assert np.array_equal(node[:3], np.array([[1, 1], [1, 2], [1, 3]]))
    assert np.array_equal(node[:, 0], np.array([1, 1, 1, 2, 2, 2]))
    assert np.array_equal(node[1:4, :1], np.array([[1], [1], [2]]))

    # Test filter data: each ``filter`` call is paired with the equivalent boolean-mask selection.
    first_col_is_one = np.array([[1, 1], [1, 2], [1, 3]])
    assert np.array_equal(node.filter((0, 1, Operator.EQUAL)), first_col_is_one)
    assert np.array_equal(node[node[:, 0] == 1], first_col_is_one)

    first_col_is_not_one = np.array([[2, 1], [2, 2], [2, 3]])
    assert np.array_equal(node.filter((0, 1, Operator.NOT_EQUAL)), first_col_is_not_one)
    assert np.array_equal(node[node[:, 0] != 1], first_col_is_not_one)

    second_col_is_two = np.array([[1, 2], [2, 2]])
    assert np.array_equal(node.filter((1, 2, Operator.EQUAL)), second_col_is_two)
    assert np.array_equal(node[node[:, 1] == 2], second_col_is_two)

    second_col_is_one_or_two = np.array([[1, 1], [1, 2], [2, 1], [2, 2]])
    either_value = node.filter([(1, 1, Operator.EQUAL), (1, 2, Operator.EQUAL)], JoinOperator.OR)
    assert np.array_equal(either_value, second_col_is_one_or_two)
    assert np.array_equal(node[(node[:, 1] == 1) | (node[:, 1] == 2)], second_col_is_one_or_two)
def test_filter_numpy_exposed_type_without_sheetname(self, excel_file):
    """Check ``filter`` and indexing on a numpy-exposed node created without ``sheet_name``."""
    node = ExcelDataNode("foo", Scope.SCENARIO, properties={"path": excel_file, "exposed_type": "numpy"})
    rows = [
        [1, 1],
        [1, 2],
        [1, 3],
        [2, 1],
        [2, 2],
        [2, 3],
    ]
    node.write(rows)

    # (positional arguments for ``filter``, expected number of rows under "Sheet1")
    expected_counts = [
        (((0, 1, Operator.EQUAL),), 3),
        (((0, 1, Operator.NOT_EQUAL),), 3),
        (((1, 2, Operator.EQUAL),), 2),
        (([(0, 1, Operator.EQUAL), (1, 2, Operator.EQUAL)], JoinOperator.OR), 4),
    ]
    for filter_args, row_count in expected_counts:
        assert len(node.filter(*filter_args)["Sheet1"]) == row_count

    # Indexing by sheet name first, then numpy-style row/column selection.
    assert np.array_equal(node["Sheet1"][0], np.array([1, 1]))
    assert np.array_equal(node["Sheet1"][1], np.array([1, 2]))
    assert np.array_equal(node["Sheet1"][:3], np.array([[1, 1], [1, 2], [1, 3]]))
    assert np.array_equal(node["Sheet1"][:, 0], np.array([1, 1, 1, 2, 2, 2]))
    assert np.array_equal(node["Sheet1"][1:4, :1], np.array([[1], [1], [2]]))
def test_filter_numpy_exposed_type_multisheet(self, excel_file):
    """Check ``filter`` and indexing on a numpy-exposed node with two named sheets."""
    node = ExcelDataNode(
        "foo",
        Scope.SCENARIO,
        properties={"path": excel_file, "sheet_name": ["sheet_1", "sheet_2"], "exposed_type": "numpy"},
    )
    first_sheet = pd.DataFrame(
        [
            [1, 1],
            [1, 2],
            [1, 3],
            [2, 1],
            [2, 2],
            [2, 3],
        ]
    )
    second_sheet = pd.DataFrame(
        [
            [1, 4],
            [1, 5],
            [1, 6],
            [2, 4],
            [2, 5],
            [2, 6],
        ]
    )
    node.write({"sheet_1": first_sheet, "sheet_2": second_sheet})

    # (positional arguments for ``filter``, expected row count per sheet)
    cases = [
        (((0, 1, Operator.EQUAL),), {"sheet_1": 3, "sheet_2": 3}),
        (((0, 1, Operator.NOT_EQUAL),), {"sheet_1": 3, "sheet_2": 3}),
        (((1, 2, Operator.EQUAL),), {"sheet_1": 2, "sheet_2": 0}),
        (
            ([(1, 1, Operator.EQUAL), (1, 2, Operator.EQUAL)], JoinOperator.OR),
            {"sheet_1": 4, "sheet_2": 0},
        ),
    ]
    for filter_args, rows_per_sheet in cases:
        # The filtered result always holds one entry per sheet.
        assert len(node.filter(*filter_args)) == 2
        for sheet, row_count in rows_per_sheet.items():
            assert len(node.filter(*filter_args)[sheet]) == row_count

    # Indexing by sheet name first, then numpy-style row/column selection.
    assert np.array_equal(node["sheet_1"][0], np.array([1, 1]))
    assert np.array_equal(node["sheet_2"][0], np.array([1, 4]))
    assert np.array_equal(node["sheet_1"][1], np.array([1, 2]))
    assert np.array_equal(node["sheet_2"][1], np.array([1, 5]))
    assert np.array_equal(node["sheet_1"][:3], np.array([[1, 1], [1, 2], [1, 3]]))
    assert np.array_equal(node["sheet_2"][:3], np.array([[1, 4], [1, 5], [1, 6]]))
    assert np.array_equal(node["sheet_1"][:, 0], np.array([1, 1, 1, 2, 2, 2]))
    assert np.array_equal(node["sheet_2"][:, 1], np.array([4, 5, 6, 4, 5, 6]))
    assert np.array_equal(node["sheet_1"][1:4, :1], np.array([[1], [1], [2]]))
    assert np.array_equal(node["sheet_2"][1:4, 1:2], np.array([[5], [6], [4]]))
def test_set_path(self):
    """The ``path`` attribute starts at ``default_path`` and reflects a later assignment."""
    node = ExcelDataNode("foo", Scope.SCENARIO, properties={"default_path": "foo.xlsx"})
    assert node.path == "foo.xlsx"

    node.path = "bar.xlsx"
    assert node.path == "bar.xlsx"
@pytest.mark.parametrize(
    ["properties", "exists"],
    [
        ({}, False),
        ({"default_data": {"a": ["foo", "bar"]}}, True),
    ],
)
def test_create_with_default_data(self, properties, exists):
    """A file is expected at the node's path only when ``default_data`` was supplied."""
    node = ExcelDataNode("foo", Scope.SCENARIO, DataNodeId("dn_id"), properties=properties)
    file_present = os.path.exists(node.path)
    assert file_present is exists
def test_read_write_after_modify_path(self):
    """After re-pointing the node, reading fails until data is written to the new path."""
    here = pathlib.Path(__file__).parent.resolve()
    source_path = os.path.join(here, "data_sample/example.xlsx")
    target_path = os.path.join(here, "data_sample/temp.xlsx")

    node = ExcelDataNode("foo", Scope.SCENARIO, properties={"default_path": source_path})
    original = node.read()
    assert original is not None

    # The new location has no file yet, so reading is expected to fail.
    node.path = target_path
    with pytest.raises(FileNotFoundError):
        node.read()

    # Writing the previously read data creates it; every sheet must then match.
    node.write(original)
    reloaded = node.read()
    for sheet in reloaded:
        assert np.array_equal(reloaded[sheet].values, original[sheet].values)
def test_exposed_type_custom_class_after_modify_path(self):
    """A single custom exposed type keeps working after the path changes to another workbook."""
    here = pathlib.Path(__file__).parent.resolve()
    two_sheet_path = os.path.join(here, "data_sample/example.xlsx")  # ["Sheet1", "Sheet2"]
    three_sheet_path = os.path.join(here, "data_sample/example_2.xlsx")  # ["Sheet1", "Sheet2", "Sheet3"]

    node = ExcelDataNode(
        "foo", Scope.SCENARIO, properties={"default_path": two_sheet_path, "exposed_type": MyCustomObject1}
    )
    assert node.exposed_type == MyCustomObject1
    # Reading must not raise, before or after switching workbooks.
    node.read()
    node.path = three_sheet_path
    node.read()

    # Requesting a sheet name outside the annotated sheet list must fail on read.
    node = ExcelDataNode(
        "foo",
        Scope.SCENARIO,
        properties={"default_path": two_sheet_path, "exposed_type": MyCustomObject1, "sheet_name": ["Sheet4"]},
    )
    assert node.exposed_type == MyCustomObject1
    with pytest.raises(NonExistingExcelSheet):
        node.read()
def test_exposed_type_dict(self):
    """A per-sheet mapping of custom classes is applied to the matching sheets on read."""
    workbook = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.xlsx")  # ["Sheet1", "Sheet2"]
    # The mapping also names "Sheet3", which the annotation above does not list.
    type_per_sheet = {
        "Sheet1": MyCustomObject1,
        "Sheet2": MyCustomObject2,
        "Sheet3": MyCustomObject1,
    }
    node = ExcelDataNode(
        "foo",
        Scope.SCENARIO,
        properties={"default_path": workbook, "exposed_type": type_per_sheet},
    )

    result = node.read()

    assert isinstance(result, Dict)
    assert isinstance(result["Sheet1"][0], MyCustomObject1)
    assert isinstance(result["Sheet2"][0], MyCustomObject2)
def test_exposed_type_list(self):
    """A list of custom classes maps onto the sheets; a workbook with more sheets must fail."""
    here = pathlib.Path(__file__).parent.resolve()
    two_sheet_path = os.path.join(here, "data_sample/example.xlsx")  # ["Sheet1", "Sheet2"]
    three_sheet_path = os.path.join(here, "data_sample/example_2.xlsx")  # ["Sheet1", "Sheet2", "Sheet3"]

    node = ExcelDataNode(
        "foo",
        Scope.SCENARIO,
        properties={"default_path": two_sheet_path, "exposed_type": [MyCustomObject1, MyCustomObject2]},
    )
    result = node.read()
    assert isinstance(result, Dict)
    assert isinstance(result["Sheet1"][0], MyCustomObject1)
    assert isinstance(result["Sheet2"][0], MyCustomObject2)

    # Two exposed types against the three-sheet workbook: read is expected to raise.
    node.path = three_sheet_path
    with pytest.raises(ExposedTypeLengthMismatch):
        node.read()
def test_not_trying_to_read_sheet_names_when_exposed_type_is_set(self):
    """Creating a node with a custom exposed type must succeed when the path is "notexistyet.xlsx"."""
    # Factories are used so the value handed to the node and the value it is
    # compared against are separate objects.
    exposed_type_factories = (
        lambda: MyCustomObject1,
        lambda: [MyCustomObject1, MyCustomObject2],
        lambda: {"Sheet1": MyCustomObject1, "Sheet2": MyCustomObject2},
    )
    for make_exposed_type in exposed_type_factories:
        node = ExcelDataNode(
            "foo",
            Scope.SCENARIO,
            properties={"default_path": "notexistyet.xlsx", "exposed_type": make_exposed_type()},
        )
        assert node.path == "notexistyet.xlsx"
        assert node.exposed_type == make_exposed_type()
def test_exposed_type_default(self):
    """Without an explicit ``exposed_type`` the node reports "pandas" and reads a DataFrame."""
    workbook = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.xlsx")
    node = ExcelDataNode("foo", Scope.SCENARIO, properties={"default_path": workbook, "sheet_name": "Sheet1"})

    assert node.exposed_type == "pandas"
    result = node.read()
    assert isinstance(result, pd.DataFrame)
def test_pandas_exposed_type(self):
    """With ``exposed_type`` set to "pandas" a single-sheet read returns a DataFrame."""
    workbook = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.xlsx")
    node_properties = {"default_path": workbook, "exposed_type": "pandas", "sheet_name": "Sheet1"}
    node = ExcelDataNode("foo", Scope.SCENARIO, properties=node_properties)

    assert node.exposed_type == "pandas"
    result = node.read()
    assert isinstance(result, pd.DataFrame)
def test_complex_exposed_type_dict(self):
    """A mapping mixing a custom class, "numpy" and "pandas" is applied sheet by sheet."""
    # ["Sheet1", "Sheet2", "Sheet3", "Sheet4", "Sheet5"]
    workbook = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example_4.xlsx")
    type_per_sheet = {
        "Sheet1": MyCustomObject1,
        "Sheet2": "numpy",
        "Sheet3": "pandas",
    }
    node = ExcelDataNode(
        "foo",
        Scope.SCENARIO,
        properties={
            "default_path": workbook,
            "exposed_type": type_per_sheet,
            "sheet_name": ["Sheet1", "Sheet2", "Sheet3", "Sheet4"],
        },
    )

    result = node.read()

    assert isinstance(result, dict)
    assert isinstance(result["Sheet1"], list)
    assert isinstance(result["Sheet1"][0], MyCustomObject1)
    assert isinstance(result["Sheet2"], np.ndarray)
    assert isinstance(result["Sheet3"], pd.DataFrame)
    # "Sheet4" is requested but has no entry in the mapping; a DataFrame is expected.
    assert isinstance(result["Sheet4"], pd.DataFrame)
    # "Sheet5" was not requested in ``sheet_name``, so it must be absent.
    assert result.get("Sheet5") is None
def test_complex_exposed_type_list(self):
    """A list mixing a custom class, "numpy" and "pandas" is applied to the sheets in order."""
    # ["Sheet1", "Sheet2", "Sheet3", "Sheet4","Sheet5"]
    workbook = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example_4.xlsx")
    node_properties = {
        "default_path": workbook,
        "exposed_type": [MyCustomObject1, "numpy", "pandas"],
        "sheet_name": ["Sheet1", "Sheet2", "Sheet3"],
    }
    node = ExcelDataNode("foo", Scope.SCENARIO, properties=node_properties)

    result = node.read()

    assert isinstance(result, dict)
    assert isinstance(result["Sheet1"], list)
    assert isinstance(result["Sheet1"][0], MyCustomObject1)
    assert isinstance(result["Sheet2"], np.ndarray)
    assert isinstance(result["Sheet3"], pd.DataFrame)
def test_invalid_exposed_type(self):
    """Constructing a node with an unknown exposed type must raise ``InvalidExposedType``."""
    workbook = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.xlsx")

    # The invalid value may be given directly, inside a list, or inside a mapping.
    invalid_exposed_types = (
        "invalid",
        ["numpy", "invalid", "pandas"],
        {"Sheet1": "pandas", "Sheet2": "invalid"},
    )
    for exposed_type in invalid_exposed_types:
        with pytest.raises(InvalidExposedType):
            ExcelDataNode(
                "foo",
                Scope.SCENARIO,
                properties={
                    "default_path": workbook,
                    "exposed_type": exposed_type,
                    "sheet_name": "Sheet1",
                },
            )
def test_get_system_modified_date_instead_of_last_edit_date(self, tmpdir_factory):
    """``last_edit_date`` must follow the file's modification time when the file changes outside the node."""
    workbook_path = str(tmpdir_factory.mktemp("data").join("temp.xlsx"))
    pd.DataFrame([]).to_excel(workbook_path)
    node = ExcelDataNode("foo", Scope.SCENARIO, properties={"path": workbook_path, "exposed_type": "pandas"})

    node.write(pd.DataFrame([1, 2, 3]))
    edit_date_after_first_write = node.last_edit_date

    # Short pause so the next modification gets a later timestamp.
    sleep(0.1)

    # Overwrite the file directly with pandas, bypassing the node.
    pd.DataFrame([4, 5, 6]).to_excel(workbook_path)
    file_modified_at = datetime.fromtimestamp(os.path.getmtime(workbook_path))
    assert edit_date_after_first_write < node.last_edit_date
    assert file_modified_at == node.last_edit_date

    sleep(0.1)

    # A write through the node must move the edit date forward again.
    node.write(pd.DataFrame([7, 8, 9]))
    assert file_modified_at < node.last_edit_date

    os.unlink(workbook_path)
|