浏览代码

make submission return type of submit function

Toan Quach 1 年之前
父节点
当前提交
f2d716fbe9
共有 31 个文件被更改,包括 422 次插入270 次删除
  1. 2 1
      taipy/core/_entity/submittable.py
  2. 7 5
      taipy/core/_orchestrator/_abstract_orchestrator.py
  3. 23 8
      taipy/core/_orchestrator/_orchestrator.py
  4. 1 1
      taipy/core/job/_job_manager.py
  5. 6 4
      taipy/core/job/job.py
  6. 13 4
      taipy/core/scenario/_scenario_manager.py
  7. 6 4
      taipy/core/scenario/scenario.py
  8. 13 4
      taipy/core/sequence/_sequence_manager.py
  9. 6 3
      taipy/core/sequence/sequence.py
  10. 2 0
      taipy/core/submission/_submission_converter.py
  11. 4 6
      taipy/core/submission/_submission_manager.py
  12. 4 0
      taipy/core/submission/_submission_model.py
  13. 15 3
      taipy/core/submission/submission.py
  14. 17 11
      taipy/core/taipy.py
  15. 7 3
      taipy/core/task/_task_manager.py
  16. 7 8
      taipy/core/task/task.py
  17. 45 85
      tests/core/_orchestrator/test_orchestrator.py
  18. 66 53
      tests/core/_orchestrator/test_orchestrator__submit.py
  19. 28 13
      tests/core/_orchestrator/test_orchestrator__submit_task.py
  20. 20 20
      tests/core/job/test_job_manager.py
  21. 12 12
      tests/core/job/test_job_manager_with_sql_repo.py
  22. 2 1
      tests/core/job/test_job_repositories.py
  23. 6 4
      tests/core/notification/test_notifier.py
  24. 76 1
      tests/core/submission/test_submission.py
  25. 8 2
      tests/core/submission/test_submission_manager.py
  26. 8 2
      tests/core/submission/test_submission_manager_with_sql_repo.py
  27. 7 1
      tests/core/submission/test_submission_repositories.py
  28. 2 2
      tests/core/test_core_cli.py
  29. 2 2
      tests/core/test_core_cli_with_sql_repo.py
  30. 3 3
      tests/core/test_taipy.py
  31. 4 4
      tests/core/version/test_production_version_migration.py

+ 2 - 1
taipy/core/_entity/submittable.py

@@ -19,6 +19,7 @@ from ..common._listattributes import _ListAttributes
 from ..common._utils import _Subscriber
 from ..data.data_node import DataNode
 from ..job.job import Job
+from ..submission.submission import Submission
 from ..task.task import Task
 from ._dag import _DAG
 
@@ -42,7 +43,7 @@ class Submittable:
         force: bool = False,
         wait: bool = False,
         timeout: Optional[Union[float, int]] = None,
-    ):
+    ) -> Submission:
         raise NotImplementedError
 
     def get_inputs(self) -> Set[DataNode]:

+ 7 - 5
taipy/core/_orchestrator/_abstract_orchestrator.py

@@ -10,9 +10,11 @@
 # specific language governing permissions and limitations under the License.
 
 from abc import abstractmethod
-from typing import Callable, Iterable, List, Optional, Union
+from typing import Callable, Iterable, Optional, Union
 
+from .._entity.submittable import Submittable
 from ..job.job import Job
+from ..submission.submission import Submission
 from ..task.task import Task
 
 
@@ -28,12 +30,12 @@ class _AbstractOrchestrator:
     @abstractmethod
     def submit(
         cls,
-        sequence,
+        submittable: Submittable,
         callbacks: Optional[Iterable[Callable]],
         force: bool = False,
         wait: bool = False,
         timeout: Optional[Union[float, int]] = None,
-    ) -> List[Job]:
+    ) -> Submission:
         raise NotImplementedError
 
     @classmethod
@@ -45,10 +47,10 @@ class _AbstractOrchestrator:
         force: bool = False,
         wait: bool = False,
         timeout: Optional[Union[float, int]] = None,
-    ) -> Job:
+    ) -> Submission:
         raise NotImplementedError
 
     @classmethod
     @abstractmethod
-    def cancel_job(cls, job):
+    def cancel_job(cls, job: Job):
         raise NotImplementedError

+ 23 - 8
taipy/core/_orchestrator/_orchestrator.py

@@ -14,7 +14,7 @@ from datetime import datetime
 from multiprocessing import Lock
 from queue import Queue
 from time import sleep
-from typing import Callable, Iterable, List, Optional, Set, Union
+from typing import Callable, Dict, Iterable, List, Optional, Set, Union
 
 from taipy.config.config import Config
 from taipy.logger._taipy_logger import _TaipyLogger
@@ -25,6 +25,7 @@ from ..job._job_manager_factory import _JobManagerFactory
 from ..job.job import Job
 from ..job.job_id import JobId
 from ..submission._submission_manager_factory import _SubmissionManagerFactory
+from ..submission.submission import Submission
 from ..task.task import Task
 from ._abstract_orchestrator import _AbstractOrchestrator
 
@@ -38,6 +39,7 @@ class _Orchestrator(_AbstractOrchestrator):
     blocked_jobs: List = []
     lock = Lock()
     __logger = _TaipyLogger._get_logger()
+    _submission_entities: Dict[str, Submission] = {}
 
     @classmethod
     def initialize(cls):
@@ -51,7 +53,8 @@ class _Orchestrator(_AbstractOrchestrator):
         force: bool = False,
         wait: bool = False,
         timeout: Optional[Union[float, int]] = None,
-    ) -> List[Job]:
+        **properties,
+    ) -> Submission:
         """Submit the given `Scenario^` or `Sequence^` for an execution.
 
         Parameters:
@@ -63,6 +66,7 @@ class _Orchestrator(_AbstractOrchestrator):
                 finished in asynchronous mode.
              timeout (Union[float, int]): The optional maximum number of seconds to wait for the jobs to be finished
                 before returning.
+             **properties (dict[str, any]): A keyworded variable length list of additional arguments.
         Returns:
             The created Jobs.
         """
@@ -70,7 +74,9 @@ class _Orchestrator(_AbstractOrchestrator):
             submittable.id,  # type: ignore
             submittable._ID_PREFIX,  # type: ignore
             getattr(submittable, "config_id", None),
+            **properties,
         )
+        cls._submission_entities[submission.id] = submission
         jobs = []
         tasks = submittable._get_sorted_tasks()
         with cls.lock:
@@ -81,7 +87,7 @@ class _Orchestrator(_AbstractOrchestrator):
                             task,
                             submission.id,
                             submission.entity_id,
-                            callbacks=itertools.chain([submission._update_submission_status], callbacks or []),
+                            callbacks=itertools.chain([cls._update_submission_status], callbacks or []),
                             force=force,  # type: ignore
                         )
                     )
@@ -92,7 +98,7 @@ class _Orchestrator(_AbstractOrchestrator):
         else:
             if wait:
                 cls._wait_until_job_finished(jobs, timeout=timeout)
-        return jobs
+        return submission
 
     @classmethod
     def submit_task(
@@ -102,7 +108,8 @@ class _Orchestrator(_AbstractOrchestrator):
         force: bool = False,
         wait: bool = False,
         timeout: Optional[Union[float, int]] = None,
-    ) -> Job:
+        **properties,
+    ) -> Submission:
         """Submit the given `Task^` for an execution.
 
         Parameters:
@@ -113,17 +120,21 @@ class _Orchestrator(_AbstractOrchestrator):
                 in asynchronous mode.
              timeout (Union[float, int]): The optional maximum number of seconds to wait for the job
                 to be finished before returning.
+             **properties (dict[str, any]): A keyworded variable length list of additional arguments.
         Returns:
             The created `Job^`.
         """
-        submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX, task.config_id)
+        submission = _SubmissionManagerFactory._build_manager()._create(
+            task.id, task._ID_PREFIX, task.config_id, **properties
+        )
         submit_id = submission.id
+        cls._submission_entities[submission.id] = submission
         with cls.lock:
             job = cls._lock_dn_output_and_create_job(
                 task,
                 submit_id,
                 submission.entity_id,
-                itertools.chain([submission._update_submission_status], callbacks or []),
+                itertools.chain([cls._update_submission_status], callbacks or []),
                 force,
             )
         jobs = [job]
@@ -134,7 +145,11 @@ class _Orchestrator(_AbstractOrchestrator):
         else:
             if wait:
                 cls._wait_until_job_finished(job, timeout=timeout)
-        return job
+        return submission
+
+    @classmethod
+    def _update_submission_status(cls, job: Job):
+        cls._submission_entities[job.submit_id]._update_submission_status(job)
 
     @classmethod
     def _lock_dn_output_and_create_job(

+ 1 - 1
taipy/core/job/_job_manager.py

@@ -50,9 +50,9 @@ class _JobManager(_Manager[Job], _VersionMixin):
             force=force,
             version=version,
         )
-        cls._set(job)
         Notifier.publish(_make_event(job, EventOperation.CREATION))
         job._on_status_change(*callbacks)
+        cls._set(job)
         return job
 
     @classmethod

+ 6 - 4
taipy/core/job/job.py

@@ -13,7 +13,7 @@ __all__ = ["Job"]
 
 import traceback
 from datetime import datetime
-from typing import Any, Callable, List, Optional
+from typing import TYPE_CHECKING, Any, Callable, List, Optional
 
 from taipy.logger._taipy_logger import _TaipyLogger
 
@@ -23,10 +23,12 @@ from .._entity._reload import _self_reload, _self_setter
 from .._version._version_manager_factory import _VersionManagerFactory
 from ..common._utils import _fcts_to_dict
 from ..notification.event import Event, EventEntityType, EventOperation, _make_event
-from ..task.task import Task
 from .job_id import JobId
 from .status import Status
 
+if TYPE_CHECKING:
+    from ..task.task import Task
+
 
 def _run_callbacks(fn):
     def __run_callbacks(job):
@@ -58,7 +60,7 @@ class Job(_Entity, _Labeled):
     _MANAGER_NAME = "job"
     _ID_PREFIX = "JOB"
 
-    def __init__(self, id: JobId, task: Task, submit_id: str, submit_entity_id: str, force=False, version=None):
+    def __init__(self, id: JobId, task: "Task", submit_id: str, submit_entity_id: str, force=False, version=None):
         self.id = id
         self._task = task
         self._force = force
@@ -146,7 +148,7 @@ class Job(_Entity, _Labeled):
     def version(self):
         return self._version
 
-    def __contains__(self, task: Task):
+    def __contains__(self, task: "Task"):
         return self.task.id == task.id
 
     def __lt__(self, other):

+ 13 - 4
taipy/core/scenario/_scenario_manager.py

@@ -41,6 +41,7 @@ from ..job._job_manager_factory import _JobManagerFactory
 from ..job.job import Job
 from ..notification import EventEntityType, EventOperation, Notifier, _make_event
 from ..submission._submission_manager_factory import _SubmissionManagerFactory
+from ..submission.submission import Submission
 from ..task._task_manager_factory import _TaskManagerFactory
 from .scenario import Scenario
 from .scenario_id import ScenarioId
@@ -205,7 +206,8 @@ class _ScenarioManager(_Manager[Scenario], _VersionMixin):
         wait: bool = False,
         timeout: Optional[Union[float, int]] = None,
         check_inputs_are_ready: bool = True,
-    ) -> List[Job]:
+        **properties,
+    ) -> Submission:
         scenario_id = scenario.id if isinstance(scenario, Scenario) else scenario
         scenario = cls._get(scenario_id)
         if scenario is None:
@@ -215,13 +217,20 @@ class _ScenarioManager(_Manager[Scenario], _VersionMixin):
         if check_inputs_are_ready:
             _warn_if_inputs_not_ready(scenario.get_inputs())
 
