_submittable_status_cache.py 3.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
# Copyright 2021-2024 Avaiga Private Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
from collections import defaultdict
from typing import TYPE_CHECKING, Dict, Set

from ..notification import EventOperation, Notifier, _make_event

if TYPE_CHECKING:
    from ..data.data_node import DataNode, DataNodeId
  16. class SubmittableStatusCache:
  17. IS_SUBMITTABLE_FCT_NAME = "is_submittable"
  18. # A dictionary representing not ready_to_read datanode and its equivalent submittable entities
  19. _datanode_id_submittables: Dict["DataNodeId", Set[str]] = defaultdict(lambda: set())
  20. # A nested dictionary representing the submittable entities (Scenario, Sequence, Task) and
  21. # its equivalent not ready_to_read input datanodes
  22. _submittable_id_datanodes: Dict[str, Dict["DataNodeId", str]] = defaultdict(defaultdict)
  23. @classmethod
  24. def __add(cls, entity_id: str, datanode_id: "DataNodeId", reason: str):
  25. cls._datanode_id_submittables[datanode_id].add(entity_id)
  26. cls._submittable_id_datanodes[entity_id][datanode_id] = reason
  27. @classmethod
  28. def __remove(cls, datanode_id: "DataNodeId"):
  29. from ..taipy import get as tp_get
  30. submittable_ids: Set = cls._datanode_id_submittables.pop(datanode_id, set())
  31. for submittable_id in submittable_ids:
  32. cls._submittable_id_datanodes[submittable_id].pop(datanode_id, None)
  33. if len(cls._submittable_id_datanodes[submittable_id]) == 0:
  34. Notifier.publish(
  35. _make_event(
  36. tp_get(submittable_id),
  37. EventOperation.UPDATE,
  38. attribute_name=cls.IS_SUBMITTABLE_FCT_NAME,
  39. attribute_value=True,
  40. )
  41. )
  42. cls._submittable_id_datanodes.pop(submittable_id, None)
  43. @classmethod
  44. def _check_submittable_is_ready_to_submit(cls, entity_id: str):
  45. return len(SubmittableStatusCache._submittable_id_datanodes.get(entity_id, [])) == 0
  46. @classmethod
  47. def __add_parent_entities_to_submittable_cache(cls, dn: "DataNode", reason: str):
  48. from ..scenario.scenario import Scenario
  49. from ..sequence.sequence import Sequence
  50. from ..task.task import Task
  51. parent_entities = dn.get_parents()
  52. for scenario_parent in parent_entities.get(Scenario._MANAGER_NAME, []):
  53. SubmittableStatusCache.__add(scenario_parent.id, dn.id, reason)
  54. for sequence_parent in parent_entities.get(Sequence._MANAGER_NAME, []):
  55. SubmittableStatusCache.__add(sequence_parent.id, dn.id, reason)
  56. for task_parent in parent_entities.get(Task._MANAGER_NAME, []):
  57. SubmittableStatusCache.__add(task_parent.id, dn.id, reason)
  58. @classmethod
  59. def _compute_if_dn_is_ready_for_reading(cls, dn: "DataNode"):
  60. if dn._edit_in_progress:
  61. cls.__add_parent_entities_to_submittable_cache(dn, f"DataNode {dn.id} is being edited")
  62. elif not dn._last_edit_date:
  63. cls.__add_parent_entities_to_submittable_cache(dn, f"DataNode {dn.id} is not written")
  64. elif dn.is_ready_for_reading:
  65. SubmittableStatusCache.__remove(dn.id)
  66. @classmethod
  67. def get_reason_submittable_is_not_ready_to_submit(cls, entity_id: str):
  68. if reasons := list(cls._submittable_id_datanodes.get(entity_id, {}).values()):
  69. return "; ".join(reasons) + "."
  70. return ""