Toan Quach 1 рік тому
батько
коміт
f3417507ad

+ 6 - 6
src/taipy/core/_orchestrator/_orchestrator.py

@@ -10,7 +10,6 @@
 # specific language governing permissions and limitations under the License.
 
 import itertools
-import uuid
 from datetime import datetime
 from multiprocessing import Lock
 from queue import Queue
@@ -67,7 +66,10 @@ class _Orchestrator(_AbstractOrchestrator):
         Returns:
             The created Jobs.
         """
-        submission = _SubmissionManagerFactory._build_manager()._create(submittable.id)  # type: ignore
+        submission = _SubmissionManagerFactory._build_manager()._create(
+            submittable.id, submittable._ID_PREFIX  # type: ignore
+        )
+
         jobs = []
         tasks = submittable._get_sorted_tasks()
         with cls.lock:
@@ -118,7 +120,7 @@ class _Orchestrator(_AbstractOrchestrator):
         Returns:
             The created `Job^`.
         """
-        submission = _SubmissionManagerFactory._build_manager()._create(task.id)
+        submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX)
         submit_id = submission.id
         with cls.lock:
             job = cls._lock_dn_output_and_create_job(
@@ -222,7 +224,6 @@ class _Orchestrator(_AbstractOrchestrator):
         if job.is_completed() or job.is_skipped():
             cls.__unblock_jobs()
         elif job.is_failed():
-            print(f"\nJob {job.id} failed, abandoning subsequent jobs.\n")
             cls._fail_subsequent_jobs(job)
 
     @classmethod
@@ -295,7 +296,6 @@ class _Orchestrator(_AbstractOrchestrator):
                 cls.__find_subsequent_jobs(failed_job.submit_id, set(failed_job.task.output.keys()))
             )
             for job in to_fail_or_abandon_jobs:
-                print(f"Abandoning job: {job.id}")
                 job.abandoned()
             to_fail_or_abandon_jobs.update([failed_job])
             cls.__remove_blocked_jobs(to_fail_or_abandon_jobs)
@@ -309,7 +309,7 @@ class _Orchestrator(_AbstractOrchestrator):
         for job in jobs:
             if job.id in _OrchestratorFactory._dispatcher._dispatched_processes.keys():  # type: ignore
                 cls.__logger.info(f"{job.id} is running and cannot be canceled.")
-            elif job.is_completed() or job.is_skipped():
+            elif job.is_completed():
                 cls.__logger.info(f"{job.id} has already been completed and cannot be canceled.")
             elif job.is_skipped():
                 cls.__logger.info(f"{job.id} has already been skipped and cannot be canceled.")

+ 2 - 0
src/taipy/core/submission/_submission_converter.py

