test_registration.py 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. from queue import SimpleQueue
  12. from taipy.core.notification import EventEntityType, EventOperation
  13. from taipy.core.notification._registration import _Registration
  14. from taipy.core.notification._topic import _Topic
  15. def test_create_registration():
  16. registration_0 = _Registration()
  17. assert isinstance(registration_0.registration_id, str)
  18. assert registration_0.registration_id.startswith(_Registration._ID_PREFIX)
  19. assert isinstance(registration_0.queue, SimpleQueue)
  20. assert registration_0.queue.qsize() == 0
  21. assert isinstance(registration_0.topic, _Topic)
  22. assert registration_0.topic.entity_type is None
  23. assert registration_0.topic.entity_id is None
  24. assert registration_0.topic.operation is None
  25. assert registration_0.topic.attribute_name is None
  26. registration_1 = _Registration(
  27. entity_type=EventEntityType.SCENARIO, entity_id="SCENARIO_scenario_id", operation=EventOperation.CREATION
  28. )
  29. assert isinstance(registration_1.registration_id, str)
  30. assert registration_1.registration_id.startswith(_Registration._ID_PREFIX)
  31. assert isinstance(registration_1.queue, SimpleQueue)
  32. assert registration_1.queue.qsize() == 0
  33. assert isinstance(registration_1.topic, _Topic)
  34. assert registration_1.topic.entity_type == EventEntityType.SCENARIO
  35. assert registration_1.topic.entity_id == "SCENARIO_scenario_id"
  36. assert registration_1.topic.operation == EventOperation.CREATION
  37. assert registration_1.topic.attribute_name is None
  38. registration_2 = _Registration(
  39. entity_type=EventEntityType.SEQUENCE,
  40. entity_id="SEQUENCE_scenario_id",
  41. operation=EventOperation.UPDATE,
  42. attribute_name="tasks",
  43. )
  44. assert isinstance(registration_2.registration_id, str)
  45. assert registration_2.registration_id.startswith(_Registration._ID_PREFIX)
  46. assert isinstance(registration_2.queue, SimpleQueue)
  47. assert registration_2.queue.qsize() == 0
  48. assert isinstance(registration_2.topic, _Topic)
  49. assert registration_2.topic.entity_type == EventEntityType.SEQUENCE
  50. assert registration_2.topic.entity_id == "SEQUENCE_scenario_id"
  51. assert registration_2.topic.operation == EventOperation.UPDATE
  52. assert registration_2.topic.attribute_name == "tasks"