-        jobs = (
+        submission = (
             _TaskManagerFactory._build_manager()
             ._orchestrator()
-            .submit(scenario, callbacks=scenario_subscription_callback, force=force, wait=wait, timeout=timeout)
+            .submit(
+                scenario,
+                callbacks=scenario_subscription_callback,
+                force=force,
+                wait=wait,
+                timeout=timeout,
+                **properties,
+            )
         )
         Notifier.publish(_make_event(scenario, EventOperation.SUBMISSION))
-        return jobs
+        return submission
 
     @classmethod
     def __get_status_notifier_callbacks(cls, scenario: Scenario) -> List:

+ 6 - 4
taipy/core/scenario/scenario.py

@@ -42,6 +42,7 @@ from ..exceptions.exceptions import (
 from ..job.job import Job
 from ..notification import Event, EventEntityType, EventOperation, Notifier, _make_event
 from ..sequence.sequence import Sequence
+from ..submission.submission import Submission
 from ..task.task import Task
 from ..task.task_id import TaskId
 from .scenario_id import ScenarioId
@@ -492,7 +493,8 @@ class Scenario(_Entity, Submittable, _Labeled):
         force: bool = False,
         wait: bool = False,
         timeout: Optional[Union[float, int]] = None,
-    ) -> List[Job]:
+        **properties,
+    ) -> Submission:
         """Submit this scenario for execution.
 
         All the `Task^`s of the scenario will be submitted for execution.
@@ -505,13 +507,13 @@ class Scenario(_Entity, Submittable, _Labeled):
                 asynchronous mode.
             timeout (Union[float, int]): The optional maximum number of seconds to wait for the jobs to be finished
                 before returning.
-
+            **properties (dict[str, any]): A keyworded variable length list of additional arguments.
         Returns:
-            A list of created `Job^`s.
+            A `Submission^` containing the information of the submission.
         """
         from ._scenario_manager_factory import _ScenarioManagerFactory
 
-        return _ScenarioManagerFactory._build_manager()._submit(self, callbacks, force, wait, timeout)
+        return _ScenarioManagerFactory._build_manager()._submit(self, callbacks, force, wait, timeout, **properties)
 
     def export(
         self,

+ 13 - 4
taipy/core/sequence/_sequence_manager.py

@@ -34,6 +34,7 @@ from ..scenario._scenario_manager_factory import _ScenarioManagerFactory
 from ..scenario.scenario import Scenario
 from ..scenario.scenario_id import ScenarioId
 from ..submission._submission_manager_factory import _SubmissionManagerFactory
+from ..submission.submission import Submission
 from ..task._task_manager_factory import _TaskManagerFactory
 from ..task.task import Task, TaskId
 from .sequence import Sequence
@@ -309,7 +310,8 @@ class _SequenceManager(_Manager[Sequence], _VersionMixin):
         wait: bool = False,
         timeout: Optional[Union[float, int]] = None,
         check_inputs_are_ready: bool = True,
-    ) -> List[Job]:
+        **properties,
+    ) -> Submission:
         sequence_id = sequence.id if isinstance(sequence, Sequence) else sequence
         sequence = cls._get(sequence_id)
         if sequence is None:
@@ -319,13 +321,20 @@ class _SequenceManager(_Manager[Sequence], _VersionMixin):
         if check_inputs_are_ready:
             _warn_if_inputs_not_ready(sequence.get_inputs())
 
-        jobs = (
+        submission = (
             _TaskManagerFactory._build_manager()
             ._orchestrator()
-            .submit(sequence, callbacks=sequence_subscription_callback, force=force, wait=wait, timeout=timeout)
+            .submit(
+                sequence,
+                callbacks=sequence_subscription_callback,
+                force=force,
+                wait=wait,
+                timeout=timeout,
+                **properties,
+            )
         )
         Notifier.publish(_make_event(sequence, EventOperation.SUBMISSION))
-        return jobs
+        return submission
 
     @classmethod
     def _exists(cls, entity_id: str) -> bool:

+ 6 - 3
taipy/core/sequence/sequence.py

@@ -30,6 +30,7 @@ from ..data.data_node import DataNode
 from ..exceptions.exceptions import NonExistingTask
 from ..job.job import Job
 from ..notification.event import Event, EventEntityType, EventOperation, _make_event
+from ..submission.submission import Submission
 from ..task.task import Task
 from ..task.task_id import TaskId
 from .sequence_id import SequenceId
@@ -225,7 +226,8 @@ class Sequence(_Entity, Submittable, _Labeled):
         force: bool = False,
         wait: bool = False,
         timeout: Optional[Union[float, int]] = None,
-    ) -> List[Job]:
+        **properties,
+    ) -> Submission:
         """Submit the sequence for execution.
 
         All the `Task^`s of the sequence will be submitted for execution.
@@ -238,12 +240,13 @@ class Sequence(_Entity, Submittable, _Labeled):
                 in asynchronous mode.
             timeout (Union[float, int]): The maximum number of seconds to wait for the jobs to be finished before
                 returning.
+            **properties (dict[str, any]): A keyworded variable length list of additional arguments.
         Returns:
-            A list of created `Job^`s.
+            A `Submission^` containing the information of the submission.
         """
         from ._sequence_manager_factory import _SequenceManagerFactory
 
-        return _SequenceManagerFactory._build_manager()._submit(self, callbacks, force, wait, timeout)
+        return _SequenceManagerFactory._build_manager()._submit(self, callbacks, force, wait, timeout, **properties)
 
     def get_label(self) -> str:
         """Returns the sequence simple label prefixed by its owner label.

+ 2 - 0
taipy/core/submission/_submission_converter.py

@@ -27,6 +27,7 @@ class _SubmissionConverter(_AbstractConverter):
             entity_type=submission.entity_type,
             entity_config_id=submission._entity_config_id,
             job_ids=[job.id if isinstance(job, Job) else JobId(str(job)) for job in list(submission._jobs)],
+            properties=submission._properties.data.copy(),
             creation_date=submission._creation_date.isoformat(),
             submission_status=submission._submission_status,
             version=submission._version,
@@ -40,6 +41,7 @@ class _SubmissionConverter(_AbstractConverter):
             entity_config_id=model.entity_config_id,
             id=SubmissionId(model.id),
             jobs=model.job_ids,
+            properties=model.properties,
             creation_date=datetime.fromisoformat(model.creation_date),
             submission_status=model.submission_status,
             version=model.version,

+ 4 - 6
taipy/core/submission/_submission_manager.py

@@ -37,8 +37,10 @@ class _SubmissionManager(_Manager[Submission], _VersionMixin):
         return cls._repository._load_all(filters)
 
     @classmethod
-    def _create(cls, entity_id: str, entity_type: str, entity_config: Optional[str]) -> Submission:
-        submission = Submission(entity_id=entity_id, entity_type=entity_type, entity_config_id=entity_config)
+    def _create(cls, entity_id: str, entity_type: str, entity_config: Optional[str], **properties) -> Submission:
+        submission = Submission(
+            entity_id=entity_id, entity_type=entity_type, entity_config_id=entity_config, properties=properties
+        )
         cls._set(submission)
 
         Notifier.publish(_make_event(submission, EventOperation.CREATION))
@@ -56,10 +58,6 @@ class _SubmissionManager(_Manager[Submission], _VersionMixin):
         else:
             return max(submissions_of_task)
 
-    @classmethod
-    def _is_editable(cls, entity: Union[Submission, str]) -> bool:
-        return False
-
     @classmethod
     def _delete(cls, submission: Union[Submission, SubmissionId]):
         if isinstance(submission, str):

+ 4 - 0
taipy/core/submission/_submission_model.py

@@ -31,6 +31,7 @@ class _SubmissionModel(_BaseModel):
         Column("entity_type", String),
         Column("entity_config_id", String),
         Column("job_ids", JSON),
+        Column("properties", JSON),
         Column("creation_date", String),
         Column("submission_status", Enum(SubmissionStatus)),
         Column("version", String),
@@ -40,6 +41,7 @@ class _SubmissionModel(_BaseModel):
     entity_type: str
     entity_config_id: Optional[str]
     job_ids: Union[List[JobId], List]
+    properties: Dict[str, Any]
     creation_date: str
     submission_status: SubmissionStatus
     version: str
@@ -52,6 +54,7 @@ class _SubmissionModel(_BaseModel):
             entity_type=data["entity_type"],
             entity_config_id=data.get("entity_config_id"),
             job_ids=_BaseModel._deserialize_attribute(data["job_ids"]),
+            properties=_BaseModel._deserialize_attribute(data["properties"]),
             creation_date=data["creation_date"],
             submission_status=SubmissionStatus._from_repr(data["submission_status"]),
             version=data["version"],
@@ -64,6 +67,7 @@ class _SubmissionModel(_BaseModel):
             self.entity_type,
             self.entity_config_id,
             _BaseModel._serialize_attribute(self.job_ids),
+            _BaseModel._serialize_attribute(self.properties),
             self.creation_date,
             repr(self.submission_status),
             self.version,

+ 15 - 3
taipy/core/submission/submission.py

@@ -13,13 +13,13 @@ import threading
 import uuid
 from collections.abc import MutableSet
 from datetime import datetime
-from typing import Any, List, Optional, Union
+from typing import Any, Dict, List, Optional, Union
 
 from .._entity._entity import _Entity
 from .._entity._labeled import _Labeled
-from .._entity._reload import _self_reload, _self_setter
+from .._entity._properties import _Properties
+from .._entity._reload import _Reloader, _self_reload, _self_setter
 from .._version._version_manager_factory import _VersionManagerFactory
-from ..job._job_manager_factory import _JobManagerFactory
 from ..job.job import Job, JobId, Status
 from ..notification.event import Event, EventEntityType, EventOperation, _make_event
 from .submission_id import SubmissionId
@@ -33,6 +33,7 @@ class Submission(_Entity, _Labeled):
         entity_id (str): The identifier of the entity that was submitted.
         id (str): The identifier of the `Submission^` entity.
         jobs (Optional[Union[List[Job], List[JobId]]]): A list of jobs.
+        properties (dict[str, Any]): A dictionary of additional properties.
         creation_date (Optional[datetime]): The date of this submission's creation.
         submission_status (Optional[SubmissionStatus]): The current status of this submission.
         version (Optional[str]): The string indicates the application version of the submission to instantiate.
@@ -51,6 +52,7 @@ class Submission(_Entity, _Labeled):
         entity_config_id: Optional[str] = None,
         id: Optional[str] = None,
         jobs: Optional[Union[List[Job], List[JobId]]] = None,
+        properties: Optional[Dict[str, Any]] = None,
         creation_date: Optional[datetime] = None,
         submission_status: Optional[SubmissionStatus] = None,
         version: Optional[str] = None,
@@ -64,6 +66,9 @@ class Submission(_Entity, _Labeled):
         self._submission_status = submission_status or SubmissionStatus.SUBMITTED
         self._version = version or _VersionManagerFactory._build_manager()._get_latest_version()
 
+        properties = properties or {}
+        self._properties = _Properties(self, **properties.copy())
+
         self.__abandoned = False
         self.__completed = False
 
@@ -89,6 +94,11 @@ class Submission(_Entity, _Labeled):
     def entity_config_id(self) -> Optional[str]:
         return self._entity_config_id
 
+    @property
+    def properties(self):
+        self._properties = _Reloader()._reload(self._MANAGER_NAME, self)._properties
+        return self._properties
+
     @property
     def creation_date(self):
         return self._creation_date
@@ -112,6 +122,8 @@ class Submission(_Entity, _Labeled):
     @property  # type: ignore
     @_self_reload(_MANAGER_NAME)
     def jobs(self) -> List[Job]:
+        from ..job._job_manager_factory import _JobManagerFactory
+
         jobs = []
         job_manager = _JobManagerFactory._build_manager()
 

+ 17 - 11
taipy/core/taipy.py

@@ -67,7 +67,7 @@ def set(entity: Union[DataNode, Task, Sequence, Scenario, Cycle]):
     This function allows you to save or update an entity in Taipy.
 
     Parameters:
-        entity (Union[DataNode^, Task^, Sequence^, Scenario^, Cycle^]): The
+        entity (Union[DataNode^, Task^, Sequence^, Scenario^, Cycle^, Submission^]): The
             entity to save or update.
     """
     if isinstance(entity, Cycle):