@@ -24,6 +24,7 @@ class _SubmissionConverter(_AbstractConverter):
         return _SubmissionModel(
             id=submission.id,
             entity_id=submission._entity_id,
+            entity_type=submission.entity_type,
             job_ids=[job.id if isinstance(job, Job) else JobId(str(job)) for job in list(submission._jobs)],
             creation_date=submission._creation_date.isoformat(),
             submission_status=submission._submission_status,
@@ -34,6 +35,7 @@ class _SubmissionConverter(_AbstractConverter):
     def _model_to_entity(cls, model: _SubmissionModel) -> Submission:
         submission = Submission(
             entity_id=model.entity_id,
+            entity_type=model.entity_type,
             id=SubmissionId(model.id),
             jobs=model.job_ids,
             creation_date=datetime.fromisoformat(model.creation_date),

+ 2 - 5
src/taipy/core/submission/_submission_manager.py

@@ -35,11 +35,8 @@ class _SubmissionManager(_Manager[Submission], _VersionMixin):
         return cls._repository._load_all(filters)
 
     @classmethod
-    def _create(
-        cls,
-        entity_id: str,
-    ) -> Submission:
-        submission = Submission(entity_id=entity_id)
+    def _create(cls, entity_id: str, entity_type: str) -> Submission:
+        submission = Submission(entity_id=entity_id, entity_type=entity_type)
         cls._set(submission)
 
         Notifier.publish(_make_event(submission, EventOperation.CREATION))

+ 4 - 0
src/taipy/core/submission/_submission_model.py

@@ -28,6 +28,7 @@ class _SubmissionModel(_BaseModel):
         mapper_registry.metadata,
         Column("id", String, primary_key=True),
         Column("entity_id", String),
+        Column("entity_type", String),
         Column("job_ids", JSON),
         Column("creation_date", String),
         Column("submission_status", Enum(SubmissionStatus)),
@@ -35,6 +36,7 @@ class _SubmissionModel(_BaseModel):
     )
     id: str
     entity_id: str
+    entity_type: str
     job_ids: Union[List[JobId], List]
     creation_date: str
     submission_status: SubmissionStatus
@@ -45,6 +47,7 @@ class _SubmissionModel(_BaseModel):
         return _SubmissionModel(
             id=data["id"],
             entity_id=data["entity_id"],
+            entity_type=data["entity_type"],
             job_ids=_BaseModel._deserialize_attribute(data["job_ids"]),
             creation_date=data["creation_date"],
             submission_status=SubmissionStatus._from_repr(data["submission_status"]),
@@ -55,6 +58,7 @@ class _SubmissionModel(_BaseModel):
         return [
             self.id,
             self.entity_id,
+            self.entity_type,
             _BaseModel._serialize_attribute(self.job_ids),
             self.creation_date,
             repr(self.submission_status),

+ 6 - 0
src/taipy/core/submission/submission.py

@@ -44,6 +44,7 @@ class Submission(_Entity, _Labeled):
     def __init__(
         self,
         entity_id: str,
+        entity_type: str,
         id: Optional[str] = None,
         jobs: Optional[Union[List[Job], List[JobId]]] = None,
         creation_date: Optional[datetime] = None,
@@ -51,6 +52,7 @@ class Submission(_Entity, _Labeled):
         version: Optional[str] = None,
     ):
         self._entity_id = entity_id
+        self._entity_type = entity_type
         self.id = id or self.__new_id()
         self._jobs: Union[List[Job], List[JobId], List] = jobs or []
         self._creation_date = creation_date or datetime.now()
@@ -66,6 +68,10 @@ class Submission(_Entity, _Labeled):
     def entity_id(self) -> str:
         return self._entity_id
 
+    @property
+    def entity_type(self) -> str:
+        return self._entity_type
+
     @property
     def creation_date(self):
         return self._creation_date

+ 3 - 3
tests/core/_orchestrator/_dispatcher/test_job_dispatcher.py

@@ -112,7 +112,7 @@ def test_can_execute_synchronous():
 
     task_id = TaskId("task_id1")
     task = Task(config_id="name", properties={}, input=[], function=print, output=[], id=task_id)
-    submission = _SubmissionManagerFactory._build_manager()._create(task_id)
+    submission = _SubmissionManagerFactory._build_manager()._create(task_id, task._ID_PREFIX)
     job_id = JobId("id1")
     job = Job(job_id, task, submission.id, task.id)
 
@@ -130,7 +130,7 @@ def test_exception_in_user_function():
     task_id = TaskId("task_id1")
     job_id = JobId("id1")
     task = Task(config_id="name", properties={}, input=[], function=_error, output=[], id=task_id)
-    submission = _SubmissionManagerFactory._build_manager()._create(task_id)
+    submission = _SubmissionManagerFactory._build_manager()._create(task_id, task._ID_PREFIX)
     job = Job(job_id, task, submission.id, task.id)
 
     dispatcher = _OrchestratorFactory._dispatcher
@@ -151,7 +151,7 @@ def test_exception_in_writing_data():
     output._is_in_cache = False
     output.write.side_effect = ValueError()
     task = Task(config_id="name", properties={}, input=[], function=print, output=[output], id=task_id)
-    submission = _SubmissionManagerFactory._build_manager()._create(task_id)
+    submission = _SubmissionManagerFactory._build_manager()._create(task_id, task._ID_PREFIX)
     job = Job(job_id, task, submission.id, task.id)
 
     dispatcher = _OrchestratorFactory._dispatcher

+ 6 - 6
tests/core/job/test_job.py

@@ -119,7 +119,7 @@ def test_comparison(task):
 
 
 def test_status_job(task):
-    submission = _SubmissionManagerFactory._build_manager()._create(task.id)
+    submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX)
     job = Job("job_id", task, submission.id, "SCENARIO_scenario_config")
     submission.jobs = [job]
 
@@ -150,7 +150,7 @@ def test_status_job(task):
 
 def test_notification_job(task):
     subscribe = MagicMock()
-    submission = _SubmissionManagerFactory._build_manager()._create(task.id)
+    submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX)
     job = Job("job_id", task, submission.id, "SCENARIO_scenario_config")
     submission.jobs = [job]
 
@@ -170,7 +170,7 @@ def test_notification_job(task):
 
 def test_handle_exception_in_user_function(task_id, job_id):
     task = Task(config_id="name", properties={}, input=[], function=_error, output=[], id=task_id)
-    submission = _SubmissionManagerFactory._build_manager()._create(task.id)
+    submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX)
     job = Job(job_id, task, submission.id, "scenario_entity_id")
     submission.jobs = [job]
 
