|
@@ -54,7 +54,7 @@ def reset_data():
|
|
class TestGenericDataNode:
|
|
class TestGenericDataNode:
|
|
data = list(range(10))
|
|
data = list(range(10))
|
|
|
|
|
|
- def test_create(self):
|
|
|
|
|
|
+ def test_create_with_both_read_fct_and_write_fct(self):
|
|
data_manager = _DataManagerFactory._build_manager()
|
|
data_manager = _DataManagerFactory._build_manager()
|
|
generic_dn_config = Config.configure_generic_data_node(
|
|
generic_dn_config = Config.configure_generic_data_node(
|
|
id="foo_bar", read_fct=read_fct, write_fct=write_fct, name="super name"
|
|
id="foo_bar", read_fct=read_fct, write_fct=write_fct, name="super name"
|
|
@@ -73,72 +73,76 @@ class TestGenericDataNode:
|
|
assert dn.properties["read_fct"] == read_fct
|
|
assert dn.properties["read_fct"] == read_fct
|
|
assert dn.properties["write_fct"] == write_fct
|
|
assert dn.properties["write_fct"] == write_fct
|
|
|
|
|
|
- generic_dn_config_1 = Config.configure_generic_data_node(
|
|
|
|
- id="foo", read_fct=read_fct, write_fct=None, name="foo"
|
|
|
|
- )
|
|
|
|
- dn_1 = data_manager._create_and_set(generic_dn_config_1, None, None)
|
|
|
|
- assert isinstance(dn, GenericDataNode)
|
|
|
|
- assert dn_1.storage_type() == "generic"
|
|
|
|
- assert dn_1.config_id == "foo"
|
|
|
|
- assert dn_1.name == "foo"
|
|
|
|
- assert dn_1.scope == Scope.SCENARIO
|
|
|
|
- assert dn_1.id is not None
|
|
|
|
- assert dn_1.owner_id is None
|
|
|
|
- assert dn_1.last_edit_date is not None
|
|
|
|
- assert dn_1.job_ids == []
|
|
|
|
- assert dn_1.is_ready_for_reading
|
|
|
|
- assert dn_1.properties["read_fct"] == read_fct
|
|
|
|
- assert dn_1.properties["write_fct"] is None
|
|
|
|
-
|
|
|
|
- generic_dn_config_2 = Config.configure_generic_data_node(
|
|
|
|
- id="xyz", read_fct=None, write_fct=write_fct, name="xyz"
|
|
|
|
- )
|
|
|
|
- dn_2 = data_manager._create_and_set(generic_dn_config_2, None, None)
|
|
|
|
|
|
+ with pytest.raises(InvalidConfigurationId):
|
|
|
|
+ GenericDataNode("foo bar", Scope.SCENARIO, properties={"read_fct": read_fct, "write_fct": write_fct})
|
|
|
|
+
|
|
|
|
+ def test_create_with_read_fct_and_none_write_fct(self):
|
|
|
|
+ data_manager = _DataManagerFactory._build_manager()
|
|
|
|
+ generic_dn_config = Config.configure_generic_data_node(id="foo", read_fct=read_fct, write_fct=None, name="foo")
|
|
|
|
+ dn = data_manager._create_and_set(generic_dn_config, None, None)
|
|
assert isinstance(dn, GenericDataNode)
|
|
assert isinstance(dn, GenericDataNode)
|
|
- assert dn_2.storage_type() == "generic"
|
|
|
|
- assert dn_2.config_id == "xyz"
|
|
|
|
- assert dn_2.name == "xyz"
|
|
|
|
- assert dn_2.scope == Scope.SCENARIO
|
|
|
|
- assert dn_2.id is not None
|
|
|
|
- assert dn_2.owner_id is None
|
|
|
|
- assert dn_2.last_edit_date is not None
|
|
|
|
- assert dn_2.job_ids == []
|
|
|
|
- assert dn_2.is_ready_for_reading
|
|
|
|
- assert dn_2.properties["read_fct"] is None
|
|
|
|
- assert dn_2.properties["write_fct"] == write_fct
|
|
|
|
-
|
|
|
|
- generic_dn_config_3 = Config.configure_generic_data_node(id="acb", read_fct=read_fct, name="acb")
|
|
|
|
- dn_3 = data_manager._create_and_set(generic_dn_config_3, None, None)
|
|
|
|
|
|
+ assert dn.storage_type() == "generic"
|
|
|
|
+ assert dn.config_id == "foo"
|
|
|
|
+ assert dn.name == "foo"
|
|
|
|
+ assert dn.scope == Scope.SCENARIO
|
|
|
|
+ assert dn.id is not None
|
|
|
|
+ assert dn.owner_id is None
|
|
|
|
+ assert dn.last_edit_date is not None
|
|
|
|
+ assert dn.job_ids == []
|
|
|
|
+ assert dn.is_ready_for_reading
|
|
|
|
+ assert dn.properties["read_fct"] == read_fct
|
|
|
|
+ assert dn.properties["write_fct"] is None
|
|
|
|
+
|
|
|
|
+ def test_create_with_write_fct_and_none_read_fct(self):
|
|
|
|
+ data_manager = _DataManagerFactory._build_manager()
|
|
|
|
+ generic_dn_config = Config.configure_generic_data_node(id="xyz", read_fct=None, write_fct=write_fct, name="xyz")
|
|
|
|
+ dn = data_manager._create_and_set(generic_dn_config, None, None)
|
|
assert isinstance(dn, GenericDataNode)
|
|
assert isinstance(dn, GenericDataNode)
|
|
- assert dn_3.storage_type() == "generic"
|
|
|
|
- assert dn_3.config_id == "acb"
|
|
|
|
- assert dn_3.name == "acb"
|
|
|
|
- assert dn_3.scope == Scope.SCENARIO
|
|
|
|
- assert dn_3.id is not None
|
|
|
|
- assert dn_3.owner_id is None
|
|
|
|
- assert dn_3.last_edit_date is not None
|
|
|
|
- assert dn_3.job_ids == []
|
|
|
|
- assert dn_3.is_ready_for_reading
|
|
|
|
- assert dn_3.properties["read_fct"] == read_fct
|
|
|
|
- assert dn_3.properties["write_fct"] is None
|
|
|
|
-
|
|
|
|
- generic_dn_config_4 = Config.configure_generic_data_node(id="mno", write_fct=write_fct, name="mno")
|
|
|
|
- dn_4 = data_manager._create_and_set(generic_dn_config_4, None, None)
|
|
|
|
|
|
+ assert dn.storage_type() == "generic"
|
|
|
|
+ assert dn.config_id == "xyz"
|
|
|
|
+ assert dn.name == "xyz"
|
|
|
|
+ assert dn.scope == Scope.SCENARIO
|
|
|
|
+ assert dn.id is not None
|
|
|
|
+ assert dn.owner_id is None
|
|
|
|
+ assert dn.last_edit_date is not None
|
|
|
|
+ assert dn.job_ids == []
|
|
|
|
+ assert dn.is_ready_for_reading
|
|
|
|
+ assert dn.properties["read_fct"] is None
|
|
|
|
+ assert dn.properties["write_fct"] == write_fct
|
|
|
|
+
|
|
|
|
+ def test_create_with_read_fct(self):
|
|
|
|
+ data_manager = _DataManagerFactory._build_manager()
|
|
|
|
+ generic_dn_config = Config.configure_generic_data_node(id="acb", read_fct=read_fct, name="acb")
|
|
|
|
+ dn = data_manager._create_and_set(generic_dn_config, None, None)
|
|
assert isinstance(dn, GenericDataNode)
|
|
assert isinstance(dn, GenericDataNode)
|
|
- assert dn_4.storage_type() == "generic"
|
|
|
|
- assert dn_4.config_id == "mno"
|
|
|
|
- assert dn_4.name == "mno"
|
|
|
|
- assert dn_4.scope == Scope.SCENARIO
|
|
|
|
- assert dn_4.id is not None
|
|
|
|
- assert dn_4.owner_id is None
|
|
|
|
- assert dn_4.last_edit_date is not None
|
|
|
|
- assert dn_4.job_ids == []
|
|
|
|
- assert dn_4.is_ready_for_reading
|
|
|
|
- assert dn_4.properties["read_fct"] is None
|
|
|
|
- assert dn_4.properties["write_fct"] == write_fct
|
|
|
|
|
|
+ assert dn.storage_type() == "generic"
|
|
|
|
+ assert dn.config_id == "acb"
|
|
|
|
+ assert dn.name == "acb"
|
|
|
|
+ assert dn.scope == Scope.SCENARIO
|
|
|
|
+ assert dn.id is not None
|
|
|
|
+ assert dn.owner_id is None
|
|
|
|
+ assert dn.last_edit_date is not None
|
|
|
|
+ assert dn.job_ids == []
|
|
|
|
+ assert dn.is_ready_for_reading
|
|
|
|
+ assert dn.properties["read_fct"] == read_fct
|
|
|
|
+ assert dn.properties["write_fct"] is None
|
|
|
|
|
|
- with pytest.raises(InvalidConfigurationId):
|
|
|
|
- GenericDataNode("foo bar", Scope.SCENARIO, properties={"read_fct": read_fct, "write_fct": write_fct})
|
|
|
|
|
|
+ def test_create_with_write_fct(self):
|
|
|
|
+ data_manager = _DataManagerFactory._build_manager()
|
|
|
|
+ generic_dn_config = Config.configure_generic_data_node(id="mno", write_fct=write_fct, name="mno")
|
|
|
|
+ dn = data_manager._create_and_set(generic_dn_config, None, None)
|
|
|
|
+ assert isinstance(dn, GenericDataNode)
|
|
|
|
+ assert dn.storage_type() == "generic"
|
|
|
|
+ assert dn.config_id == "mno"
|
|
|
|
+ assert dn.name == "mno"
|
|
|
|
+ assert dn.scope == Scope.SCENARIO
|
|
|
|
+ assert dn.id is not None
|
|
|
|
+ assert dn.owner_id is None
|
|
|
|
+ assert dn.last_edit_date is not None
|
|
|
|
+ assert dn.job_ids == []
|
|
|
|
+ assert dn.is_ready_for_reading
|
|
|
|
+ assert dn.properties["read_fct"] is None
|
|
|
|
+ assert dn.properties["write_fct"] == write_fct
|
|
|
|
|
|
def test_get_user_properties(self):
|
|
def test_get_user_properties(self):
|
|
dn_1 = GenericDataNode(
|
|
dn_1 = GenericDataNode(
|