@@ -80,6 +80,8 @@ def set(entity: Union[DataNode, Task, Sequence, Scenario, Cycle]):
         return _TaskManagerFactory._build_manager()._set(entity)
     if isinstance(entity, DataNode):
         return _DataManagerFactory._build_manager()._set(entity)
+    if isinstance(entity, Submission):
+        return _SubmissionManagerFactory._build_manager()._set(entity)
 
 
 def is_submittable(entity: Union[Scenario, ScenarioId, Sequence, SequenceId, Task, TaskId, str]) -> bool:
@@ -176,7 +178,7 @@ def is_readable(
         SequenceId,
         ScenarioId,
         CycleId,
-        SubmissionId,
+        SubmissionId
     ],
 ) -> bool:
     """Indicate if an entity can be read.
@@ -223,7 +225,8 @@ def submit(
     force: bool = False,
     wait: bool = False,
     timeout: Optional[Union[float, int]] = None,
-) -> Optional[Union[Job, List[Job]]]:
+    **properties,
+) -> Submission:
     """Submit a scenario, sequence or task entity for execution.
 
     This function submits the given entity for execution and returns the created job(s).
@@ -238,19 +241,22 @@ def submit(
             in asynchronous mode.
         timeout (Union[float, int]): The optional maximum number of seconds to wait
             for the jobs to be finished before returning.
-
+        **properties (dict[str, any]): A keyworded variable length list of additional arguments.
     Returns:
-        The created `Job^` or a collection of the created `Job^` depends on the submitted entity.
-
-            - If a `Scenario^` or a `Sequence^` is provided, it will return a list of `Job^`.
-            - If a `Task^` is provided, it will return the created `Job^`.
+        The created `Submission^` containing the information about the submission.
     """
     if isinstance(entity, Scenario):
-        return _ScenarioManagerFactory._build_manager()._submit(entity, force=force, wait=wait, timeout=timeout)
+        return _ScenarioManagerFactory._build_manager()._submit(
+            entity, force=force, wait=wait, timeout=timeout, **properties
+        )
     if isinstance(entity, Sequence):
-        return _SequenceManagerFactory._build_manager()._submit(entity, force=force, wait=wait, timeout=timeout)
+        return _SequenceManagerFactory._build_manager()._submit(
+            entity, force=force, wait=wait, timeout=timeout, **properties
+        )
     if isinstance(entity, Task):
-        return _TaskManagerFactory._build_manager()._submit(entity, force=force, wait=wait, timeout=timeout)
+        return _TaskManagerFactory._build_manager()._submit(
+            entity, force=force, wait=wait, timeout=timeout, **properties
+        )
     return None
 
 

+ 7 - 3
taipy/core/task/_task_manager.py

@@ -28,6 +28,7 @@ from ..exceptions.exceptions import NonExistingTask
 from ..notification import EventEntityType, EventOperation, Notifier, _make_event
 from ..scenario.scenario_id import ScenarioId
 from ..sequence.sequence_id import SequenceId
+from ..submission.submission import Submission
 from ..task.task import Task
 from .task_id import TaskId
 
@@ -175,16 +176,19 @@ class _TaskManager(_Manager[Task], _VersionMixin):
         wait: bool = False,
         timeout: Optional[Union[float, int]] = None,
         check_inputs_are_ready: bool = True,
-    ):
+        **properties,
+    ) -> Submission:
         task_id = task.id if isinstance(task, Task) else task
         task = cls._get(task_id)
         if task is None:
             raise NonExistingTask(task_id)
         if check_inputs_are_ready:
             _warn_if_inputs_not_ready(task.input.values())
-        job = cls._orchestrator().submit_task(task, callbacks=callbacks, force=force, wait=wait, timeout=timeout)
+        submission = cls._orchestrator().submit_task(
+            task, callbacks=callbacks, force=force, wait=wait, timeout=timeout, **properties
+        )
         Notifier.publish(_make_event(task, EventOperation.SUBMISSION))
-        return job
+        return submission
 
     @classmethod
     def _get_by_config_id(cls, config_id: str, version_number: Optional[str] = None) -> List[Task]:

+ 7 - 8
taipy/core/task/task.py

@@ -10,7 +10,7 @@
 # specific language governing permissions and limitations under the License.
 
 import uuid
-from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Set, Union
+from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Union
 
 from taipy.config.common._template_handler import _TemplateHandler as _tpl
 from taipy.config.common._validate_id import _validate_id
@@ -23,11 +23,9 @@ from .._entity._reload import _Reloader, _self_reload, _self_setter
 from .._version._version_manager_factory import _VersionManagerFactory
 from ..data.data_node import DataNode
 from ..notification.event import Event, EventEntityType, EventOperation, _make_event
+from ..submission.submission import Submission
 from .task_id import TaskId
 
-if TYPE_CHECKING:
-    from ..job.job import Job
-
 
 class Task(_Entity, _Labeled):
     """Hold a user function that will be executed, its parameters and the results.
@@ -180,7 +178,8 @@ class Task(_Entity, _Labeled):
         force: bool = False,
         wait: bool = False,
         timeout: Optional[Union[float, int]] = None,
-    ) -> "Job":  # noqa
+        **properties,
+    ) -> Submission:
         """Submit the task for execution.
 
         Parameters:
@@ -191,13 +190,13 @@ class Task(_Entity, _Labeled):
                 mode.
             timeout (Union[float, int]): The maximum number of seconds to wait for the job to be finished before
                 returning.
-
+            **properties (dict[str, any]): A keyworded variable length list of additional arguments.
         Returns:
-            The created `Job^`.
+            A `Submission^` containing the information of the submission.
         """
         from ._task_manager_factory import _TaskManagerFactory
 