@@ -184,7 +184,7 @@ def test_handle_exception_in_user_function(task_id, job_id):
 def test_handle_exception_in_input_data_node(task_id, job_id):
     data_node = InMemoryDataNode("data_node", scope=Scope.SCENARIO)
     task = Task(config_id="name", properties={}, input=[data_node], function=print, output=[], id=task_id)
-    submission = _SubmissionManagerFactory._build_manager()._create(task.id)
+    submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX)
     job = Job(job_id, task, submission.id, "scenario_entity_id")
     submission.jobs = [job]
 
@@ -198,7 +198,7 @@ def test_handle_exception_in_input_data_node(task_id, job_id):
 def test_handle_exception_in_ouptut_data_node(replace_in_memory_write_fct, task_id, job_id):
     data_node = InMemoryDataNode("data_node", scope=Scope.SCENARIO)
     task = Task(config_id="name", properties={}, input=[], function=_foo, output=[data_node], id=task_id)
-    submission = _SubmissionManagerFactory._build_manager()._create(task.id)
+    submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX)
     job = Job(job_id, task, submission.id, "scenario_entity_id")
     submission.jobs = [job]
 
@@ -213,7 +213,7 @@ def test_handle_exception_in_ouptut_data_node(replace_in_memory_write_fct, task_
 def test_auto_set_and_reload(current_datetime, job_id):
     task_1 = Task(config_id="name_1", properties={}, function=_foo, id=TaskId("task_1"))
     task_2 = Task(config_id="name_2", properties={}, function=_foo, id=TaskId("task_2"))
-    submission = _SubmissionManagerFactory._build_manager()._create(task_1.id)
+    submission = _SubmissionManagerFactory._build_manager()._create(task_1.id, task_1._ID_PREFIX)
     job_1 = Job(job_id, task_1, submission.id, "scenario_entity_id")
     submission.jobs = [job_1]
 

+ 3 - 2
tests/core/job/test_job_manager.py

@@ -28,6 +28,7 @@ from src.taipy.core.exceptions.exceptions import JobNotDeletedException
 from src.taipy.core.job._job_manager import _JobManager
 from src.taipy.core.job.job_id import JobId
 from src.taipy.core.job.status import Status
+from src.taipy.core.scenario.scenario import Scenario
 from src.taipy.core.submission._submission_manager_factory import _SubmissionManagerFactory
 from src.taipy.core.task._task_manager import _TaskManager
 from src.taipy.core.task.task import Task
@@ -349,8 +350,8 @@ def test_cancel_subsequent_jobs():
     task_3 = Task("task_config_3", {}, print, [dn_4], id="task_3")
 
     # Can't get tasks under 1 scenario due to partial not serializable
-    submission_1 = submission_manager._create("scenario_id")
-    submission_2 = submission_manager._create("scenario_id")
+    submission_1 = submission_manager._create("scenario_id", Scenario._ID_PREFIX)
+    submission_2 = submission_manager._create("scenario_id", Scenario._ID_PREFIX)
 
     _DataManager._set(dn_1)
     _DataManager._set(dn_2)

+ 10 - 4
tests/core/submission/test_submission.py

@@ -29,7 +29,7 @@ from src.taipy.core.task.task import Task
 
 
 def test_create_submission(scenario, job, current_datetime):
-    submission_1 = Submission(scenario.id)
+    submission_1 = Submission(scenario.id, scenario._ID_PREFIX)
 
     assert submission_1.id is not None
     assert submission_1.entity_id == scenario.id
@@ -39,7 +39,13 @@ def test_create_submission(scenario, job, current_datetime):
     assert submission_1._version is not None
 
     submission_2 = Submission(
-        scenario.id, "submission_id", [job], current_datetime, SubmissionStatus.COMPLETED, "version_id"
+        scenario.id,
+        scenario._ID_PREFIX,
+        "submission_id",
+        [job],
+        current_datetime,
+        SubmissionStatus.COMPLETED,
+        "version_id",
     )
 
     assert submission_2.id == "submission_id"
@@ -106,7 +112,7 @@ def __test_update_submission_status(job_ids, expected_submission_status):
             return_value=(mock_get_jobs(job_ids)),
         )
     ):
