123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399 |
- # Copyright 2021-2025 Avaiga Private Limited
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations under the License.
- import pytest
- from taipy.core.common.frequency import Frequency
- from taipy.core.exceptions.exceptions import InvalidEventAttributeName, InvalidEventOperation
- from taipy.core.notification.event import Event, EventEntityType, EventOperation, _make_event
- from taipy.core.submission.submission import Submission
- def test_event_creation_cycle():
- event_1 = Event(
- entity_type=EventEntityType.CYCLE,
- operation=EventOperation.CREATION,
- entity_id="cycle_id",
- )
- assert event_1.creation_date is not None
- assert event_1.entity_type == EventEntityType.CYCLE
- assert event_1.entity_id == "cycle_id"
- assert event_1.operation == EventOperation.CREATION
- assert event_1.attribute_name is None
- event_2 = Event(
- entity_type=EventEntityType.CYCLE,
- operation=EventOperation.UPDATE,
- entity_id="cycle_id",
- attribute_name="frequency",
- attribute_value=Frequency.DAILY,
- )
- assert event_2.creation_date is not None
- assert event_2.entity_type == EventEntityType.CYCLE
- assert event_2.entity_id == "cycle_id"
- assert event_2.operation == EventOperation.UPDATE
- assert event_2.attribute_name == "frequency"
- event_3 = Event(entity_type=EventEntityType.CYCLE, entity_id="cycle_id", operation=EventOperation.DELETION)
- assert event_3.creation_date is not None
- assert event_3.entity_type == EventEntityType.CYCLE
- assert event_3.entity_id == "cycle_id"
- assert event_3.operation == EventOperation.DELETION
- assert event_3.attribute_name is None
- with pytest.raises(InvalidEventAttributeName):
- _ = Event(
- entity_type=EventEntityType.CYCLE,
- operation=EventOperation.CREATION,
- entity_id="cycle_id",
- attribute_name="frequency",
- )
- with pytest.raises(InvalidEventAttributeName):
- _ = Event(EventEntityType.CYCLE, EventOperation.DELETION, entity_id="cycle_id", attribute_name="frequency")
- with pytest.raises(InvalidEventOperation):
- _ = Event(
- entity_type=EventEntityType.CYCLE,
- operation=EventOperation.SUBMISSION,
- entity_id="cycle_id",
- )
- with pytest.raises(InvalidEventOperation):
- _ = Event(
- entity_type=EventEntityType.CYCLE,
- operation=EventOperation.SUBMISSION,
- entity_id="cycle_id",
- attribute_name="frequency",
- )
- def test_event_creation_scenario():
- event_1 = Event(entity_type=EventEntityType.SCENARIO, entity_id="scenario_id", operation=EventOperation.CREATION)
- assert event_1.creation_date is not None
- assert event_1.entity_type == EventEntityType.SCENARIO
- assert event_1.entity_id == "scenario_id"
- assert event_1.operation == EventOperation.CREATION
- assert event_1.attribute_name is None
- event_2 = Event(
- entity_type=EventEntityType.SCENARIO,
- entity_id="scenario_id",
- operation=EventOperation.UPDATE,
- attribute_name="is_primary",
- attribute_value=True,
- )
- assert event_2.creation_date is not None
- assert event_2.entity_type == EventEntityType.SCENARIO
- assert event_2.entity_id == "scenario_id"
- assert event_2.operation == EventOperation.UPDATE
- assert event_2.attribute_name == "is_primary"
- assert event_2.attribute_value is True
- event_3 = Event(entity_type=EventEntityType.SCENARIO, entity_id="scenario_id", operation=EventOperation.DELETION)
- assert event_3.creation_date is not None
- assert event_3.entity_type == EventEntityType.SCENARIO
- assert event_3.entity_id == "scenario_id"
- assert event_3.operation == EventOperation.DELETION
- assert event_3.attribute_name is None
- event_4 = Event(entity_type=EventEntityType.SCENARIO, entity_id="scenario_id", operation=EventOperation.SUBMISSION)
- assert event_4.creation_date is not None
- assert event_4.entity_type == EventEntityType.SCENARIO
- assert event_4.entity_id == "scenario_id"
- assert event_4.operation == EventOperation.SUBMISSION
- assert event_4.attribute_name is None
- with pytest.raises(InvalidEventAttributeName):
- _ = Event(
- entity_type=EventEntityType.SCENARIO,
- entity_id="scenario_id",
- operation=EventOperation.CREATION,
- attribute_name="is_primary",
- )
- with pytest.raises(InvalidEventAttributeName):
- _ = Event(
- entity_type=EventEntityType.SCENARIO,
- entity_id="scenario_id",
- operation=EventOperation.DELETION,
- attribute_name="is_primary",
- )
- with pytest.raises(InvalidEventAttributeName):
- _ = Event(
- entity_type=EventEntityType.SCENARIO,
- entity_id="scenario_id",
- operation=EventOperation.SUBMISSION,
- attribute_name="is_primary",
- )
- def test_event_creation_sequence():
- event_1 = Event(entity_type=EventEntityType.SEQUENCE, entity_id="sequence_id", operation=EventOperation.CREATION)
- assert event_1.creation_date is not None
- assert event_1.entity_type == EventEntityType.SEQUENCE
- assert event_1.entity_id == "sequence_id"
- assert event_1.operation == EventOperation.CREATION
- assert event_1.attribute_name is None
- event_2 = Event(
- entity_type=EventEntityType.SEQUENCE,
- entity_id="sequence_id",
- operation=EventOperation.UPDATE,
- attribute_name="subscribers",
- attribute_value=object(),
- )
- assert event_2.creation_date is not None
- assert event_2.entity_type == EventEntityType.SEQUENCE
- assert event_2.entity_id == "sequence_id"
- assert event_2.operation == EventOperation.UPDATE
- assert event_2.attribute_name == "subscribers"
- event_3 = Event(entity_type=EventEntityType.SEQUENCE, entity_id="sequence_id", operation=EventOperation.DELETION)
- assert event_3.creation_date is not None
- assert event_3.entity_type == EventEntityType.SEQUENCE
- assert event_3.entity_id == "sequence_id"
- assert event_3.operation == EventOperation.DELETION
- assert event_3.attribute_name is None
- event_4 = Event(entity_type=EventEntityType.SEQUENCE, entity_id="sequence_id", operation=EventOperation.SUBMISSION)
- assert event_4.creation_date is not None
- assert event_4.entity_type == EventEntityType.SEQUENCE
- assert event_4.entity_id == "sequence_id"
- assert event_4.operation == EventOperation.SUBMISSION
- assert event_4.attribute_name is None
- with pytest.raises(InvalidEventAttributeName):
- _ = Event(
- entity_type=EventEntityType.SEQUENCE,
- entity_id="sequence_id",
- operation=EventOperation.CREATION,
- attribute_name="subscribers",
- )
- with pytest.raises(InvalidEventAttributeName):
- _ = Event(
- entity_type=EventEntityType.SEQUENCE,
- entity_id="sequence_id",
- operation=EventOperation.DELETION,
- attribute_name="subscribers",
- )
- with pytest.raises(InvalidEventAttributeName):
- _ = Event(
- entity_type=EventEntityType.SEQUENCE,
- entity_id="sequence_id",
- operation=EventOperation.SUBMISSION,
- attribute_name="subscribers",
- )
- def test_event_creation_task():
- event_1 = Event(entity_type=EventEntityType.TASK, entity_id="task_id", operation=EventOperation.CREATION)
- assert event_1.creation_date is not None
- assert event_1.entity_type == EventEntityType.TASK
- assert event_1.entity_id == "task_id"
- assert event_1.operation == EventOperation.CREATION
- assert event_1.attribute_name is None
- event_2 = Event(
- entity_type=EventEntityType.TASK,
- entity_id="task_id",
- operation=EventOperation.UPDATE,
- attribute_name="function",
- )
- assert event_2.creation_date is not None
- assert event_2.entity_type == EventEntityType.TASK
- assert event_2.entity_id == "task_id"
- assert event_2.operation == EventOperation.UPDATE
- assert event_2.attribute_name == "function"
- event_3 = Event(entity_type=EventEntityType.TASK, entity_id="task_id", operation=EventOperation.DELETION)
- assert event_3.creation_date is not None
- assert event_3.entity_type == EventEntityType.TASK
- assert event_3.entity_id == "task_id"
- assert event_3.operation == EventOperation.DELETION
- assert event_3.attribute_name is None
- event_4 = Event(entity_type=EventEntityType.TASK, entity_id="task_id", operation=EventOperation.SUBMISSION)
- assert event_4.creation_date is not None
- assert event_4.entity_type == EventEntityType.TASK
- assert event_4.entity_id == "task_id"
- assert event_4.operation == EventOperation.SUBMISSION
- assert event_4.attribute_name is None
- with pytest.raises(InvalidEventAttributeName):
- _ = Event(
- entity_type=EventEntityType.TASK,
- entity_id="task_id",
- operation=EventOperation.CREATION,
- attribute_name="function",
- )
- with pytest.raises(InvalidEventAttributeName):
- _ = Event(
- entity_type=EventEntityType.TASK,
- entity_id="task_id",
- operation=EventOperation.DELETION,
- attribute_name="function",
- )
- with pytest.raises(InvalidEventAttributeName):
- _ = Event(
- entity_type=EventEntityType.TASK,
- entity_id="task_id",
- operation=EventOperation.SUBMISSION,
- attribute_name="function",
- )
- def test_event_creation_datanode():
- event_1 = Event(entity_type=EventEntityType.DATA_NODE, entity_id="dn_id", operation=EventOperation.CREATION)
- assert event_1.creation_date is not None
- assert event_1.entity_type == EventEntityType.DATA_NODE
- assert event_1.entity_id == "dn_id"
- assert event_1.operation == EventOperation.CREATION
- assert event_1.attribute_name is None
- event_2 = Event(
- entity_type=EventEntityType.DATA_NODE,
- entity_id="dn_id",
- operation=EventOperation.UPDATE,
- attribute_name="properties",
- )
- assert event_2.creation_date is not None
- assert event_2.entity_type == EventEntityType.DATA_NODE
- assert event_2.entity_id == "dn_id"
- assert event_2.operation == EventOperation.UPDATE
- assert event_2.attribute_name == "properties"
- event_3 = Event(entity_type=EventEntityType.DATA_NODE, entity_id="dn_id", operation=EventOperation.DELETION)
- assert event_3.creation_date is not None
- assert event_3.entity_type == EventEntityType.DATA_NODE
- assert event_3.entity_id == "dn_id"
- assert event_3.operation == EventOperation.DELETION
- assert event_3.attribute_name is None
- with pytest.raises(InvalidEventAttributeName):
- _ = Event(
- entity_type=EventEntityType.DATA_NODE,
- entity_id="dn_id",
- operation=EventOperation.CREATION,
- attribute_name="properties",
- )
- with pytest.raises(InvalidEventAttributeName):
- _ = Event(
- entity_type=EventEntityType.DATA_NODE,
- entity_id="dn_id",
- operation=EventOperation.DELETION,
- attribute_name="properties",
- )
- with pytest.raises(InvalidEventOperation):
- _ = Event(entity_type=EventEntityType.DATA_NODE, entity_id="dn_id", operation=EventOperation.SUBMISSION)
- with pytest.raises(InvalidEventOperation):
- _ = Event(
- entity_type=EventEntityType.DATA_NODE,
- entity_id="dn_id",
- operation=EventOperation.SUBMISSION,
- attribute_name="properties",
- )
- def test_event_creation_job():
- event_1 = Event(entity_type=EventEntityType.JOB, entity_id="job_id", operation=EventOperation.CREATION)
- assert event_1.creation_date is not None
- assert event_1.entity_type == EventEntityType.JOB
- assert event_1.entity_id == "job_id"
- assert event_1.operation == EventOperation.CREATION
- assert event_1.attribute_name is None
- event_2 = Event(
- entity_type=EventEntityType.JOB, entity_id="job_id", operation=EventOperation.UPDATE, attribute_name="force"
- )
- assert event_2.creation_date is not None
- assert event_2.entity_type == EventEntityType.JOB
- assert event_2.entity_id == "job_id"
- assert event_2.operation == EventOperation.UPDATE
- assert event_2.attribute_name == "force"
- event_3 = Event(entity_type=EventEntityType.JOB, entity_id="job_id", operation=EventOperation.DELETION)
- assert event_3.creation_date is not None
- assert event_3.entity_type == EventEntityType.JOB
- assert event_3.entity_id == "job_id"
- assert event_3.operation == EventOperation.DELETION
- assert event_3.attribute_name is None
- with pytest.raises(InvalidEventAttributeName):
- _ = Event(
- entity_type=EventEntityType.JOB,
- entity_id="job_id",
- operation=EventOperation.CREATION,
- attribute_name="force",
- )
- with pytest.raises(InvalidEventAttributeName):
- _ = Event(
- entity_type=EventEntityType.JOB,
- entity_id="job_id",
- operation=EventOperation.DELETION,
- attribute_name="force",
- )
- with pytest.raises(InvalidEventOperation):
- _ = Event(entity_type=EventEntityType.JOB, entity_id="job_id", operation=EventOperation.SUBMISSION)
- with pytest.raises(InvalidEventOperation):
- _ = Event(
- entity_type=EventEntityType.JOB,
- entity_id="job_id",
- operation=EventOperation.SUBMISSION,
- attribute_name="force",
- )
- def test_event_creation_submission():
- event_1 = Event(
- entity_type=EventEntityType.SUBMISSION, entity_id="submission_id", operation=EventOperation.CREATION
- )
- assert event_1.creation_date is not None
- assert event_1.entity_type == EventEntityType.SUBMISSION
- assert event_1.entity_id == "submission_id"
- assert event_1.operation == EventOperation.CREATION
- assert event_1.attribute_name is None
- with pytest.raises(InvalidEventAttributeName):
- _ = Event(
- entity_type=EventEntityType.SUBMISSION,
- entity_id="submission_id",
- operation=EventOperation.DELETION,
- attribute_name="force",
- )
- with pytest.raises(InvalidEventOperation):
- _ = Event(
- entity_type=EventEntityType.SUBMISSION, entity_id="submission_id", operation=EventOperation.SUBMISSION
- )
- def test_make_event_from_submission():
- submission = Submission("submission_id", entity_type="task", entity_config_id="task_config_id_1")
- event = _make_event(submission, EventOperation.CREATION)
- assert event.operation == EventOperation.CREATION
- assert event.entity_id
- assert event.entity_type == EventEntityType.SUBMISSION
- assert event.metadata["origin_entity_id"] == "submission_id"
- assert event.metadata["origin_entity_type"] == "task"
- assert event.metadata["origin_entity_config_id"] == "task_config_id_1"
|