-        return _TaskManagerFactory._build_manager()._submit(self, callbacks, force, wait, timeout)
+        return _TaskManagerFactory._build_manager()._submit(self, callbacks, force, wait, timeout, **properties)
 
     def get_label(self) -> str:
         """Returns the task simple label prefixed by its owner label.

+ 45 - 85
tests/core/_orchestrator/test_orchestrator.py

@@ -61,42 +61,34 @@ def test_submit_task_multithreading_multiple_task():
 
     with lock_1:
         with lock_2:
-            job_1 = _Orchestrator.submit_task(task_1)
-            job_2 = _Orchestrator.submit_task(task_2)
+            submission_1 = _Orchestrator.submit_task(task_1)
+            job_1 = submission_1._jobs[0]
+            submission_2 = _Orchestrator.submit_task(task_2)
+            job_2 = submission_2._jobs[0]
 
             assert task_1.output[f"{task_1.config_id}_output0"].read() == 0
             assert task_2.output[f"{task_2.config_id}_output0"].read() == 0
             assert_true_after_time(job_1.is_running)
             assert_true_after_time(job_2.is_running)
             assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2)
-            assert_true_after_time(
-                lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
-            )
-            assert_true_after_time(
-                lambda: _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.RUNNING
-            )
+            assert_true_after_time(lambda: submission_1.submission_status == SubmissionStatus.RUNNING)
+            assert_true_after_time(lambda: submission_2.submission_status == SubmissionStatus.RUNNING)
 
         assert_true_after_time(lambda: task_2.output[f"{task_2.config_id}_output0"].read() == 42)
         assert task_1.output[f"{task_1.config_id}_output0"].read() == 0
         assert_true_after_time(job_2.is_completed)
         assert_true_after_time(job_1.is_running)
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
-        assert_true_after_time(
-            lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
-        )
-        assert_true_after_time(
-            lambda: _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.COMPLETED
-        )
+        assert_true_after_time(lambda: submission_1.submission_status == SubmissionStatus.RUNNING)
+        assert_true_after_time(lambda: submission_2.submission_status == SubmissionStatus.COMPLETED)
 
     assert_true_after_time(lambda: task_1.output[f"{task_1.config_id}_output0"].read() == 42)
     assert_true_after_time(job_1.is_completed)
     assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
-    assert_true_after_time(
-        lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
-    )
+    assert_true_after_time(lambda: submission_1.submission_status == SubmissionStatus.COMPLETED)
 
     assert job_2.is_completed()
-    assert _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.COMPLETED
+    assert submission_2.submission_status == SubmissionStatus.COMPLETED
 
 
 @pytest.mark.orchestrator_dispatcher
@@ -116,7 +108,8 @@ def test_submit_submittable_multithreading_multiple_task():
 
     with lock_1:
         with lock_2:
-            tasks_jobs = {job._task.id: job for job in _Orchestrator.submit(scenario)}
+            submission = _Orchestrator.submit(scenario)
+            tasks_jobs = {job._task.id: job for job in submission._jobs}
             job_1 = tasks_jobs[task_1.id]
             job_2 = tasks_jobs[task_2.id]
 
@@ -125,25 +118,19 @@ def test_submit_submittable_multithreading_multiple_task():
             assert_true_after_time(job_1.is_running)
             assert_true_after_time(job_2.is_running)
             assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2)
-            assert_true_after_time(
-                lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
-            )
+            assert_true_after_time(lambda: submission.submission_status == SubmissionStatus.RUNNING)
         assert_true_after_time(lambda: task_2.output[f"{task_2.config_id}_output0"].read() == 42)
         assert task_1.output[f"{task_1.config_id}_output0"].read() == 0
         assert_true_after_time(job_2.is_completed)
         assert_true_after_time(job_1.is_running)
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
-        assert_true_after_time(
-            lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
-        )
+        assert_true_after_time(lambda: submission.submission_status == SubmissionStatus.RUNNING)
 
     assert_true_after_time(lambda: task_1.output[f"{task_1.config_id}_output0"].read() == 42)
     assert_true_after_time(job_1.is_completed)
     assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
     assert_true_after_time(job_2.is_completed)
-    assert_true_after_time(
-        lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
-    )
+    assert_true_after_time(lambda: submission.submission_status == SubmissionStatus.COMPLETED)
 
 
 @pytest.mark.orchestrator_dispatcher
@@ -162,7 +149,8 @@ def test_submit_task_multithreading_multiple_task_in_sync_way_to_check_job_statu
     _OrchestratorFactory._build_dispatcher()
 
     with lock_0:
-        job_0 = _Orchestrator.submit_task(task_0)
+        submission_0 = _Orchestrator.submit_task(task_0)
+        job_0 = submission_0._jobs[0]
         assert_true_after_time(job_0.is_running)
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
         assert_true_after_time(
@@ -172,20 +160,16 @@ def test_submit_task_multithreading_multiple_task_in_sync_way_to_check_job_statu
             with lock_2:
                 assert task_1.output[f"{task_1.config_id}_output0"].read() == 0
                 assert task_2.output[f"{task_2.config_id}_output0"].read() == 0
-                job_2 = _Orchestrator.submit_task(task_2)
-                job_1 = _Orchestrator.submit_task(task_1)
+                submission_2 = _Orchestrator.submit_task(task_2)
+                job_2 = submission_2._jobs[0]
+                submission_1 = _Orchestrator.submit_task(task_1)
+                job_1 = submission_1._jobs[0]
                 assert_true_after_time(job_0.is_running)
                 assert_true_after_time(job_1.is_pending)
                 assert_true_after_time(job_2.is_running)
-                assert_true_after_time(
-                    lambda: _SubmissionManager._get(job_0.submit_id).submission_status == SubmissionStatus.RUNNING
-                )
-                assert_true_after_time(
-                    lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.PENDING
-                )
-                assert_true_after_time(
-                    lambda: _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.RUNNING
-                )
+                assert_true_after_time(lambda: submission_0.submission_status == SubmissionStatus.RUNNING)
+                assert_true_after_time(lambda: submission_1.submission_status == SubmissionStatus.PENDING)
+                assert_true_after_time(lambda: submission_2.submission_status == SubmissionStatus.RUNNING)
                 assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2)
 
             assert_true_after_time(lambda: task_2.output[f"{task_2.config_id}_output0"].read() == 42)
@@ -193,30 +177,20 @@ def test_submit_task_multithreading_multiple_task_in_sync_way_to_check_job_statu
             assert_true_after_time(job_0.is_running)
             assert_true_after_time(job_1.is_running)
             assert_true_after_time(job_2.is_completed)
-            assert_true_after_time(
-                lambda: _SubmissionManager._get(job_0.submit_id).submission_status == SubmissionStatus.RUNNING
-            )
-            assert_true_after_time(
-                lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
-            )
-            assert_true_after_time(
-                lambda: _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.COMPLETED
-            )
+            assert_true_after_time(lambda: submission_0.submission_status == SubmissionStatus.RUNNING)
+            assert_true_after_time(lambda: submission_1.submission_status == SubmissionStatus.RUNNING)
+            assert_true_after_time(lambda: submission_2.submission_status == SubmissionStatus.COMPLETED)
             assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2)
 
         assert_true_after_time(lambda: task_1.output[f"{task_1.config_id}_output0"].read() == 42)
         assert task_0.output[f"{task_0.config_id}_output0"].read() == 0
         assert_true_after_time(job_0.is_running)
         assert_true_after_time(job_1.is_completed)
-        assert_true_after_time(
-            lambda: _SubmissionManager._get(job_0.submit_id).submission_status == SubmissionStatus.RUNNING
-        )
-        assert_true_after_time(
-            lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
-        )
+        assert_true_after_time(lambda: submission_0.submission_status == SubmissionStatus.RUNNING)
+        assert_true_after_time(lambda: submission_1.submission_status == SubmissionStatus.COMPLETED)
 
         assert job_2.is_completed()
-        assert _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.COMPLETED
+        assert submission_2.submission_status == SubmissionStatus.COMPLETED
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
 
     assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
@@ -255,44 +229,36 @@ def test_blocked_task():
     assert not task_2.baz.is_ready_for_reading  # neither does baz
 
     assert len(_Orchestrator.blocked_jobs) == 0
-    job_2 = _Orchestrator.submit_task(task_2)  # job 2 is submitted first
+    submission_2 = _Orchestrator.submit_task(task_2)
+    job_2 = submission_2._jobs[0]  # job 2 is submitted first
     assert job_2.is_blocked()  # since bar is not is_valid the job 2 is blocked
     assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
     assert _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.BLOCKED
     assert len(_Orchestrator.blocked_jobs) == 1
     with lock_2:
         with lock_1:
-            job_1 = _Orchestrator.submit_task(task_1)  # job 1 is submitted and locked
+            submission_1 = _Orchestrator.submit_task(task_1)
+            job_1 = submission_1._jobs[0]  # job 1 is submitted and locked
             assert_true_after_time(job_1.is_running)  # so it is still running
             assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
             assert not _DataManager._get(task_1.bar.id).is_ready_for_reading  # And bar still not ready
             assert_true_after_time(job_2.is_blocked)  # the job_2 remains blocked
-            assert_true_after_time(
-                lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
-            )
-            assert_true_after_time(
-                lambda: _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.BLOCKED
-            )
+            assert_true_after_time(lambda: submission_1.submission_status == SubmissionStatus.RUNNING)
+            assert_true_after_time(lambda: submission_2.submission_status == SubmissionStatus.BLOCKED)
         assert_true_after_time(job_1.is_completed)  # job1 unlocked and can complete
         assert _DataManager._get(task_1.bar.id).is_ready_for_reading  # bar becomes ready
         assert _DataManager._get(task_1.bar.id).read() == 2  # the data is computed and written
         assert_true_after_time(job_2.is_running)  # And job 2 can start running
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
         assert len(_Orchestrator.blocked_jobs) == 0
-        assert_true_after_time(
-            lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
-        )
-        assert_true_after_time(
-            lambda: _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.RUNNING
-        )
+        assert_true_after_time(lambda: submission_1.submission_status == SubmissionStatus.COMPLETED)
+        assert_true_after_time(lambda: submission_2.submission_status == SubmissionStatus.RUNNING)
     assert_true_after_time(job_2.is_completed)  # job 2 unlocked so it can complete
     assert _DataManager._get(task_2.baz.id).is_ready_for_reading  # baz becomes ready
     assert _DataManager._get(task_2.baz.id).read() == 6  # the data is computed and written
     assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
-    assert _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
-    assert_true_after_time(
-        lambda: _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.COMPLETED
-    )
+    assert submission_1.submission_status == SubmissionStatus.COMPLETED
+    assert_true_after_time(lambda: submission_2.submission_status == SubmissionStatus.COMPLETED)
 
 
 @pytest.mark.orchestrator_dispatcher
@@ -324,32 +290,26 @@ def test_blocked_submittable():
     assert len(_Orchestrator.blocked_jobs) == 0
     with lock_2:
         with lock_1:
-            jobs = _Orchestrator.submit(scenario)  # scenario is submitted
-            tasks_jobs = {job._task.id: job for job in jobs}
+            submission = _Orchestrator.submit(scenario)  # scenario is submitted
+            tasks_jobs = {job._task.id: job for job in submission._jobs}
             job_1, job_2 = tasks_jobs[task_1.id], tasks_jobs[task_2.id]
             assert_true_after_time(job_1.is_running)  # job 1 is submitted and locked so it is still running
             assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
             assert not _DataManager._get(task_1.bar.id).is_ready_for_reading  # And bar still not ready
             assert_true_after_time(job_2.is_blocked)  # the job_2 remains blocked
-            assert_true_after_time(
-                lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
-            )
+            assert_true_after_time(lambda: submission.submission_status == SubmissionStatus.RUNNING)
         assert_true_after_time(job_1.is_completed)  # job1 unlocked and can complete
         assert _DataManager._get(task_1.bar.id).is_ready_for_reading  # bar becomes ready
         assert _DataManager._get(task_1.bar.id).read() == 2  # the data is computed and written
         assert_true_after_time(job_2.is_running)  # And job 2 can start running
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
         assert len(_Orchestrator.blocked_jobs) == 0
-        assert_true_after_time(
-            lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
-        )
+        assert_true_after_time(lambda: submission.submission_status == SubmissionStatus.RUNNING)
     assert_true_after_time(job_2.is_completed)  # job 2 unlocked so it can complete
     assert _DataManager._get(task_2.baz.id).is_ready_for_reading  # baz becomes ready
     assert _DataManager._get(task_2.baz.id).read() == 6  # the data is computed and written
     assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
-    assert_true_after_time(
-        lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
-    )
+    assert_true_after_time(lambda: submission.submission_status == SubmissionStatus.COMPLETED)
 
 
 # ################################  UTIL METHODS    ##################################

+ 66 - 53
tests/core/_orchestrator/test_orchestrator__submit.py

@@ -24,7 +24,6 @@ from taipy.core.data import PickleDataNode
 from taipy.core.data._data_manager import _DataManager
 from taipy.core.scenario._scenario_manager import _ScenarioManager
 from taipy.core.submission._submission_manager_factory import _SubmissionManagerFactory
-from taipy.core.submission.submission import Submission
 from taipy.core.submission.submission_status import SubmissionStatus
 from taipy.core.task._task_manager import _TaskManager
 
@@ -56,7 +55,10 @@ def test_submit_scenario_development_mode():
 
     submit_time = datetime.now() + timedelta(seconds=1)  # +1 to ensure the edit time of dn_0 is before the submit time
     with freezegun.freeze_time(submit_time):
-        jobs = orchestrator.submit(scenario)  # scenario is executed directly in development mode
+        submission = orchestrator.submit(
+            scenario, no_of_retry=10, log=True, log_file="file_path"
+        )  # scenario is executed directly in development mode
+        jobs = submission.jobs
 
     # data nodes should have been written (except the input dn_0)
     assert scenario.dn_0.last_edit_date < submit_time
@@ -74,8 +76,8 @@ def test_submit_scenario_development_mode():
     assert job_1.submit_entity_id == scenario.id
     assert job_1.creation_date == submit_time
     assert job_1.stacktrace == []
-    assert len(job_1._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
-    assert job_1._subscribers[0].__code__ == Submission._update_submission_status.__code__
+    assert len(job_1._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
+    assert job_1._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
     assert job_1._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     # t2 or t2_bis
     job_2 = jobs[1]
@@ -85,8 +87,8 @@ def test_submit_scenario_development_mode():
     assert job_2.submit_entity_id == scenario.id
     assert job_2.creation_date == submit_time
     assert job_2.stacktrace == []
-    assert len(job_2._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
-    assert job_2._subscribers[0].__code__ == Submission._update_submission_status.__code__
+    assert len(job_2._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
+    assert job_2._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
     assert job_2._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     # t2_bis or t2
     job_2bis = jobs[2]
@@ -95,8 +97,8 @@ def test_submit_scenario_development_mode():
     assert not job_2bis.force
     assert job_2bis.submit_entity_id == scenario.id
     assert job_2bis.creation_date == submit_time
-    assert len(job_2bis._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
-    assert job_2bis._subscribers[0].__code__ == Submission._update_submission_status.__code__
+    assert len(job_2bis._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
+    assert job_2bis._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
     assert job_2bis._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_2bis.stacktrace == []
     # t3
@@ -106,15 +108,14 @@ def test_submit_scenario_development_mode():
     assert job_3.is_completed()
     assert job_3.submit_entity_id == scenario.id
     assert job_3.creation_date == submit_time
-    assert len(job_3._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
-    assert job_3._subscribers[0].__code__ == Submission._update_submission_status.__code__
+    assert len(job_3._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
+    assert job_3._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
     assert job_3._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_3.stacktrace == []
 
     assert job_1.submit_id == job_2.submit_id == job_2bis.submit_id == job_3.submit_id
 
     # submission is created and correct
-    submission = _SubmissionManagerFactory._build_manager()._get(job_1.submit_id)
     assert len(_SubmissionManagerFactory._build_manager()._get_all()) == 1
     assert submission.submission_status == SubmissionStatus.COMPLETED
     assert submission.jobs == jobs
@@ -122,6 +123,7 @@ def test_submit_scenario_development_mode():
     assert submission.entity_id == scenario.id
     assert submission.entity_type == "SCENARIO"
     assert submission.entity_config_id == "scenario_cfg"
+    assert submission.properties == {"no_of_retry": 10, "log": True, "log_file": "file_path"}
 
     # orchestrator state is correct
     assert len(orchestrator.blocked_jobs) == 0
@@ -134,7 +136,10 @@ def test_submit_scenario_development_mode_blocked_jobs():
 
     s_time = datetime.now() + timedelta(seconds=1)  # +1 to ensure the scenario creation is before the submit time
     with freezegun.freeze_time(s_time):
-        jobs = orchestrator.submit(scenario)  # first task is blocked because input is not ready
+        submission = orchestrator.submit(
+            scenario, no_of_retry=10, log=True, log_file="file_path"
+        )  # first task is blocked because input is not ready
+        jobs = submission.jobs
 
     # dn should be locked for edition
     assert scenario.dn_2.edit_in_progress
@@ -151,8 +156,8 @@ def test_submit_scenario_development_mode_blocked_jobs():
     assert job_1.submit_entity_id == scenario.id
     assert job_1.creation_date == s_time
     assert job_1.stacktrace == []
-    assert len(job_1._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
-    assert job_1._subscribers[0].__code__ == Submission._update_submission_status.__code__
+    assert len(job_1._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
+    assert job_1._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
     assert job_1._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     # t2 or t2_bis
     job_2 = jobs[1]
@@ -162,8 +167,8 @@ def test_submit_scenario_development_mode_blocked_jobs():
     assert job_2.submit_entity_id == scenario.id
     assert job_2.creation_date == s_time
     assert job_2.stacktrace == []
-    assert len(job_2._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
-    assert job_2._subscribers[0].__code__ == Submission._update_submission_status.__code__
+    assert len(job_2._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
+    assert job_2._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
     assert job_2._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     # t2_bis or t2
     job_2bis = jobs[2]
@@ -172,8 +177,8 @@ def test_submit_scenario_development_mode_blocked_jobs():
     assert job_2bis.submit_entity_id == scenario.id
     assert not job_2bis.force
     assert job_2bis.creation_date == s_time
-    assert len(job_2bis._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
-    assert job_2bis._subscribers[0].__code__ == Submission._update_submission_status.__code__
+    assert len(job_2bis._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
+    assert job_2bis._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
     assert job_2bis._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_2bis.stacktrace == []
     # t3
@@ -184,8 +189,8 @@ def test_submit_scenario_development_mode_blocked_jobs():
     assert job_3.submit_entity_id == scenario.id
     assert job_3.creation_date == s_time
     assert job_3.stacktrace == []
-    assert len(job_3._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
-    assert job_3._subscribers[0].__code__ == Submission._update_submission_status.__code__
+    assert len(job_3._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
+    assert job_3._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
     assert job_3._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
 
     # Same submit_id
@@ -193,13 +198,13 @@ def test_submit_scenario_development_mode_blocked_jobs():
 
     # submission is created and correct
     assert len(_SubmissionManagerFactory._build_manager()._get_all()) == 1
-    submission = _SubmissionManagerFactory._build_manager()._get(job_1.submit_id)
     assert submission.submission_status == SubmissionStatus.BLOCKED
     assert submission.jobs == jobs
     assert submission.creation_date == s_time
     assert submission.entity_id == scenario.id
     assert submission.entity_type == "SCENARIO"
     assert submission.entity_config_id == "scenario_cfg"
+    assert submission.properties == {"no_of_retry": 10, "log": True, "log_file": "file_path"}
 
     # orchestrator state is correct
     assert len(orchestrator.blocked_jobs) == 4
@@ -214,7 +219,10 @@ def test_submit_scenario_standalone_mode():
     sc.dn_0.write(0)  # input data is made ready
     submit_time = datetime.now() + timedelta(seconds=1)  # +1 to ensure the edit time of dn_0 is before the submit time
     with freezegun.freeze_time(submit_time):
-        jobs = orchestrator.submit(sc)  # No dispatcher running. sc is not executed.
+        submission = orchestrator.submit(
+            sc, no_of_retry=10, log=True, log_file="file_path"
+        )  # No dispatcher running. sc is not executed.
+        jobs = submission.jobs
 
     # task output should be locked for edition
     assert sc.dn_1.edit_in_progress
@@ -230,8 +238,8 @@ def test_submit_scenario_standalone_mode():
     assert job_1.is_pending()
     assert job_1.creation_date == submit_time
     assert job_1.submit_entity_id == sc.id
-    assert len(job_1._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
-    assert job_1._subscribers[0].__code__ == Submission._update_submission_status.__code__
+    assert len(job_1._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
+    assert job_1._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
     assert job_1._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_1.stacktrace == []
     # t2 or t2_bis
@@ -242,18 +250,18 @@ def test_submit_scenario_standalone_mode():
     assert job_2.submit_entity_id == sc.id
     assert job_2.creation_date == submit_time
     assert job_2.stacktrace == []
-    assert len(job_2._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
+    assert len(job_2._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
     assert job_2._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
-    assert job_2._subscribers[0].__code__ == Submission._update_submission_status.__code__
+    assert job_2._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
     # t2_bis or t2
     job_2bis = jobs[2]
     assert job_2bis.task == sc.t_2bis or job_2bis.task == sc.t_2
     assert job_2bis.is_blocked()
     assert not job_2bis.force
     assert job_2bis.submit_entity_id == sc.id
-    assert len(job_2bis._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
+    assert len(job_2bis._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
     assert job_2bis._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
-    assert job_2bis._subscribers[0].__code__ == Submission._update_submission_status.__code__
+    assert job_2bis._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
     assert job_2bis.creation_date == submit_time
     assert job_2bis.stacktrace == []
     # t3
@@ -262,16 +270,15 @@ def test_submit_scenario_standalone_mode():
     assert not job_3.force
     assert job_3.is_blocked()
     assert job_3.submit_entity_id == sc.id
-    assert len(job_3._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
+    assert len(job_3._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
     assert job_3._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
-    assert job_3._subscribers[0].__code__ == Submission._update_submission_status.__code__
+    assert job_3._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
     assert job_3.creation_date == submit_time
     assert job_3.stacktrace == []
 
     assert job_1.submit_id == job_2.submit_id == job_2bis.submit_id == job_3.submit_id
 
     # submission is created and correct
-    submission = _SubmissionManagerFactory._build_manager()._get(job_1.submit_id)
     assert len(_SubmissionManagerFactory._build_manager()._get_all()) == 1
     assert submission.submission_status == SubmissionStatus.PENDING
     assert submission.jobs == jobs
@@ -279,6 +286,7 @@ def test_submit_scenario_standalone_mode():
     assert submission.entity_id == sc.id
     assert submission.entity_type == "SCENARIO"
     assert submission.entity_config_id == "scenario_cfg"
+    assert submission.properties == {"no_of_retry": 10, "log": True, "log_file": "file_path"}
 
     # orchestrator state is correct
     assert len(orchestrator.blocked_jobs) == 3
@@ -291,21 +299,22 @@ def test_submit_scenario_with_callbacks_and_force_and_wait():
     orchestrator = _OrchestratorFactory._build_orchestrator()
 
     with mock.patch("taipy.core._orchestrator._orchestrator._Orchestrator._wait_until_job_finished") as mck:
-        jobs = orchestrator.submit(scenario, callbacks=[nothing], force=True, wait=True, timeout=5)
+        submission = orchestrator.submit(scenario, callbacks=[nothing], force=True, wait=True, timeout=5)
+        jobs = submission.jobs
 
         # jobs are created in a specific order and are correct
         assert len(jobs) == 4
         assert len(jobs[0]._subscribers) == 3  # nothing, _update_submission_status, and _on_status_change
         assert jobs[0]._subscribers[0].__code__ == nothing.__code__
-        assert jobs[0]._subscribers[1].__code__ == Submission._update_submission_status.__code__
+        assert jobs[0]._subscribers[1].__code__ == _Orchestrator._update_submission_status.__code__
         assert jobs[0]._subscribers[2].__code__ == _Orchestrator._on_status_change.__code__
         assert len(jobs[1]._subscribers) == 3  # nothing, _update_submission_status, and _on_status_change
         assert jobs[1]._subscribers[0].__code__ == nothing.__code__
-        assert jobs[1]._subscribers[1].__code__ == Submission._update_submission_status.__code__
+        assert jobs[1]._subscribers[1].__code__ == _Orchestrator._update_submission_status.__code__
         assert jobs[1]._subscribers[2].__code__ == _Orchestrator._on_status_change.__code__
         assert len(jobs[2]._subscribers) == 3  # nothing, _update_submission_status, and _on_status_change
         assert jobs[2]._subscribers[0].__code__ == nothing.__code__
-        assert jobs[2]._subscribers[1].__code__ == Submission._update_submission_status.__code__
+        assert jobs[2]._subscribers[1].__code__ == _Orchestrator._update_submission_status.__code__
         assert jobs[2]._subscribers[2].__code__ == _Orchestrator._on_status_change.__code__
         mck.assert_called_once_with(jobs, timeout=5)
 
@@ -320,7 +329,10 @@ def test_submit_sequence_development_mode():
 
     submit_time = datetime.now() + timedelta(seconds=1)  # +1 to ensure the edit time of dn_0 is before the submit time
     with freezegun.freeze_time(submit_time):
-        jobs = orchestrator.submit(seq)  # sequence is executed directly in development mode
+        submission = orchestrator.submit(
+            seq, no_of_retry=10, log=True, log_file="file_path"
+        )  # sequence is executed directly in development mode
+        jobs = submission.jobs
 
     # data nodes should have been written (except the input dn_0)
     assert sce.dn_0.last_edit_date < submit_time
@@ -336,8 +348,8 @@ def test_submit_sequence_development_mode():
     assert job_1.submit_entity_id == seq.id
     assert job_1.creation_date == submit_time
     assert job_1.stacktrace == []
-    assert len(job_1._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
-    assert job_1._subscribers[0].__code__ == Submission._update_submission_status.__code__
+    assert len(job_1._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
+    assert job_1._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
     assert job_1._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     # t2
     job_2 = jobs[1]
@@ -347,8 +359,8 @@ def test_submit_sequence_development_mode():
     assert job_2.submit_entity_id == seq.id
     assert job_2.creation_date == submit_time
     assert job_2.stacktrace == []
-    assert len(job_2._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
-    assert job_2._subscribers[0].__code__ == Submission._update_submission_status.__code__
+    assert len(job_2._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
+    assert job_2._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
     assert job_2._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     # t3
     job_3 = jobs[2]
@@ -356,8 +368,8 @@ def test_submit_sequence_development_mode():
     assert not job_3.force
     assert job_3.is_completed()
     assert job_3.submit_entity_id == seq.id
-    assert len(job_3._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
-    assert job_3._subscribers[0].__code__ == Submission._update_submission_status.__code__
+    assert len(job_3._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
+    assert job_3._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
     assert job_3._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_3.creation_date == submit_time
     assert job_3.stacktrace == []
@@ -365,8 +377,6 @@ def test_submit_sequence_development_mode():
     assert job_1.submit_id == job_2.submit_id == job_3.submit_id
 
     # submission is created and correct
-    submit_id = job_2.submit_id
-    submission = _SubmissionManagerFactory._build_manager()._get(submit_id)
     assert len(_SubmissionManagerFactory._build_manager()._get_all()) == 1
     assert submission.entity_type == "SEQUENCE"
     assert submission.submission_status == SubmissionStatus.COMPLETED
@@ -374,6 +384,7 @@ def test_submit_sequence_development_mode():
     assert submission.jobs == jobs
     assert submission.creation_date == submit_time
     assert submission.entity_id == seq.id
+    assert submission.properties == {"no_of_retry": 10, "log": True, "log_file": "file_path"}
 
     # orchestrator state is correct
     assert len(orchestrator.blocked_jobs) == 0
@@ -392,7 +403,10 @@ def test_submit_sequence_standalone_mode():
 
     submit_time = datetime.now() + timedelta(seconds=1)  # +1 to ensure the edit time of dn_0 is before the submit time
     with freezegun.freeze_time(submit_time):
-        jobs = orchestrator.submit(sequence)  # sequence is executed directly in development mode
+        submission = orchestrator.submit(
+            sequence, no_of_retry=10, log=True, log_file="file_path"
+        )  # sequence is executed directly in development mode
+        jobs = submission.jobs
 
     assert scenario.dn_1.edit_in_progress
     assert scenario.dn_2.edit_in_progress
@@ -408,7 +422,7 @@ def test_submit_sequence_standalone_mode():
     assert job_1.creation_date == submit_time
     assert job_1.submit_entity_id == sequence.id
     assert job_1.stacktrace == []
-    assert len(job_1._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
+    assert len(job_1._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
     # t2
     job_2 = jobs[1]
     assert job_2.task == scenario.t_2
@@ -417,7 +431,7 @@ def test_submit_sequence_standalone_mode():
     assert job_2.submit_entity_id == sequence.id
     assert job_2.creation_date == submit_time
     assert job_2.stacktrace == []
-    assert len(job_2._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
+    assert len(job_2._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
     # t3
     job_3 = jobs[2]
     assert job_3.task == scenario.t_3
@@ -425,14 +439,12 @@ def test_submit_sequence_standalone_mode():
     assert job_3.is_blocked()
     assert job_3.creation_date == submit_time
     assert job_3.submit_entity_id == sequence.id
-    assert len(job_3._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
+    assert len(job_3._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
     assert job_3.stacktrace == []
 
     assert job_1.submit_id == job_2.submit_id == job_3.submit_id
 
     # submission is created and correct
-    submit_id = job_2.submit_id
-    submission = _SubmissionManagerFactory._build_manager()._get(submit_id)
     assert len(_SubmissionManagerFactory._build_manager()._get_all()) == 1
     assert submission.submission_status == SubmissionStatus.PENDING
     assert submission.entity_type == "SEQUENCE"
@@ -440,6 +452,7 @@ def test_submit_sequence_standalone_mode():
     assert submission.jobs == jobs
     assert submission.creation_date == submit_time
     assert submission.entity_id == sequence.id
+    assert submission.properties == {"no_of_retry": 10, "log": True, "log_file": "file_path"}
 
     # orchestrator state is correct
     assert len(orchestrator.blocked_jobs) == 2
@@ -453,7 +466,7 @@ def test_submit_sequence_with_callbacks_and_force_and_wait():
     orchestrator = _OrchestratorFactory._build_orchestrator()
 
     with mock.patch("taipy.core._orchestrator._orchestrator._Orchestrator._wait_until_job_finished") as mck:
-        jobs = orchestrator.submit(scenario, callbacks=[nothing], force=True, wait=True, timeout=5)
+        jobs = orchestrator.submit(scenario, callbacks=[nothing], force=True, wait=True, timeout=5).jobs
         mck.assert_called_once_with(jobs, timeout=5)
 
     # jobs are created in a specific order and are correct
@@ -477,8 +490,8 @@ def test_submit_submittable_generate_unique_submit_id():
     scenario = Scenario("scenario", {task_1, task_2}, {})
     _ScenarioManager._set(scenario)
 
-    jobs_1 = taipy.submit(scenario)
-    jobs_2 = taipy.submit(scenario)
+    jobs_1 = taipy.submit(scenario).jobs
+    jobs_2 = taipy.submit(scenario).jobs
     assert len(jobs_1) == 2
     assert len(jobs_2) == 2
     assert jobs_1[0].submit_id == jobs_1[1].submit_id

+ 28 - 13
tests/core/_orchestrator/test_orchestrator__submit_task.py

@@ -47,7 +47,10 @@ def test_submit_task_development_mode():
 
     submit_time = datetime.now()
     with freezegun.freeze_time(submit_time):
-        job = orchestrator.submit_task(scenario.t1)  # t1 is executed directly in development mode
+        submission = orchestrator.submit_task(
+            scenario.t1, no_of_retry=10, log=True, log_file="file_path"
+        )  # t1 is executed directly in development mode
+        job = submission.jobs[0]
 
     # task output should have been written
     assert scenario.dn_1.last_edit_date == submit_time
@@ -62,14 +65,14 @@ def test_submit_task_development_mode():
     assert len(job._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
 
     # submission is created and correct
-    all_submissions = _SubmissionManagerFactory._build_manager()._get_all()
-    assert len(all_submissions) == 1
-    assert all_submissions[0].creation_date == submit_time
-    assert all_submissions[0].submission_status == SubmissionStatus.COMPLETED
-    assert all_submissions[0].jobs == [job]
-    assert all_submissions[0].entity_id == scenario.t1.id
-    assert all_submissions[0].entity_type == "TASK"
-    assert all_submissions[0].entity_config_id == "t1"
+    assert len(_SubmissionManagerFactory._build_manager()._get_all()) == 1
+    assert submission.creation_date == submit_time
+    assert submission.submission_status == SubmissionStatus.COMPLETED
+    assert submission.jobs == [job]
+    assert submission.entity_id == scenario.t1.id
+    assert submission.entity_type == "TASK"
+    assert submission.entity_config_id == "t1"
+    assert submission.properties == {"no_of_retry": 10, "log": True, "log_file": "file_path"}
 
     # orchestrator state is correct
     assert len(orchestrator.blocked_jobs) == 0
@@ -82,7 +85,10 @@ def test_submit_task_development_mode_blocked_job():
 
     submit_time = datetime.now()
     with freezegun.freeze_time(submit_time):
-        job = orchestrator.submit_task(scenario.t2)  # t1 is executed directly in development mode
+        submission = orchestrator.submit_task(
+            scenario.t2, no_of_retry=10, log=True, log_file="file_path"
+        )  # t1 is executed directly in development mode
+        job = submission.jobs[0]
 
     # task output should have been written
     assert scenario.dn_2.edit_in_progress
@@ -104,6 +110,7 @@ def test_submit_task_development_mode_blocked_job():
     assert submission.entity_id == scenario.t2.id
     assert submission.entity_type == "TASK"
     assert submission.entity_config_id == "t2"
+    assert submission.properties == {"no_of_retry": 10, "log": True, "log_file": "file_path"}
 
     # orchestrator state is correct
     assert len(orchestrator.blocked_jobs) == 1
@@ -118,7 +125,10 @@ def test_submit_task_standalone_mode():
 
     submit_time = datetime.now()
     with freezegun.freeze_time(submit_time):
-        job = orchestrator.submit_task(sc.t1)  # No dispatcher running. t1 is not executed in standalone mode.
+        submission = orchestrator.submit_task(
+            sc.t1, no_of_retry=10, log=True, log_file="file_path"
+        )  # No dispatcher running. t1 is not executed in standalone mode.
+        job = submission.jobs[0]
 
     # task output should NOT have been written
     assert sc.dn_1.last_edit_date is None
@@ -143,6 +153,7 @@ def test_submit_task_standalone_mode():
     assert submission.entity_id == sc.t1.id
     assert submission.entity_type == "TASK"
     assert submission.entity_config_id == "t1"
+    assert submission.properties == {"no_of_retry": 10, "log": True, "log_file": "file_path"}
 
     # orchestrator state is correct
     assert len(orchestrator.blocked_jobs) == 0
@@ -157,7 +168,10 @@ def test_submit_task_standalone_mode_blocked_job():
 
     submit_time = datetime.now()
     with freezegun.freeze_time(submit_time):
-        job = orchestrator.submit_task(sc.t2)  # No dispatcher running. t2 is not executed in standalone mode.
+        submission = orchestrator.submit_task(
+            sc.t2, no_of_retry=10, log=True, log_file="file_path"
+        )  # No dispatcher running. t2 is not executed in standalone mode.
+        job = submission.jobs[0]
 
     # task output should NOT have been written
     assert sc.dn_2.last_edit_date is None
@@ -182,6 +196,7 @@ def test_submit_task_standalone_mode_blocked_job():
     assert submission.entity_id == sc.t2.id
     assert submission.entity_type == "TASK"
     assert submission.entity_config_id == "t2"
+    assert submission.properties == {"no_of_retry": 10, "log": True, "log_file": "file_path"}
 
     # orchestrator state is correct
     assert len(orchestrator.blocked_jobs) == 1
@@ -195,7 +210,7 @@ def test_submit_task_with_callbacks_and_force_and_wait():
     orchestrator = _OrchestratorFactory._build_orchestrator()
 
     with mock.patch("taipy.core._orchestrator._orchestrator._Orchestrator._wait_until_job_finished") as mck:
-        job = orchestrator.submit_task(scenario.t1, callbacks=[nothing], force=True, wait=True, timeout=2)
+        job = orchestrator.submit_task(scenario.t1, callbacks=[nothing], force=True, wait=True, timeout=2).jobs[0]
 
         # job exists and is correct
         assert job.task == scenario.t1

+ 20 - 20
tests/core/job/test_job_manager.py

@@ -80,11 +80,11 @@ def test_get_job():
 
     _OrchestratorFactory._build_dispatcher()
 
-    job_1 = _OrchestratorFactory._orchestrator.submit_task(task)
+    job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
     assert _JobManager._get(job_1.id) == job_1
     assert _JobManager._get(job_1.id).submit_entity_id == task.id
 
-    job_2 = _OrchestratorFactory._orchestrator.submit_task(task)
+    job_2 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
     assert job_1 != job_2
     assert _JobManager._get(job_1.id).id == job_1.id
     assert _JobManager._get(job_2.id).id == job_2.id
@@ -99,17 +99,17 @@ def test_get_latest_job():
 
     _OrchestratorFactory._build_dispatcher()
 
-    job_1 = _OrchestratorFactory._orchestrator.submit_task(task)
+    job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
     assert _JobManager._get_latest(task) == job_1
     assert _JobManager._get_latest(task_2) is None
 
     sleep(0.01)  # Comparison is based on time, precision on Windows is not enough important
-    job_2 = _OrchestratorFactory._orchestrator.submit_task(task_2)
+    job_2 = _OrchestratorFactory._orchestrator.submit_task(task_2).jobs[0]
     assert _JobManager._get_latest(task).id == job_1.id
     assert _JobManager._get_latest(task_2).id == job_2.id
 
     sleep(0.01)  # Comparison is based on time, precision on Windows is not enough important
-    job_1_bis = _OrchestratorFactory._orchestrator.submit_task(task)
+    job_1_bis = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
     assert _JobManager._get_latest(task).id == job_1_bis.id
     assert _JobManager._get_latest(task_2).id == job_2.id
 
@@ -125,8 +125,8 @@ def test_get_jobs():
 
     _OrchestratorFactory._build_dispatcher()
 
-    job_1 = _OrchestratorFactory._orchestrator.submit_task(task)
-    job_2 = _OrchestratorFactory._orchestrator.submit_task(task)
+    job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
+    job_2 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
 
     assert {job.id for job in _JobManager._get_all()} == {job_1.id, job_2.id}
 
@@ -138,8 +138,8 @@ def test_delete_job():
 
     _OrchestratorFactory._build_dispatcher()
 
-    job_1 = _OrchestratorFactory._orchestrator.submit_task(task)
-    job_2 = _OrchestratorFactory._orchestrator.submit_task(task)
+    job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
+    job_2 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
 
     _JobManager._delete(job_1)
 
@@ -172,7 +172,7 @@ def test_raise_when_trying_to_delete_unfinished_job():
     )
     _OrchestratorFactory._build_dispatcher()
     with lock:
-        job = _OrchestratorFactory._orchestrator.submit_task(task)
+        job = _OrchestratorFactory._orchestrator.submit_task(task)._jobs[0]
 
         assert_true_after_time(lambda: len(_JobDispatcher._dispatched_processes) == 1)
         assert_true_after_time(job.is_running)
@@ -200,7 +200,7 @@ def test_force_deleting_unfinished_job():
     )
     _OrchestratorFactory._build_dispatcher()
     with lock:
-        job = _OrchestratorFactory._orchestrator.submit_task(task)
+        job = _OrchestratorFactory._orchestrator.submit_task(task)._jobs[0]
         assert_true_after_time(job.is_running)
         with pytest.raises(JobNotDeletedException):
             _JobManager._delete(job, force=False)
@@ -219,7 +219,7 @@ def test_cancel_single_job():
     _OrchestratorFactory._dispatcher.stop()
     assert_true_after_time(lambda: not _OrchestratorFactory._dispatcher.is_running())
 
-    job = _OrchestratorFactory._orchestrator.submit_task(task)
+    job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
 
     assert_true_after_time(job.is_pending)
     assert_true_after_time(lambda: len(_JobDispatcher._dispatched_processes) == 0)
@@ -244,21 +244,21 @@ def test_cancel_canceled_abandoned_failed_jobs(cancel_jobs, orchestrated_job):
     _OrchestratorFactory._dispatcher.stop()
     assert_true_after_time(lambda: not _OrchestratorFactory._dispatcher.is_running())
 
-    job = _OrchestratorFactory._orchestrator.submit_task(task)
+    job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
     job.canceled()
     assert job.is_canceled()
     _JobManager._cancel(job)
     cancel_jobs.assert_not_called()
     assert job.is_canceled()
 
-    job = _OrchestratorFactory._orchestrator.submit_task(task)
+    job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
     job.failed()
     assert job.is_failed()
     _JobManager._cancel(job)
     cancel_jobs.assert_not_called()
     assert job.is_failed()
 
-    job = _OrchestratorFactory._orchestrator.submit_task(task)
+    job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
     job.abandoned()
     assert job.is_abandoned()
     _JobManager._cancel(job)
@@ -281,21 +281,21 @@ def test_cancel_completed_skipped_jobs(cancel_jobs, orchestrated_job):
     _OrchestratorFactory._dispatcher.stop()
     assert_true_after_time(lambda: not _OrchestratorFactory._dispatcher.is_running())
 
-    job = _OrchestratorFactory._orchestrator.submit_task(task)
+    job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
     job.completed()
     assert job.is_completed()
     cancel_jobs.assert_not_called()
     _JobManager._cancel(job)
     assert job.is_completed()
 
-    job = _OrchestratorFactory._orchestrator.submit_task(task)
+    job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
     job.failed()
     assert job.is_failed()
     cancel_jobs.assert_not_called()
     _JobManager._cancel(job)
     assert job.is_failed()
 
-    job = _OrchestratorFactory._orchestrator.submit_task(task)
+    job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
     job.skipped()
     assert job.is_skipped()
     cancel_jobs.assert_not_called()
@@ -323,7 +323,7 @@ def test_cancel_single_running_job():
     assert_true_after_time(lambda: _OrchestratorFactory._dispatcher._nb_available_workers == 2)
 
     with lock:
-        job = _OrchestratorFactory._orchestrator.submit_task(task)
+        job = _OrchestratorFactory._orchestrator.submit_task(task)._jobs[0]
 
         assert_true_after_time(lambda: len(_JobDispatcher._dispatched_processes) == 1)
         assert_true_after_time(lambda: _OrchestratorFactory._dispatcher._nb_available_workers == 1)
@@ -433,7 +433,7 @@ def test_cancel_subsequent_jobs():
 def test_is_deletable():
     assert len(_JobManager._get_all()) == 0
     task = _create_task(print, 0, "task")
-    job = _OrchestratorFactory._orchestrator.submit_task(task)
+    job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
 
     assert job.is_completed()
     assert _JobManager._is_deletable(job)

+ 12 - 12
tests/core/job/test_job_manager_with_sql_repo.py

@@ -86,11 +86,11 @@ def test_get_job(init_sql_repo):
 
     _OrchestratorFactory._build_dispatcher()
 
-    job_1 = _OrchestratorFactory._orchestrator.submit_task(task)
+    job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
     assert _JobManager._get(job_1.id) == job_1
     assert _JobManager._get(job_1.id).submit_entity_id == task.id
 
-    job_2 = _OrchestratorFactory._orchestrator.submit_task(task)
+    job_2 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
     assert job_1 != job_2
     assert _JobManager._get(job_1.id).id == job_1.id
     assert _JobManager._get(job_2.id).id == job_2.id
@@ -106,17 +106,17 @@ def test_get_latest_job(init_sql_repo):
 
     _OrchestratorFactory._build_dispatcher()
 
-    job_1 = _OrchestratorFactory._orchestrator.submit_task(task)
+    job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
     assert _JobManager._get_latest(task) == job_1
     assert _JobManager._get_latest(task_2) is None
 
     sleep(0.01)  # Comparison is based on time, precision on Windows is not enough important
-    job_2 = _OrchestratorFactory._orchestrator.submit_task(task_2)
+    job_2 = _OrchestratorFactory._orchestrator.submit_task(task_2).jobs[0]
     assert _JobManager._get_latest(task).id == job_1.id
     assert _JobManager._get_latest(task_2).id == job_2.id
 
     sleep(0.01)  # Comparison is based on time, precision on Windows is not enough important
-    job_1_bis = _OrchestratorFactory._orchestrator.submit_task(task)
+    job_1_bis = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
     assert _JobManager._get_latest(task).id == job_1_bis.id
     assert _JobManager._get_latest(task_2).id == job_2.id
 
@@ -134,8 +134,8 @@ def test_get_jobs(init_sql_repo):
 
     _OrchestratorFactory._build_dispatcher()
 
-    job_1 = _OrchestratorFactory._orchestrator.submit_task(task)
-    job_2 = _OrchestratorFactory._orchestrator.submit_task(task)
+    job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
+    job_2 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
 
     assert {job.id for job in _JobManager._get_all()} == {job_1.id, job_2.id}
 
@@ -149,8 +149,8 @@ def test_delete_job(init_sql_repo):
 
     _OrchestratorFactory._build_dispatcher()
 
-    job_1 = _OrchestratorFactory._orchestrator.submit_task(task)
-    job_2 = _OrchestratorFactory._orchestrator.submit_task(task)
+    job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
+    job_2 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
 
     _JobManager._delete(job_1)
 
@@ -175,7 +175,7 @@ def test_raise_when_trying_to_delete_unfinished_job(init_sql_repo):
     _OrchestratorFactory._build_dispatcher()
 
     with lock:
-        job = _OrchestratorFactory._orchestrator.submit_task(task)
+        job = _OrchestratorFactory._orchestrator.submit_task(task)._jobs[0]
 
         assert_true_after_time(lambda: len(_JobDispatcher._dispatched_processes) == 1)
         assert_true_after_time(job.is_running)
@@ -207,7 +207,7 @@ def test_force_deleting_unfinished_job(init_sql_repo):
     _OrchestratorFactory._build_dispatcher()
 
     with lock:
-        job = _OrchestratorFactory._orchestrator.submit_task(task_1)
+        job = _OrchestratorFactory._orchestrator.submit_task(task_1)._jobs[0]
         assert_true_after_time(job.is_running)
         with pytest.raises(JobNotDeletedException):
             _JobManager._delete(job, force=False)
@@ -221,7 +221,7 @@ def test_is_deletable(init_sql_repo):
 
     assert len(_JobManager._get_all()) == 0
     task = _create_task(print, 0, "task")
-    job = _OrchestratorFactory._orchestrator.submit_task(task)
+    job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
 
     assert job.is_completed()
     assert _JobManager._is_deletable(job)

+ 2 - 1
tests/core/job/test_job_repositories.py

@@ -17,8 +17,9 @@ from taipy.core.data._data_sql_repository import _DataSQLRepository
 from taipy.core.exceptions import ModelNotFound
 from taipy.core.job._job_fs_repository import _JobFSRepository
 from taipy.core.job._job_sql_repository import _JobSQLRepository
-from taipy.core.job.job import Job, JobId, Task
+from taipy.core.job.job import Job, JobId
 from taipy.core.task._task_sql_repository import _TaskSQLRepository
+from taipy.core.task.task import Task
 
 
 class TestJobRepository:

+ 6 - 4
tests/core/notification/test_notifier.py

@@ -707,7 +707,8 @@ def test_publish_submission_event():
 
     # Test SUBMISSION Event
 
-    job = scenario.submit()[0]
+    submission = scenario.submit()
+    job = submission.jobs[0]
 
     assert registration_queue.qsize() == 6
     published_events = []
@@ -731,7 +732,7 @@ def test_publish_submission_event():
         EventEntityType.SUBMISSION,
         EventEntityType.SCENARIO,
     ]
-    expected_event_entity_id = [job.submit_id, job.id, job.submit_id, job.id, job.submit_id, scenario.id]
+    expected_event_entity_id = [submission.id, job.id, submission.id, job.id, submission.id, scenario.id]
     assert all(
         event.entity_type == expected_event_types[i]
         and event.entity_id == expected_event_entity_id[i]
@@ -755,7 +756,8 @@ def test_publish_deletion_event():
     task = scenario.tasks[task_config.id]
     dn = scenario.data_nodes[dn_config.id]
     sequence = scenario.sequences["sequence_config"]
-    job = scenario.submit()[0]
+    submission = scenario.submit()
+    job = submission.jobs[0]
 
     assert registration_queue.qsize() == 11
     while registration_queue.qsize() > 0:
@@ -780,7 +782,7 @@ def test_publish_deletion_event():
         EventEntityType.SUBMISSION,
     ]
 
-    expected_event_entity_id = [cycle.id, sequence.id, scenario.id, task.id, job.id, dn.id, job.submit_id]
+    expected_event_entity_id = [cycle.id, sequence.id, scenario.id, task.id, job.id, dn.id, submission.id]
     expected_event_operation_type = [EventOperation.DELETION] * len(expected_event_types)
 
     assert all(

+ 76 - 1
tests/core/submission/test_submission.py

@@ -42,6 +42,7 @@ def test_create_submission(scenario, job, current_datetime):
         scenario.config_id,
         "submission_id",
         [job],
+        {"debug": True, "log": "log_file", "retry_note": 5},
         current_datetime,
         SubmissionStatus.COMPLETED,
         "version_id",
@@ -52,6 +53,7 @@ def test_create_submission(scenario, job, current_datetime):
     assert submission_2.entity_type == scenario._ID_PREFIX
     assert submission_2.entity_config_id == scenario.config_id
     assert submission_2._jobs == [job]
+    assert submission_2._properties == {"debug": True, "log": "log_file", "retry_note": 5}
     assert submission_2.creation_date == current_datetime
     assert submission_2._submission_status == SubmissionStatus.COMPLETED
     assert submission_2._version == "version_id"
@@ -263,7 +265,7 @@ def test_update_submission_status_with_wrong_case_abandoned_without_cancel_or_fa
 
 def test_auto_set_and_reload():
     task = Task(config_id="name_1", properties={}, function=print, id=TaskId("task_1"))
-    submission_1 = Submission(task.id, task._ID_PREFIX, task.config_id)
+    submission_1 = Submission(task.id, task._ID_PREFIX, task.config_id, properties={})
     job_1 = Job("job_1", task, submission_1.id, submission_1.entity_id)
     job_2 = Job("job_2", task, submission_1.id, submission_1.entity_id)
 
@@ -305,20 +307,93 @@ def test_auto_set_and_reload():
     assert submission_1.submission_status == SubmissionStatus.COMPLETED
     assert submission_2.submission_status == SubmissionStatus.COMPLETED
 
+    # auto set & reload on properties attribute
+    assert submission_1.properties == {}
+    assert submission_2.properties == {}
+    submission_1._properties["qux"] = 4
+    assert submission_1.properties["qux"] == 4
+    assert submission_2.properties["qux"] == 4
+
+    assert submission_1.properties == {"qux": 4}
+    assert submission_2.properties == {"qux": 4}
+    submission_2._properties["qux"] = 5
+    assert submission_1.properties["qux"] == 5
+    assert submission_2.properties["qux"] == 5
+
+    submission_1.properties["temp_key_1"] = "temp_value_1"
+    submission_1.properties["temp_key_2"] = "temp_value_2"
+    assert submission_1.properties == {
+        "qux": 5,
+        "temp_key_1": "temp_value_1",
+        "temp_key_2": "temp_value_2",
+    }
+    assert submission_2.properties == {
+        "qux": 5,
+        "temp_key_1": "temp_value_1",
+        "temp_key_2": "temp_value_2",
+    }
+    submission_1.properties.pop("temp_key_1")
+    assert "temp_key_1" not in submission_1.properties.keys()
+    assert "temp_key_1" not in submission_1.properties.keys()
+    assert submission_1.properties == {
+        "qux": 5,
+        "temp_key_2": "temp_value_2",
+    }
+    assert submission_2.properties == {
+        "qux": 5,
+        "temp_key_2": "temp_value_2",
+    }
+    submission_2.properties.pop("temp_key_2")
+    assert submission_1.properties == {"qux": 5}
+    assert submission_2.properties == {"qux": 5}
+    assert "temp_key_2" not in submission_1.properties.keys()
+    assert "temp_key_2" not in submission_2.properties.keys()
+
+    submission_1.properties["temp_key_3"] = 0
+    assert submission_1.properties == {"qux": 5, "temp_key_3": 0}
+    assert submission_2.properties == {"qux": 5, "temp_key_3": 0}
+    submission_1.properties.update({"temp_key_3": 1})
+    assert submission_1.properties == {"qux": 5, "temp_key_3": 1}
+    assert submission_2.properties == {"qux": 5, "temp_key_3": 1}
+    submission_1.properties.update(dict())
+    assert submission_1.properties == {"qux": 5, "temp_key_3": 1}
+    assert submission_2.properties == {"qux": 5, "temp_key_3": 1}
+    submission_1.properties["temp_key_4"] = 0
+    submission_1.properties["temp_key_5"] = 0
+
     with submission_1 as submission:
         assert submission.jobs == [job_2, job_1]
         assert submission.submission_status == SubmissionStatus.COMPLETED
+        assert submission.properties["qux"] == 5
+        assert submission.properties["temp_key_3"] == 1
+        assert submission.properties["temp_key_4"] == 0
+        assert submission.properties["temp_key_5"] == 0
 
         submission.jobs = [job_1]
         submission.submission_status = SubmissionStatus.PENDING
+        submission.properties["qux"] = 9
+        submission.properties.pop("temp_key_3")
+        submission.properties.pop("temp_key_4")
+        submission.properties.update({"temp_key_4": 1})
+        submission.properties.update({"temp_key_5": 2})
+        submission.properties.pop("temp_key_5")
+        submission.properties.update(dict())
 
         assert submission.jobs == [job_2, job_1]
         assert submission.submission_status == SubmissionStatus.COMPLETED
+        assert submission.properties["qux"] == 5
+        assert submission.properties["temp_key_3"] == 1
+        assert submission.properties["temp_key_4"] == 0
+        assert submission.properties["temp_key_5"] == 0
 
     assert submission_1.jobs == [job_1]
     assert submission_1.submission_status == SubmissionStatus.PENDING
     assert submission_2.jobs == [job_1]
     assert submission_2.submission_status == SubmissionStatus.PENDING
+    assert submission_1.properties["qux"] == 9
+    assert "temp_key_3" not in submission_1.properties.keys()
+    assert submission_1.properties["temp_key_4"] == 1
+    assert "temp_key_5" not in submission_1.properties.keys()
 
 
 @pytest.mark.parametrize(

+ 8 - 2
tests/core/submission/test_submission_manager.py

@@ -27,12 +27,14 @@ from taipy.core.task.task import Task
 
 def test_create_submission(scenario):
     submission_1 = _SubmissionManagerFactory._build_manager()._create(
-        scenario.id, scenario._ID_PREFIX, scenario.config_id
+        scenario.id, scenario._ID_PREFIX, scenario.config_id, debug=True, log="log_file", retry_note=5
     )
 
+    assert isinstance(submission_1, Submission)
     assert submission_1.id is not None
     assert submission_1.entity_id == scenario.id
     assert submission_1.jobs == []
+    assert submission_1.properties == {"debug": True, "log": "log_file", "retry_note": 5}
     assert isinstance(submission_1.creation_date, datetime)
     assert submission_1._submission_status == SubmissionStatus.SUBMITTED
 
@@ -42,7 +44,9 @@ def test_get_submission():
 
     assert submission_manager._get("random_submission_id") is None
 
-    submission_1 = submission_manager._create("entity_id", "ENTITY_TYPE", "entity_config_id")
+    submission_1 = submission_manager._create(
+        "entity_id", "ENTITY_TYPE", "entity_config_id", debug=True, log="log_file", retry_note=5
+    )
     submission_2 = submission_manager._get(submission_1.id)
 
     assert submission_1.id == submission_2.id
@@ -50,6 +54,8 @@ def test_get_submission():
     assert submission_1.jobs == submission_2.jobs
     assert submission_1.creation_date == submission_2.creation_date
     assert submission_1.submission_status == submission_2.submission_status
+    assert submission_1.properties == {"debug": True, "log": "log_file", "retry_note": 5}
+    assert submission_1.properties == submission_2.properties
 
 
 def test_get_all_submission():

+ 8 - 2
tests/core/submission/test_submission_manager_with_sql_repo.py

@@ -31,12 +31,14 @@ def test_create_submission(scenario, init_sql_repo):
     init_managers()
 
     submission_1 = _SubmissionManagerFactory._build_manager()._create(
-        scenario.id, scenario._ID_PREFIX, scenario.config_id
+        scenario.id, scenario._ID_PREFIX, scenario.config_id, debug=True, log="log_file", retry_note=5
     )
 
+    assert isinstance(submission_1, Submission)
     assert submission_1.id is not None
     assert submission_1.entity_id == scenario.id
     assert submission_1.jobs == []
+    assert submission_1.properties == {"debug": True, "log": "log_file", "retry_note": 5}
     assert isinstance(submission_1.creation_date, datetime)
     assert submission_1._submission_status == SubmissionStatus.SUBMITTED
 
@@ -46,7 +48,9 @@ def test_get_submission(init_sql_repo):
 
     submission_manager = _SubmissionManagerFactory._build_manager()
 
-    submission_1 = submission_manager._create("entity_id", "ENTITY_TYPE", "entity_config_id")
+    submission_1 = submission_manager._create(
+        "entity_id", "ENTITY_TYPE", "entity_config_id", debug=True, log="log_file", retry_note=5
+    )
     submission_2 = submission_manager._get(submission_1.id)
 
     assert submission_1.id == submission_2.id
@@ -54,6 +58,8 @@ def test_get_submission(init_sql_repo):
     assert submission_1.jobs == submission_2.jobs
     assert submission_1.creation_date == submission_2.creation_date
     assert submission_1.submission_status == submission_2.submission_status
+    assert submission_1.properties == {"debug": True, "log": "log_file", "retry_note": 5}
+    assert submission_1.properties == submission_2.properties
 
 
 def test_get_all_submission(init_sql_repo):

+ 7 - 1
tests/core/submission/test_submission_repositories.py

@@ -43,13 +43,19 @@ class TestSubmissionRepository:
         job._task = task
         _JobManagerFactory._build_manager()._repository._save(job)
 
-        submission = Submission(task.id, task._ID_PREFIX, task.config_id)
+        submission = Submission(
+            task.id, task._ID_PREFIX, task.config_id, properties={"debug": True, "log": "log_file", "retry_note": 5}
+        )
         submission_repository = _SubmissionManagerFactory._build_manager()._repository
         submission_repository._save(submission)
         submission.jobs = [job]
 
         obj = submission_repository._load(submission.id)
         assert isinstance(obj, Submission)
+        assert obj.entity_id == task.id
+        assert obj.entity_type == task._ID_PREFIX
+        assert obj.entity_config_id == task.config_id
+        assert obj.properties == {"debug": True, "log": "log_file", "retry_note": 5}
 
     @pytest.mark.parametrize("configure_repo", [configure_fs_repo, configure_sql_repo])
     def test_exists(self, configure_repo):

+ 2 - 2
tests/core/test_core_cli.py

@@ -466,7 +466,7 @@ def test_modify_job_configuration_dont_stop_application(caplog, init_config):
         Config.configure_job_executions(mode="development")
         core.run(force_restart=True)
         scenario = _ScenarioManager._create(scenario_config)
-        jobs = taipy.submit(scenario)
+        jobs = taipy.submit(scenario).jobs
         assert all(job.is_finished() for job in jobs)
         core.stop()
     init_config()
@@ -476,7 +476,7 @@ def test_modify_job_configuration_dont_stop_application(caplog, init_config):
         Config.configure_job_executions(mode="standalone", max_nb_of_workers=5)
         core.run(force_restart=True)
         scenario = _ScenarioManager._create(scenario_config)
-        jobs = taipy.submit(scenario)
+        jobs = taipy.submit(scenario).jobs
         assert_true_after_time(lambda: all(job.is_finished() for job in jobs))
         error_message = str(caplog.text)
         assert 'JOB "mode" was modified' in error_message

+ 2 - 2
tests/core/test_core_cli_with_sql_repo.py

@@ -513,7 +513,7 @@ def test_modify_job_configuration_dont_stop_application(caplog, init_sql_repo, i
         core = Core()
         core.run(force_restart=True)
         scenario = _ScenarioManager._create(scenario_config)
-        jobs = _ScenarioManager._submit(scenario)
+        jobs = _ScenarioManager._submit(scenario).jobs
         assert all(job.is_finished() for job in jobs)
         core.stop()
 
@@ -528,7 +528,7 @@ def test_modify_job_configuration_dont_stop_application(caplog, init_sql_repo, i
         core.run(force_restart=True)
         scenario = _ScenarioManager._create(scenario_config)
 
-        jobs = _ScenarioManager._submit(scenario)
+        jobs = _ScenarioManager._submit(scenario).jobs
         assert_true_after_time(lambda: all(job.is_finished() for job in jobs))
         error_message = str(caplog.text)
         assert 'JOB "mode" was modified' in error_message

+ 3 - 3
tests/core/test_taipy.py

@@ -164,7 +164,7 @@ class TestTaipy:
         assert tp.is_editable(task)
         assert tp.is_editable(cycle)
         assert not tp.is_editable(job)
-        assert not tp.is_editable(submission)
+        assert tp.is_editable(submission)
         assert tp.is_editable(dn)
 
     def test_is_readable_is_called(self, cycle, job, data_node):
@@ -665,7 +665,7 @@ class TestTaipy:
         scenario_cfg_2 = Config.configure_scenario("s2", [task_cfg_2], [], Frequency.DAILY)
 
         scenario_1 = tp.create_scenario(scenario_cfg_1)
-        job_1 = tp.submit(scenario_1)[0]
+        job_1 = tp.submit(scenario_1).jobs[0]
 
         # Export scenario 1
         tp.export_scenario(scenario_1.id, "./tmp/exp_scenario_1")
@@ -678,7 +678,7 @@ class TestTaipy:
         assert sorted(os.listdir("./tmp/exp_scenario_1/cycles")) == sorted([f"{scenario_1.cycle.id}.json"])
 
         scenario_2 = tp.create_scenario(scenario_cfg_2)
-        job_2 = tp.submit(scenario_2)[0]
+        job_2 = tp.submit(scenario_2).jobs[0]
 
         # Export scenario 2
         scenario_2.export(pathlib.Path.cwd() / "./tmp/exp_scenario_2")

+ 4 - 4
tests/core/version/test_production_version_migration.py

@@ -71,7 +71,7 @@ def test_migrate_datanode_in_standalone_mode(init_config):
         core = Core()
         core.run()
         scenario_v2 = _ScenarioManager._create(scenario_cfg_v2)
-        jobs = _ScenarioManager._submit(scenario_v2)
+        jobs = _ScenarioManager._submit(scenario_v2).jobs
         v1 = taipy.get(scenario_v1.id)
         assert v1.d1.version == "2.0"
         assert v1.d1.path == "bar.pkl"
@@ -104,7 +104,7 @@ def test_migrate_task_in_standalone_mode(init_config):
         core = Core()
         core.run()
         scenario_v2 = _ScenarioManager._create(scenario_cfg_v2)
-        jobs = _ScenarioManager._submit(scenario_v2)
+        jobs = _ScenarioManager._submit(scenario_v2).jobs
         v1 = taipy.get(scenario_v1.id)
         assert v1.my_task.version == "2.0"
         assert v1.my_task.skippable is True
@@ -137,7 +137,7 @@ def test_migrate_scenario_in_standalone_mode(init_config):
         core = Core()
         core.run()
         scenario_v2 = _ScenarioManager._create(scenario_cfg_v2)
-        jobs = _ScenarioManager._submit(scenario_v2)
+        jobs = _ScenarioManager._submit(scenario_v2).jobs
         v1 = taipy.get(scenario_v1.id)
         assert v1.version == "2.0"
         assert v1.properties["foo"] == "bar"
@@ -177,7 +177,7 @@ def test_migrate_all_entities_in_standalone_mode(init_config):
         core = Core()
         core.run()
         scenario_v2 = _ScenarioManager._create(scenario_cfg_v2)
-        jobs = _ScenarioManager._submit(scenario_v2)
+        jobs = _ScenarioManager._submit(scenario_v2).jobs
         v1 = taipy.get(scenario_v1.id)
         assert v1.version == "2.0"
         assert v1.properties["foo"] == "bar"