-        submission = Submission("submission_id")
+        submission = Submission("submission_id", "ENTITY_TYPE")
         submission._update_submission_status(None)
         assert submission.submission_status == expected_submission_status
 
@@ -263,7 +269,7 @@ def test_update_submission_status_with_wrong_case_abandoned_without_cancel_or_fa
 
 def test_auto_set_and_reload():
     task = Task(config_id="name_1", properties={}, function=print, id=TaskId("task_1"))
-    submission_1 = Submission(task.id)
+    submission_1 = Submission(task.id, task._ID_PREFIX)
     job_1 = Job("job_1", task, submission_1.id, submission_1.entity_id)
     job_2 = Job("job_2", task, submission_1.id, submission_1.entity_id)
 

+ 6 - 6
tests/core/submission/test_submission_manager.py

@@ -20,7 +20,7 @@ from src.taipy.core.task.task import Task
 
 
 def test_create_submission(scenario):
-    submission_1 = _SubmissionManagerFactory._build_manager()._create(scenario.id)
+    submission_1 = _SubmissionManagerFactory._build_manager()._create(scenario.id, scenario._ID_PREFIX)
 
     assert submission_1.id is not None
     assert submission_1.entity_id == scenario.id
@@ -34,7 +34,7 @@ def test_get_submission():
 
     assert submission_manager._get("random_submission_id") is None
 
-    submission_1 = submission_manager._create("entity_id")
+    submission_1 = submission_manager._create("entity_id", "ENTITY_TYPE")
     submission_2 = submission_manager._get(submission_1.id)
 
     assert submission_1.id == submission_2.id
@@ -69,22 +69,22 @@ def test_get_latest_submission():
     task_2 = Task("task_config_2", {}, print, id="task_id_2")
 
     submission_manager = _SubmissionManagerFactory._build_manager()
-    submission_1 = submission_manager._create(task_1.id)
+    submission_1 = submission_manager._create(task_1.id, task_1._ID_PREFIX)
     assert submission_manager._get_latest(task_1) == submission_1
     assert submission_manager._get_latest(task_2) is None
 
     sleep(0.01)  # Comparison is based on time, precision on Windows is not enough important
-    submission_2 = submission_manager._create(task_2.id)
+    submission_2 = submission_manager._create(task_2.id, task_2._ID_PREFIX)
     assert submission_manager._get_latest(task_1) == submission_1
     assert submission_manager._get_latest(task_2) == submission_2
 
     sleep(0.01)  # Comparison is based on time, precision on Windows is not enough important
-    submission_3 = submission_manager._create(task_1.id)
+    submission_3 = submission_manager._create(task_1.id, task_1._ID_PREFIX)
     assert submission_manager._get_latest(task_1) == submission_3
     assert submission_manager._get_latest(task_2) == submission_2
 
     sleep(0.01)  # Comparison is based on time, precision on Windows is not enough important
-    submission_4 = submission_manager._create(task_2.id)
+    submission_4 = submission_manager._create(task_2.id, task_2._ID_PREFIX)
     assert submission_manager._get_latest(task_1) == submission_3
     assert submission_manager._get_latest(task_2) == submission_4
 

+ 6 - 6
tests/core/submission/test_submission_manager_with_sql_repo.py

@@ -28,7 +28,7 @@ def init_managers():
 def test_create_submission(scenario, init_sql_repo):
     init_managers()
 
-    submission_1 = _SubmissionManagerFactory._build_manager()._create(scenario.id)
+    submission_1 = _SubmissionManagerFactory._build_manager()._create(scenario.id, scenario._ID_PREFIX)
 
     assert submission_1.id is not None
     assert submission_1.entity_id == scenario.id
@@ -42,7 +42,7 @@ def test_get_submission(init_sql_repo):
 
     submission_manager = _SubmissionManagerFactory._build_manager()
 
-    submission_1 = submission_manager._create("entity_id")
+    submission_1 = submission_manager._create("entity_id", "ENTITY_TYPE")
     submission_2 = submission_manager._get(submission_1.id)
 
     assert submission_1.id == submission_2.id
@@ -80,22 +80,22 @@ def test_get_latest_submission(init_sql_repo):
     task_2 = Task("task_config_2", {}, print, id="task_id_2")
 
     submission_manager = _SubmissionManagerFactory._build_manager()
-    submission_1 = submission_manager._create(task_1.id)
+    submission_1 = submission_manager._create(task_1.id, task_1._ID_PREFIX)
     assert submission_manager._get_latest(task_1) == submission_1
     assert submission_manager._get_latest(task_2) is None
 
     sleep(0.01)  # Comparison is based on time, precision on Windows is not enough important
-    submission_2 = submission_manager._create(task_2.id)
+    submission_2 = submission_manager._create(task_2.id, task_2._ID_PREFIX)
     assert submission_manager._get_latest(task_1) == submission_1
     assert submission_manager._get_latest(task_2) == submission_2
 
     sleep(0.01)  # Comparison is based on time, precision on Windows is not enough important
-    submission_3 = submission_manager._create(task_1.id)
+    submission_3 = submission_manager._create(task_1.id, task_1._ID_PREFIX)
     assert submission_manager._get_latest(task_1) == submission_3
     assert submission_manager._get_latest(task_2) == submission_2
 
     sleep(0.01)  # Comparison is based on time, precision on Windows is not enough important
-    submission_4 = submission_manager._create(task_2.id)
+    submission_4 = submission_manager._create(task_2.id, task_2._ID_PREFIX)
     assert submission_manager._get_latest(task_1) == submission_3
     assert submission_manager._get_latest(task_2) == submission_4
 

+ 9 - 9
tests/core/submission/test_submission_repositories.py

@@ -43,7 +43,7 @@ class TestSubmissionRepository:
         job._task = task
         _JobManagerFactory._build_manager()._repository._save(job)
 
-        submission = Submission(task.id)
+        submission = Submission(task.id, task._ID_PREFIX)
         submission_repository = _SubmissionManagerFactory._build_manager()._repository
         submission_repository._save(submission)
         submission.jobs = [job]
@@ -55,7 +55,7 @@ class TestSubmissionRepository:
     def test_exists(self, configure_repo):
         configure_repo()
 
-        submission = Submission("entity_id")
+        submission = Submission("entity_id", "ENTITY_TYPE")
         submission_repository = _SubmissionManagerFactory._build_manager()._repository
         submission_repository._save(submission)
 
@@ -67,7 +67,7 @@ class TestSubmissionRepository:
         configure_repo()
 
         repository = _SubmissionManagerFactory._build_manager()._repository
-        submission = Submission("entity_id")
+        submission = Submission("entity_id", "ENTITY_TYPE")
         for i in range(10):
             submission.id = f"submission-{i}"
             repository._save(submission)
@@ -81,7 +81,7 @@ class TestSubmissionRepository:
 
         repository = _SubmissionManagerFactory._build_manager()._repository
 
-        submission = Submission("entity_id")
+        submission = Submission("entity_id", "ENTITY_TYPE")
         repository._save(submission)
 
         repository._delete(submission.id)
@@ -94,7 +94,7 @@ class TestSubmissionRepository:
         configure_repo()
 
         submission_repository = _SubmissionManagerFactory._build_manager()._repository
-        submission = Submission("entity_id")
+        submission = Submission("entity_id", "ENTITY_TYPE")
 
         for i in range(10):
             submission.id = f"submission-{i}"
@@ -110,7 +110,7 @@ class TestSubmissionRepository:
     def test_delete_many(self, configure_repo):
         configure_repo()
 
-        submission = Submission("entity_id")
+        submission = Submission("entity_id", "ENTITY_TYPE")
         submission_repository = _SubmissionManagerFactory._build_manager()._repository
 
         for i in range(10):
@@ -130,7 +130,7 @@ class TestSubmissionRepository:
 
         # Create 5 entities with version 1.0 and 5 entities with version 2.0
         submission_repository = _SubmissionManagerFactory._build_manager()._repository
-        submission = Submission("entity_id")
+        submission = Submission("entity_id", "ENTITY_TYPE")
 
         for i in range(10):
             submission.id = f"submission-{i}"
@@ -148,7 +148,7 @@ class TestSubmissionRepository:
         configure_repo()
 
         submission_repository = _SubmissionManagerFactory._build_manager()._repository
-        submission = Submission("entity_id", version="random_version_number")
+        submission = Submission("entity_id", "ENTITY_TYPE", version="random_version_number")
         for i in range(10):
             submission.id = f"submission-{i}"
             submission_repository._save(submission)
@@ -170,7 +170,7 @@ class TestSubmissionRepository:
         configure_repo()
 
         repository = _SubmissionManagerFactory._build_manager()._repository
-        submission = Submission("entity_id")
+        submission = Submission("entity_id", "ENTITY_TYPE")
         repository._save(submission)
 
         repository._export(submission.id, tmpdir.strpath)