浏览代码

remove in-memory state in submision and store it in repo

Toan Quach 1 年之前
父节点
当前提交
7d0cc09fbb

+ 3 - 10
taipy/core/_orchestrator/_orchestrator.py

@@ -14,7 +14,7 @@ from datetime import datetime
 from multiprocessing import Lock
 from multiprocessing import Lock
 from queue import Queue
 from queue import Queue
 from time import sleep
 from time import sleep
-from typing import Callable, Dict, Iterable, List, Optional, Set, Union
+from typing import Callable, Iterable, List, Optional, Set, Union
 
 
 from taipy.config.config import Config
 from taipy.config.config import Config
 from taipy.logger._taipy_logger import _TaipyLogger
 from taipy.logger._taipy_logger import _TaipyLogger
@@ -39,7 +39,6 @@ class _Orchestrator(_AbstractOrchestrator):
     blocked_jobs: List = []
     blocked_jobs: List = []
     lock = Lock()
     lock = Lock()
     __logger = _TaipyLogger._get_logger()
     __logger = _TaipyLogger._get_logger()
-    _submission_entities: Dict[str, Submission] = {}
 
 
     @classmethod
     @classmethod
     def initialize(cls):
     def initialize(cls):
@@ -76,7 +75,6 @@ class _Orchestrator(_AbstractOrchestrator):
             getattr(submittable, "config_id", None),
             getattr(submittable, "config_id", None),
             **properties,
             **properties,
         )
         )
-        cls._submission_entities[submission.id] = submission
         jobs = []
         jobs = []
         tasks = submittable._get_sorted_tasks()
         tasks = submittable._get_sorted_tasks()
         with cls.lock:
         with cls.lock:
@@ -87,7 +85,7 @@ class _Orchestrator(_AbstractOrchestrator):
                             task,
                             task,
                             submission.id,
                             submission.id,
                             submission.entity_id,
                             submission.entity_id,
-                            callbacks=itertools.chain([cls._update_submission_status], callbacks or []),
+                            callbacks=itertools.chain([Job._update_submission_status], callbacks or []),
                             force=force,  # type: ignore
                             force=force,  # type: ignore
                         )
                         )
                     )
                     )
@@ -128,13 +126,12 @@ class _Orchestrator(_AbstractOrchestrator):
             task.id, task._ID_PREFIX, task.config_id, **properties
             task.id, task._ID_PREFIX, task.config_id, **properties
         )
         )
         submit_id = submission.id
         submit_id = submission.id
-        cls._submission_entities[submission.id] = submission
         with cls.lock:
         with cls.lock:
             job = cls._lock_dn_output_and_create_job(
             job = cls._lock_dn_output_and_create_job(
                 task,
                 task,
                 submit_id,
                 submit_id,
                 submission.entity_id,
                 submission.entity_id,
-                itertools.chain([cls._update_submission_status], callbacks or []),
+                itertools.chain([Job._update_submission_status], callbacks or []),
                 force,
                 force,
             )
             )
         jobs = [job]
         jobs = [job]
@@ -147,10 +144,6 @@ class _Orchestrator(_AbstractOrchestrator):
                 cls._wait_until_job_finished(job, timeout=timeout)
                 cls._wait_until_job_finished(job, timeout=timeout)
         return submission
         return submission
 
 
-    @classmethod
-    def _update_submission_status(cls, job: Job):
-        cls._submission_entities[job.submit_id]._update_submission_status(job)
-
     @classmethod
     @classmethod
     def _lock_dn_output_and_create_job(
     def _lock_dn_output_and_create_job(
         cls,
         cls,

+ 58 - 0
taipy/core/common/_self_setter_set.py

@@ -0,0 +1,58 @@
+# Copyright 2021-2024 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+from collections.abc import MutableSet
+from typing import Any, Iterable, Iterator
+
+
+class _SelfSetterSet(MutableSet):
+    def __init__(self, parent, data: Iterable):
+        self._parent = parent
+        self.data = set(data)
+
+    def __set_self(self):
+        from ... import core as tp
+
+        if hasattr(self, "_parent"):
+            tp.set(self._parent)
+
+    def __contains__(self, value: Any) -> bool:
+        return value in self.data
+
+    def __repr__(self) -> str:
+        return repr(self.data)
+
+    def __iter__(self) -> Iterator:
+        return iter(self.data)
+
+    def __len__(self) -> int:
+        return len(self.data)
+
+    def add(self, value: Any):
+        self.data.add(value)
+        self.__set_self()
+
+    def remove(self, value: Any):
+        self.data.remove(value)
+        self.__set_self()
+
+    def discard(self, value: Any):
+        self.data.discard(value)
+        self.__set_self()
+
+    def pop(self):
+        item = self.data.pop()
+        self.__set_self()
+        return item
+
+    def clear(self) -> None:
+        self.data.clear()
+        self.__set_self()

+ 6 - 0
taipy/core/job/job.py

@@ -327,6 +327,12 @@ class Job(_Entity, _Labeled):
             self.completed()
             self.completed()
             self.__logger.info(f"job {self.id} is completed.")
             self.__logger.info(f"job {self.id} is completed.")
 
 
+    @classmethod
+    def _update_submission_status(cls, job: "Job"):
+        from ..submission._submission_manager_factory import _SubmissionManagerFactory
+
+        _SubmissionManagerFactory._build_manager()._get(job.submit_id)._update_submission_status(job)
+
     def __hash__(self):
     def __hash__(self):
         return hash(self.id)
         return hash(self.id)
 
 

+ 16 - 0
taipy/core/submission/_submission_converter.py

@@ -12,6 +12,7 @@
 from datetime import datetime
 from datetime import datetime
 
 
 from .._repository._abstract_converter import _AbstractConverter
 from .._repository._abstract_converter import _AbstractConverter
+from ..common._self_setter_set import _SelfSetterSet
 from ..job.job import Job, JobId
 from ..job.job import Job, JobId
 from ..submission._submission_model import _SubmissionModel
 from ..submission._submission_model import _SubmissionModel
 from ..submission.submission import Submission
 from ..submission.submission import Submission
@@ -31,6 +32,12 @@ class _SubmissionConverter(_AbstractConverter):
             creation_date=submission._creation_date.isoformat(),
             creation_date=submission._creation_date.isoformat(),
             submission_status=submission._submission_status,
             submission_status=submission._submission_status,
             version=submission._version,
             version=submission._version,
+            is_abandoned=submission._is_abandoned,
+            is_completed=submission._is_completed,
+            is_canceled=submission._is_canceled,
+            running_jobs=list(submission._running_jobs.data),
+            blocked_jobs=list(submission._blocked_jobs.data),
+            pending_jobs=list(submission._pending_jobs.data),
         )
         )
 
 
     @classmethod
     @classmethod
@@ -46,4 +53,13 @@ class _SubmissionConverter(_AbstractConverter):
             submission_status=model.submission_status,
             submission_status=model.submission_status,
             version=model.version,
             version=model.version,
         )
         )
+
+        submission._is_abandoned = model.is_abandoned
+        submission._is_completed = model.is_completed
+        submission._is_canceled = model.is_canceled
+
+        submission._running_jobs = _SelfSetterSet(submission, model.running_jobs)
+        submission._blocked_jobs = _SelfSetterSet(submission, model.blocked_jobs)
+        submission._pending_jobs = _SelfSetterSet(submission, model.pending_jobs)
+
         return submission
         return submission

+ 25 - 1
taipy/core/submission/_submission_model.py

@@ -12,7 +12,7 @@
 from dataclasses import dataclass
 from dataclasses import dataclass
 from typing import Any, Dict, List, Optional, Union
 from typing import Any, Dict, List, Optional, Union
 
 
-from sqlalchemy import JSON, Column, Enum, String, Table
+from sqlalchemy import JSON, Boolean, Column, Enum, String, Table
 
 
 from .._repository._base_taipy_model import _BaseModel
 from .._repository._base_taipy_model import _BaseModel
 from .._repository.db._sql_base_model import mapper_registry
 from .._repository.db._sql_base_model import mapper_registry
@@ -35,6 +35,12 @@ class _SubmissionModel(_BaseModel):
         Column("creation_date", String),
         Column("creation_date", String),
         Column("submission_status", Enum(SubmissionStatus)),
         Column("submission_status", Enum(SubmissionStatus)),
         Column("version", String),
         Column("version", String),
+        Column("is_completed", Boolean),
+        Column("is_abandoned", Boolean),
+        Column("is_canceled", Boolean),
+        Column("running_jobs", JSON),
+        Column("blocked_jobs", JSON),
+        Column("pending_jobs", JSON),
     )
     )
     id: str
     id: str
     entity_id: str
     entity_id: str
@@ -45,6 +51,12 @@ class _SubmissionModel(_BaseModel):
     creation_date: str
     creation_date: str
     submission_status: SubmissionStatus
     submission_status: SubmissionStatus
     version: str
     version: str
+    is_completed: bool
+    is_abandoned: bool
+    is_canceled: bool
+    running_jobs: List[str]
+    blocked_jobs: List[str]
+    pending_jobs: List[str]
 
 
     @staticmethod
     @staticmethod
     def from_dict(data: Dict[str, Any]):
     def from_dict(data: Dict[str, Any]):
@@ -58,6 +70,12 @@ class _SubmissionModel(_BaseModel):
             creation_date=data["creation_date"],
             creation_date=data["creation_date"],
             submission_status=SubmissionStatus._from_repr(data["submission_status"]),
             submission_status=SubmissionStatus._from_repr(data["submission_status"]),
             version=data["version"],
             version=data["version"],
+            is_completed=data["is_completed"],
+            is_abandoned=data["is_abandoned"],
+            is_canceled=data["is_canceled"],
+            running_jobs=_BaseModel._deserialize_attribute(data["running_jobs"]),
+            blocked_jobs=_BaseModel._deserialize_attribute(data["blocked_jobs"]),
+            pending_jobs=_BaseModel._deserialize_attribute(data["pending_jobs"]),
         )
         )
 
 
     def to_list(self):
     def to_list(self):
@@ -71,4 +89,10 @@ class _SubmissionModel(_BaseModel):
             self.creation_date,
             self.creation_date,
             repr(self.submission_status),
             repr(self.submission_status),
             self.version,
             self.version,
+            self.is_completed,
+            self.is_abandoned,
+            self.is_canceled,
+            _BaseModel._serialize_attribute(self.running_jobs),
+            _BaseModel._serialize_attribute(self.blocked_jobs),
+            _BaseModel._serialize_attribute(self.pending_jobs),
         ]
         ]

+ 73 - 28
taipy/core/submission/submission.py

@@ -11,7 +11,6 @@
 
 
 import threading
 import threading
 import uuid
 import uuid
-from collections.abc import MutableSet
 from datetime import datetime
 from datetime import datetime
 from typing import Any, Dict, List, Optional, Union
 from typing import Any, Dict, List, Optional, Union
 
 
@@ -20,6 +19,7 @@ from .._entity._labeled import _Labeled
 from .._entity._properties import _Properties
 from .._entity._properties import _Properties
 from .._entity._reload import _Reloader, _self_reload, _self_setter
 from .._entity._reload import _Reloader, _self_reload, _self_setter
 from .._version._version_manager_factory import _VersionManagerFactory
 from .._version._version_manager_factory import _VersionManagerFactory
+from ..common._self_setter_set import _SelfSetterSet
 from ..job.job import Job, JobId, Status
 from ..job.job import Job, JobId, Status
 from ..notification.event import Event, EventEntityType, EventOperation, _make_event
 from ..notification.event import Event, EventEntityType, EventOperation, _make_event
 from .submission_id import SubmissionId
 from .submission_id import SubmissionId
@@ -69,13 +69,13 @@ class Submission(_Entity, _Labeled):
         properties = properties or {}
         properties = properties or {}
         self._properties = _Properties(self, **properties.copy())
         self._properties = _Properties(self, **properties.copy())
 
 
-        self.__abandoned = False
-        self.__completed = False
+        self._is_abandoned = False
+        self._is_completed = False
+        self._is_canceled = False
 
 
-        self.__is_canceled = False
-        self.__running_jobs: MutableSet[str] = set()
-        self.__blocked_jobs: MutableSet[str] = set()
-        self.__pending_jobs: MutableSet[str] = set()
+        self._running_jobs: _SelfSetterSet = _SelfSetterSet(self, list())
+        self._blocked_jobs: _SelfSetterSet = _SelfSetterSet(self, list())
+        self._pending_jobs: _SelfSetterSet = _SelfSetterSet(self, list())
 
 
     @staticmethod
     @staticmethod
     def __new_id() -> str:
     def __new_id() -> str:
@@ -132,6 +132,21 @@ class Submission(_Entity, _Labeled):
 
 
         return jobs
         return jobs
 
 
+    @property
+    def running_jobs(self) -> _SelfSetterSet:
+        self._running_jobs = _Reloader()._reload(self._MANAGER_NAME, self)._running_jobs
+        return self._running_jobs
+
+    @property
+    def blocked_jobs(self) -> _SelfSetterSet:
+        self._blocked_jobs = _Reloader()._reload(self._MANAGER_NAME, self)._blocked_jobs
+        return self._blocked_jobs
+
+    @property
+    def pending_jobs(self) -> _SelfSetterSet:
+        self._pending_jobs = _Reloader()._reload(self._MANAGER_NAME, self)._pending_jobs
+        return self._pending_jobs
+
     @jobs.setter  # type: ignore
     @jobs.setter  # type: ignore
     @_self_setter(_MANAGER_NAME)
     @_self_setter(_MANAGER_NAME)
     def jobs(self, jobs: Union[List[Job], List[JobId]]):
     def jobs(self, jobs: Union[List[Job], List[JobId]]):
@@ -153,6 +168,36 @@ class Submission(_Entity, _Labeled):
     def submission_status(self, submission_status):
     def submission_status(self, submission_status):
         self._submission_status = submission_status
         self._submission_status = submission_status
 
 
+    @property  # type: ignore
+    @_self_reload(_MANAGER_NAME)
+    def is_abandoned(self):
+        return self._is_abandoned
+
+    @is_abandoned.setter  # type: ignore
+    @_self_setter(_MANAGER_NAME)
+    def is_abandoned(self, val):
+        self._is_abandoned = val
+
+    @property  # type: ignore
+    @_self_reload(_MANAGER_NAME)
+    def is_completed(self):
+        return self._is_completed
+
+    @is_completed.setter  # type: ignore
+    @_self_setter(_MANAGER_NAME)
+    def is_completed(self, val):
+        self._is_completed = val
+
+    @property  # type: ignore
+    @_self_reload(_MANAGER_NAME)
+    def is_canceled(self):
+        return self._is_canceled
+
+    @is_canceled.setter  # type: ignore
+    @_self_setter(_MANAGER_NAME)
+    def is_canceled(self, val):
+        self._is_canceled = val
+
     def __lt__(self, other):
     def __lt__(self, other):
         return self.creation_date.timestamp() < other.creation_date.timestamp()
         return self.creation_date.timestamp() < other.creation_date.timestamp()
 
 
@@ -177,38 +222,38 @@ class Submission(_Entity, _Labeled):
 
 
         with self.lock:
         with self.lock:
             if job_status == Status.CANCELED:
             if job_status == Status.CANCELED:
-                self.__is_canceled = True
+                self.is_canceled = True
             elif job_status == Status.BLOCKED:
             elif job_status == Status.BLOCKED:
-                self.__blocked_jobs.add(job.id)
-                self.__pending_jobs.discard(job.id)
+                self.blocked_jobs.add(job.id)
+                self.pending_jobs.discard(job.id)
             elif job_status == Status.PENDING or job_status == Status.SUBMITTED:
             elif job_status == Status.PENDING or job_status == Status.SUBMITTED:
-                self.__pending_jobs.add(job.id)
-                self.__blocked_jobs.discard(job.id)
+                self.pending_jobs.add(job.id)
+                self.blocked_jobs.discard(job.id)
             elif job_status == Status.RUNNING:
             elif job_status == Status.RUNNING:
-                self.__running_jobs.add(job.id)
-                self.__pending_jobs.discard(job.id)
+                self.running_jobs.add(job.id)
+                self.pending_jobs.discard(job.id)
             elif job_status == Status.COMPLETED or job_status == Status.SKIPPED:
             elif job_status == Status.COMPLETED or job_status == Status.SKIPPED:
-                self.__completed = True
-                self.__blocked_jobs.discard(job.id)
-                self.__pending_jobs.discard(job.id)
-                self.__running_jobs.discard(job.id)
+                self.is_completed = True
+                self.blocked_jobs.discard(job.id)
+                self.pending_jobs.discard(job.id)
+                self.running_jobs.discard(job.id)
             elif job_status == Status.ABANDONED:
             elif job_status == Status.ABANDONED:
-                self.__abandoned = True
-                self.__running_jobs.discard(job.id)
-                self.__blocked_jobs.discard(job.id)
-                self.__pending_jobs.discard(job.id)
+                self.is_abandoned = True
+                self.running_jobs.discard(job.id)
+                self.blocked_jobs.discard(job.id)
+                self.pending_jobs.discard(job.id)
 
 
-        if self.__is_canceled:
+        if self.is_canceled:
             self.submission_status = SubmissionStatus.CANCELED  # type: ignore
             self.submission_status = SubmissionStatus.CANCELED  # type: ignore
-        elif self.__abandoned:
+        elif self.is_abandoned:
             self.submission_status = SubmissionStatus.UNDEFINED  # type: ignore
             self.submission_status = SubmissionStatus.UNDEFINED  # type: ignore
-        elif self.__running_jobs:
+        elif self.running_jobs:
             self.submission_status = SubmissionStatus.RUNNING  # type: ignore
             self.submission_status = SubmissionStatus.RUNNING  # type: ignore
-        elif self.__pending_jobs:
+        elif self.pending_jobs:
             self.submission_status = SubmissionStatus.PENDING  # type: ignore
             self.submission_status = SubmissionStatus.PENDING  # type: ignore
-        elif self.__blocked_jobs:
+        elif self.blocked_jobs:
             self.submission_status = SubmissionStatus.BLOCKED  # type: ignore
             self.submission_status = SubmissionStatus.BLOCKED  # type: ignore
-        elif self.__completed:
+        elif self.is_completed:
             self.submission_status = SubmissionStatus.COMPLETED  # type: ignore
             self.submission_status = SubmissionStatus.COMPLETED  # type: ignore
         else:
         else:
             self.submission_status = SubmissionStatus.UNDEFINED  # type: ignore
             self.submission_status = SubmissionStatus.UNDEFINED  # type: ignore

+ 37 - 36
tests/core/_orchestrator/test_orchestrator__submit.py

@@ -22,6 +22,7 @@ from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
 from taipy.core.config import JobConfig
 from taipy.core.config import JobConfig
 from taipy.core.data import PickleDataNode
 from taipy.core.data import PickleDataNode
 from taipy.core.data._data_manager import _DataManager
 from taipy.core.data._data_manager import _DataManager
+from taipy.core.job.job import Job
 from taipy.core.scenario._scenario_manager import _ScenarioManager
 from taipy.core.scenario._scenario_manager import _ScenarioManager
 from taipy.core.submission._submission_manager_factory import _SubmissionManagerFactory
 from taipy.core.submission._submission_manager_factory import _SubmissionManagerFactory
 from taipy.core.submission.submission_status import SubmissionStatus
 from taipy.core.submission.submission_status import SubmissionStatus
@@ -76,8 +77,8 @@ def test_submit_scenario_development_mode():
     assert job_1.submit_entity_id == scenario.id
     assert job_1.submit_entity_id == scenario.id
     assert job_1.creation_date == submit_time
     assert job_1.creation_date == submit_time
     assert job_1.stacktrace == []
     assert job_1.stacktrace == []
-    assert len(job_1._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
-    assert job_1._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
+    assert len(job_1._subscribers) == 2  # Job._update_submission_status and orchestrator._on_status_change
+    assert job_1._subscribers[0].__code__ == Job._update_submission_status.__code__
     assert job_1._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_1._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     # t2 or t2_bis
     # t2 or t2_bis
     job_2 = jobs[1]
     job_2 = jobs[1]
@@ -87,8 +88,8 @@ def test_submit_scenario_development_mode():
     assert job_2.submit_entity_id == scenario.id
     assert job_2.submit_entity_id == scenario.id
     assert job_2.creation_date == submit_time
     assert job_2.creation_date == submit_time
     assert job_2.stacktrace == []
     assert job_2.stacktrace == []
-    assert len(job_2._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
-    assert job_2._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
+    assert len(job_2._subscribers) == 2  # Job._update_submission_status and orchestrator._on_status_change
+    assert job_2._subscribers[0].__code__ == Job._update_submission_status.__code__
     assert job_2._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_2._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     # t2_bis or t2
     # t2_bis or t2
     job_2bis = jobs[2]
     job_2bis = jobs[2]
@@ -97,8 +98,8 @@ def test_submit_scenario_development_mode():
     assert not job_2bis.force
     assert not job_2bis.force
     assert job_2bis.submit_entity_id == scenario.id
     assert job_2bis.submit_entity_id == scenario.id
     assert job_2bis.creation_date == submit_time
     assert job_2bis.creation_date == submit_time
-    assert len(job_2bis._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
-    assert job_2bis._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
+    assert len(job_2bis._subscribers) == 2  # Job._update_submission_status and orchestrator._on_status_change
+    assert job_2bis._subscribers[0].__code__ == Job._update_submission_status.__code__
     assert job_2bis._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_2bis._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_2bis.stacktrace == []
     assert job_2bis.stacktrace == []
     # t3
     # t3
@@ -108,8 +109,8 @@ def test_submit_scenario_development_mode():
     assert job_3.is_completed()
     assert job_3.is_completed()
     assert job_3.submit_entity_id == scenario.id
     assert job_3.submit_entity_id == scenario.id
     assert job_3.creation_date == submit_time
     assert job_3.creation_date == submit_time
-    assert len(job_3._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
-    assert job_3._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
+    assert len(job_3._subscribers) == 2  # Job._update_submission_status and orchestrator._on_status_change
+    assert job_3._subscribers[0].__code__ == Job._update_submission_status.__code__
     assert job_3._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_3._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_3.stacktrace == []
     assert job_3.stacktrace == []
 
 
@@ -157,8 +158,8 @@ def test_submit_scenario_development_mode_blocked_jobs():
     assert job_1.submit_entity_id == scenario.id
     assert job_1.submit_entity_id == scenario.id
     assert job_1.creation_date == s_time
     assert job_1.creation_date == s_time
     assert job_1.stacktrace == []
     assert job_1.stacktrace == []
-    assert len(job_1._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
-    assert job_1._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
+    assert len(job_1._subscribers) == 2  # Job._update_submission_status and orchestrator._on_status_change
+    assert job_1._subscribers[0].__code__ == Job._update_submission_status.__code__
     assert job_1._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_1._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     # t2 or t2_bis
     # t2 or t2_bis
     job_2 = jobs[1]
     job_2 = jobs[1]
@@ -168,8 +169,8 @@ def test_submit_scenario_development_mode_blocked_jobs():
     assert job_2.submit_entity_id == scenario.id
     assert job_2.submit_entity_id == scenario.id
     assert job_2.creation_date == s_time
     assert job_2.creation_date == s_time
     assert job_2.stacktrace == []
     assert job_2.stacktrace == []
-    assert len(job_2._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
-    assert job_2._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
+    assert len(job_2._subscribers) == 2  # Job._update_submission_status and orchestrator._on_status_change
+    assert job_2._subscribers[0].__code__ == Job._update_submission_status.__code__
     assert job_2._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_2._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     # t2_bis or t2
     # t2_bis or t2
     job_2bis = jobs[2]
     job_2bis = jobs[2]
@@ -178,8 +179,8 @@ def test_submit_scenario_development_mode_blocked_jobs():
     assert job_2bis.submit_entity_id == scenario.id
     assert job_2bis.submit_entity_id == scenario.id
     assert not job_2bis.force
     assert not job_2bis.force
     assert job_2bis.creation_date == s_time
     assert job_2bis.creation_date == s_time
-    assert len(job_2bis._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
-    assert job_2bis._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
+    assert len(job_2bis._subscribers) == 2  # Job._update_submission_status and orchestrator._on_status_change
+    assert job_2bis._subscribers[0].__code__ == Job._update_submission_status.__code__
     assert job_2bis._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_2bis._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_2bis.stacktrace == []
     assert job_2bis.stacktrace == []
     # t3
     # t3
@@ -190,8 +191,8 @@ def test_submit_scenario_development_mode_blocked_jobs():
     assert job_3.submit_entity_id == scenario.id
     assert job_3.submit_entity_id == scenario.id
     assert job_3.creation_date == s_time
     assert job_3.creation_date == s_time
     assert job_3.stacktrace == []
     assert job_3.stacktrace == []
-    assert len(job_3._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
-    assert job_3._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
+    assert len(job_3._subscribers) == 2  # Job._update_submission_status and orchestrator._on_status_change
+    assert job_3._subscribers[0].__code__ == Job._update_submission_status.__code__
     assert job_3._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_3._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
 
 
     # Same submit_id
     # Same submit_id
@@ -240,8 +241,8 @@ def test_submit_scenario_standalone_mode():
     assert job_1.is_pending()
     assert job_1.is_pending()
     assert job_1.creation_date == submit_time
     assert job_1.creation_date == submit_time
     assert job_1.submit_entity_id == sc.id
     assert job_1.submit_entity_id == sc.id
-    assert len(job_1._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
-    assert job_1._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
+    assert len(job_1._subscribers) == 2  # Job._update_submission_status and orchestrator._on_status_change
+    assert job_1._subscribers[0].__code__ == Job._update_submission_status.__code__
     assert job_1._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_1._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_1.stacktrace == []
     assert job_1.stacktrace == []
     # t2 or t2_bis
     # t2 or t2_bis
@@ -252,18 +253,18 @@ def test_submit_scenario_standalone_mode():
     assert job_2.submit_entity_id == sc.id
     assert job_2.submit_entity_id == sc.id
     assert job_2.creation_date == submit_time
     assert job_2.creation_date == submit_time
     assert job_2.stacktrace == []
     assert job_2.stacktrace == []
-    assert len(job_2._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
+    assert len(job_2._subscribers) == 2  # Job._update_submission_status and orchestrator._on_status_change
     assert job_2._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_2._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
-    assert job_2._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
+    assert job_2._subscribers[0].__code__ == Job._update_submission_status.__code__
     # t2_bis or t2
     # t2_bis or t2
     job_2bis = jobs[2]
     job_2bis = jobs[2]
     assert job_2bis.task == sc.t_2bis or job_2bis.task == sc.t_2
     assert job_2bis.task == sc.t_2bis or job_2bis.task == sc.t_2
     assert job_2bis.is_blocked()
     assert job_2bis.is_blocked()
     assert not job_2bis.force
     assert not job_2bis.force
     assert job_2bis.submit_entity_id == sc.id
     assert job_2bis.submit_entity_id == sc.id
-    assert len(job_2bis._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
+    assert len(job_2bis._subscribers) == 2  # Job._update_submission_status and orchestrator._on_status_change
     assert job_2bis._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_2bis._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
-    assert job_2bis._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
+    assert job_2bis._subscribers[0].__code__ == Job._update_submission_status.__code__
     assert job_2bis.creation_date == submit_time
     assert job_2bis.creation_date == submit_time
     assert job_2bis.stacktrace == []
     assert job_2bis.stacktrace == []
     # t3
     # t3
@@ -272,9 +273,9 @@ def test_submit_scenario_standalone_mode():
     assert not job_3.force
     assert not job_3.force
     assert job_3.is_blocked()
     assert job_3.is_blocked()
     assert job_3.submit_entity_id == sc.id
     assert job_3.submit_entity_id == sc.id
-    assert len(job_3._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
+    assert len(job_3._subscribers) == 2  # Job._update_submission_status and orchestrator._on_status_change
     assert job_3._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_3._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
-    assert job_3._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
+    assert job_3._subscribers[0].__code__ == Job._update_submission_status.__code__
     assert job_3.creation_date == submit_time
     assert job_3.creation_date == submit_time
     assert job_3.stacktrace == []
     assert job_3.stacktrace == []
 
 
@@ -309,15 +310,15 @@ def test_submit_scenario_with_callbacks_and_force_and_wait():
         assert len(jobs) == 4
         assert len(jobs) == 4
         assert len(jobs[0]._subscribers) == 3  # nothing, _update_submission_status, and _on_status_change
         assert len(jobs[0]._subscribers) == 3  # nothing, _update_submission_status, and _on_status_change
         assert jobs[0]._subscribers[0].__code__ == nothing.__code__
         assert jobs[0]._subscribers[0].__code__ == nothing.__code__
-        assert jobs[0]._subscribers[1].__code__ == _Orchestrator._update_submission_status.__code__
+        assert jobs[0]._subscribers[1].__code__ == Job._update_submission_status.__code__
         assert jobs[0]._subscribers[2].__code__ == _Orchestrator._on_status_change.__code__
         assert jobs[0]._subscribers[2].__code__ == _Orchestrator._on_status_change.__code__
         assert len(jobs[1]._subscribers) == 3  # nothing, _update_submission_status, and _on_status_change
         assert len(jobs[1]._subscribers) == 3  # nothing, _update_submission_status, and _on_status_change
         assert jobs[1]._subscribers[0].__code__ == nothing.__code__
         assert jobs[1]._subscribers[0].__code__ == nothing.__code__
-        assert jobs[1]._subscribers[1].__code__ == _Orchestrator._update_submission_status.__code__
+        assert jobs[1]._subscribers[1].__code__ == Job._update_submission_status.__code__
         assert jobs[1]._subscribers[2].__code__ == _Orchestrator._on_status_change.__code__
         assert jobs[1]._subscribers[2].__code__ == _Orchestrator._on_status_change.__code__
         assert len(jobs[2]._subscribers) == 3  # nothing, _update_submission_status, and _on_status_change
         assert len(jobs[2]._subscribers) == 3  # nothing, _update_submission_status, and _on_status_change
         assert jobs[2]._subscribers[0].__code__ == nothing.__code__
         assert jobs[2]._subscribers[0].__code__ == nothing.__code__
-        assert jobs[2]._subscribers[1].__code__ == _Orchestrator._update_submission_status.__code__
+        assert jobs[2]._subscribers[1].__code__ == Job._update_submission_status.__code__
         assert jobs[2]._subscribers[2].__code__ == _Orchestrator._on_status_change.__code__
         assert jobs[2]._subscribers[2].__code__ == _Orchestrator._on_status_change.__code__
         mck.assert_called_once_with(jobs, timeout=5)
         mck.assert_called_once_with(jobs, timeout=5)
 
 
@@ -351,8 +352,8 @@ def test_submit_sequence_development_mode():
     assert job_1.submit_entity_id == seq.id
     assert job_1.submit_entity_id == seq.id
     assert job_1.creation_date == submit_time
     assert job_1.creation_date == submit_time
     assert job_1.stacktrace == []
     assert job_1.stacktrace == []
-    assert len(job_1._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
-    assert job_1._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
+    assert len(job_1._subscribers) == 2  # Job._update_submission_status and orchestrator._on_status_change
+    assert job_1._subscribers[0].__code__ == Job._update_submission_status.__code__
     assert job_1._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_1._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     # t2
     # t2
     job_2 = jobs[1]
     job_2 = jobs[1]
@@ -362,8 +363,8 @@ def test_submit_sequence_development_mode():
     assert job_2.submit_entity_id == seq.id
     assert job_2.submit_entity_id == seq.id
     assert job_2.creation_date == submit_time
     assert job_2.creation_date == submit_time
     assert job_2.stacktrace == []
     assert job_2.stacktrace == []
-    assert len(job_2._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
-    assert job_2._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
+    assert len(job_2._subscribers) == 2  # Job._update_submission_status and orchestrator._on_status_change
+    assert job_2._subscribers[0].__code__ == Job._update_submission_status.__code__
     assert job_2._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_2._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     # t3
     # t3
     job_3 = jobs[2]
     job_3 = jobs[2]
@@ -371,8 +372,8 @@ def test_submit_sequence_development_mode():
     assert not job_3.force
     assert not job_3.force
     assert job_3.is_completed()
     assert job_3.is_completed()
     assert job_3.submit_entity_id == seq.id
     assert job_3.submit_entity_id == seq.id
-    assert len(job_3._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
-    assert job_3._subscribers[0].__code__ == _Orchestrator._update_submission_status.__code__
+    assert len(job_3._subscribers) == 2  # Job._update_submission_status and orchestrator._on_status_change
+    assert job_3._subscribers[0].__code__ == Job._update_submission_status.__code__
     assert job_3._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_3._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job_3.creation_date == submit_time
     assert job_3.creation_date == submit_time
     assert job_3.stacktrace == []
     assert job_3.stacktrace == []
@@ -426,7 +427,7 @@ def test_submit_sequence_standalone_mode():
     assert job_1.creation_date == submit_time
     assert job_1.creation_date == submit_time
     assert job_1.submit_entity_id == sequence.id
     assert job_1.submit_entity_id == sequence.id
     assert job_1.stacktrace == []
     assert job_1.stacktrace == []
-    assert len(job_1._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
+    assert len(job_1._subscribers) == 2  # Job._update_submission_status and orchestrator._on_status_change
     # t2
     # t2
     job_2 = jobs[1]
     job_2 = jobs[1]
     assert job_2.task == scenario.t_2
     assert job_2.task == scenario.t_2
@@ -435,7 +436,7 @@ def test_submit_sequence_standalone_mode():
     assert job_2.submit_entity_id == sequence.id
     assert job_2.submit_entity_id == sequence.id
     assert job_2.creation_date == submit_time
     assert job_2.creation_date == submit_time
     assert job_2.stacktrace == []
     assert job_2.stacktrace == []
-    assert len(job_2._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
+    assert len(job_2._subscribers) == 2  # Job._update_submission_status and orchestrator._on_status_change
     # t3
     # t3
     job_3 = jobs[2]
     job_3 = jobs[2]
     assert job_3.task == scenario.t_3
     assert job_3.task == scenario.t_3
@@ -443,7 +444,7 @@ def test_submit_sequence_standalone_mode():
     assert job_3.is_blocked()
     assert job_3.is_blocked()
     assert job_3.creation_date == submit_time
     assert job_3.creation_date == submit_time
     assert job_3.submit_entity_id == sequence.id
     assert job_3.submit_entity_id == sequence.id
-    assert len(job_3._subscribers) == 2  # _Orchestrator._update_submission_status and orchestrator._on_status_change
+    assert len(job_3._subscribers) == 2  # Job._update_submission_status and orchestrator._on_status_change
     assert job_3.stacktrace == []
     assert job_3.stacktrace == []
 
 
     assert job_1.submit_id == job_2.submit_id == job_3.submit_id
     assert job_1.submit_id == job_2.submit_id == job_3.submit_id

+ 19 - 5
tests/core/_orchestrator/test_orchestrator__submit_task.py

@@ -16,7 +16,9 @@ import pytest
 
 
 from taipy.config import Config
 from taipy.config import Config
 from taipy.core import taipy
 from taipy.core import taipy
+from taipy.core._orchestrator._orchestrator import _Orchestrator
 from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
 from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
+from taipy.core.job.job import Job
 from taipy.core.submission._submission_manager_factory import _SubmissionManagerFactory
 from taipy.core.submission._submission_manager_factory import _SubmissionManagerFactory
 from taipy.core.submission.submission_status import SubmissionStatus
 from taipy.core.submission.submission_status import SubmissionStatus
 
 
@@ -62,7 +64,9 @@ def test_submit_task_development_mode():
     assert job.submit_entity_id == scenario.t1.id
     assert job.submit_entity_id == scenario.t1.id
     assert job.creation_date == submit_time
     assert job.creation_date == submit_time
     assert job.stacktrace == []
     assert job.stacktrace == []
-    assert len(job._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
+    assert len(job._subscribers) == 2  # Job._update_submission_status and _Orchestrator._on_status_change
+    assert job._subscribers[0].__code__ == Job._update_submission_status.__code__
+    assert job._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
 
 
     # submission is created and correct
     # submission is created and correct
     assert len(_SubmissionManagerFactory._build_manager()._get_all()) == 1
     assert len(_SubmissionManagerFactory._build_manager()._get_all()) == 1
@@ -100,7 +104,9 @@ def test_submit_task_development_mode_blocked_job():
     assert job.is_blocked()  # input data is not ready
     assert job.is_blocked()  # input data is not ready
     assert job.submit_entity_id == scenario.t2.id
     assert job.submit_entity_id == scenario.t2.id
     assert job.creation_date == submit_time
     assert job.creation_date == submit_time
-    assert len(job._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
+    assert len(job._subscribers) == 2  # Job._update_submission_status and _Orchestrator._on_status_change
+    assert job._subscribers[0].__code__ == Job._update_submission_status.__code__
+    assert job._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job.stacktrace == []
     assert job.stacktrace == []
 
 
     # submission is created and correct
     # submission is created and correct
@@ -144,7 +150,9 @@ def test_submit_task_standalone_mode():
     assert not job.force
     assert not job.force
     assert job.is_pending()
     assert job.is_pending()
     assert job.submit_entity_id == sc.t1.id
     assert job.submit_entity_id == sc.t1.id
-    assert len(job._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
+    assert len(job._subscribers) == 2  # Job._update_submission_status and _Orchestrator._on_status_change
+    assert job._subscribers[0].__code__ == Job._update_submission_status.__code__
+    assert job._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job.stacktrace == []
     assert job.stacktrace == []
 
 
     # submission is created and correct
     # submission is created and correct
@@ -188,7 +196,9 @@ def test_submit_task_standalone_mode_blocked_job():
     assert not job.force
     assert not job.force
     assert job.is_blocked()  # input data is not ready
     assert job.is_blocked()  # input data is not ready
     assert job.stacktrace == []
     assert job.stacktrace == []
-    assert len(job._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
+    assert len(job._subscribers) == 2  # Job._update_submission_status and _Orchestrator._on_status_change
+    assert job._subscribers[0].__code__ == Job._update_submission_status.__code__
+    assert job._subscribers[1].__code__ == _Orchestrator._on_status_change.__code__
     assert job.submit_entity_id == sc.t2.id
     assert job.submit_entity_id == sc.t2.id
 
 
     # submission is created and correct
     # submission is created and correct
@@ -219,5 +229,9 @@ def test_submit_task_with_callbacks_and_force_and_wait():
         # job exists and is correct
         # job exists and is correct
         assert job.task == scenario.t1
         assert job.task == scenario.t1
         assert job.force
         assert job.force
-        assert len(job._subscribers) == 3  # nothing, _update_submission_status, and _on_status_change
+        assert len(job._subscribers) == 3  # nothing, Job._update_submission_status, and _Orchestrator._on_status_change
+        assert job._subscribers[0].__code__ == nothing.__code__
+        assert job._subscribers[1].__code__ == Job._update_submission_status.__code__
+        assert job._subscribers[2].__code__ == _Orchestrator._on_status_change.__code__
+
         mck.assert_called_once_with(job, timeout=2)
         mck.assert_called_once_with(job, timeout=2)

+ 62 - 90
tests/core/data/test_data_node.py

@@ -498,6 +498,57 @@ class TestDataNode:
         assert dn_1.validity_period == time_period_2
         assert dn_1.validity_period == time_period_2
         assert dn_2.validity_period == time_period_2
         assert dn_2.validity_period == time_period_2
 
 
+        dn_1.last_edit_date = new_datetime
+
+        assert len(dn_1.job_ids) == 1
+        assert len(dn_2.job_ids) == 1
+
+        with dn_1 as dn:
+            assert dn.config_id == "foo"
+            assert dn.owner_id is None
+            assert dn.scope == Scope.SCENARIO
+            assert dn.last_edit_date == new_datetime
+            assert dn.name == "def"
+            assert dn.edit_in_progress
+            assert dn.validity_period == time_period_2
+            assert len(dn.job_ids) == 1
+            assert dn._is_in_context
+
+            new_datetime_2 = new_datetime + timedelta(5)
+
+            dn.scope = Scope.CYCLE
+            dn.last_edit_date = new_datetime_2
+            dn.name = "abc"
+            dn.edit_in_progress = False
+            dn.validity_period = None
+
+            assert dn.config_id == "foo"
+            assert dn.owner_id is None
+            assert dn.scope == Scope.SCENARIO
+            assert dn.last_edit_date == new_datetime
+            assert dn.name == "def"
+            assert dn.edit_in_progress
+            assert dn.validity_period == time_period_2
+            assert len(dn.job_ids) == 1
+
+        assert dn_1.config_id == "foo"
+        assert dn_1.owner_id is None
+        assert dn_1.scope == Scope.CYCLE
+        assert dn_1.last_edit_date == new_datetime_2
+        assert dn_1.name == "abc"
+        assert not dn_1.edit_in_progress
+        assert dn_1.validity_period is None
+        assert not dn_1._is_in_context
+        assert len(dn_1.job_ids) == 1
+
+    def test_auto_set_and_reload_properties(self):
+        dn_1 = InMemoryDataNode("foo", scope=Scope.GLOBAL, properties={"name": "def"})
+
+        dm = _DataManager()
+        dm._set(dn_1)
+
+        dn_2 = dm._get(dn_1)
+
         # auto set & reload on properties attribute
         # auto set & reload on properties attribute
         assert dn_1.properties == {"name": "def"}
         assert dn_1.properties == {"name": "def"}
         assert dn_2.properties == {"name": "def"}
         assert dn_2.properties == {"name": "def"}
@@ -519,100 +570,37 @@ class TestDataNode:
             "temp_key_1": "temp_value_1",
             "temp_key_1": "temp_value_1",
             "temp_key_2": "temp_value_2",
             "temp_key_2": "temp_value_2",
         }
         }
-        assert dn_2.properties == {
-            "name": "def",
-            "qux": 5,
-            "temp_key_1": "temp_value_1",
-            "temp_key_2": "temp_value_2",
-        }
+        assert dn_2.properties == {"name": "def", "qux": 5, "temp_key_1": "temp_value_1", "temp_key_2": "temp_value_2"}
         dn_1.properties.pop("temp_key_1")
         dn_1.properties.pop("temp_key_1")
         assert "temp_key_1" not in dn_1.properties.keys()
         assert "temp_key_1" not in dn_1.properties.keys()
         assert "temp_key_1" not in dn_1.properties.keys()
         assert "temp_key_1" not in dn_1.properties.keys()
-        assert dn_1.properties == {
-            "name": "def",
-            "qux": 5,
-            "temp_key_2": "temp_value_2",
-        }
-        assert dn_2.properties == {
-            "name": "def",
-            "qux": 5,
-            "temp_key_2": "temp_value_2",
-        }
+        assert dn_1.properties == {"name": "def", "qux": 5, "temp_key_2": "temp_value_2"}
+        assert dn_2.properties == {"name": "def", "qux": 5, "temp_key_2": "temp_value_2"}
         dn_2.properties.pop("temp_key_2")
         dn_2.properties.pop("temp_key_2")
-        assert dn_1.properties == {
-            "qux": 5,
-            "name": "def",
-        }
-        assert dn_2.properties == {
-            "qux": 5,
-            "name": "def",
-        }
+        assert dn_1.properties == {"qux": 5, "name": "def"}
+        assert dn_2.properties == {"qux": 5, "name": "def"}
         assert "temp_key_2" not in dn_1.properties.keys()
         assert "temp_key_2" not in dn_1.properties.keys()
         assert "temp_key_2" not in dn_2.properties.keys()
         assert "temp_key_2" not in dn_2.properties.keys()
 
 
         dn_1.properties["temp_key_3"] = 0
         dn_1.properties["temp_key_3"] = 0
-        assert dn_1.properties == {
-            "qux": 5,
-            "temp_key_3": 0,
-            "name": "def",
-        }
-        assert dn_2.properties == {
-            "qux": 5,
-            "temp_key_3": 0,
-            "name": "def",
-        }
+        assert dn_1.properties == {"qux": 5, "temp_key_3": 0, "name": "def"}
+        assert dn_2.properties == {"qux": 5, "temp_key_3": 0, "name": "def"}
         dn_1.properties.update({"temp_key_3": 1})
         dn_1.properties.update({"temp_key_3": 1})
-        assert dn_1.properties == {
-            "qux": 5,
-            "temp_key_3": 1,
-            "name": "def",
-        }
-        assert dn_2.properties == {
-            "qux": 5,
-            "temp_key_3": 1,
-            "name": "def",
-        }
+        assert dn_1.properties == {"qux": 5, "temp_key_3": 1, "name": "def"}
+        assert dn_2.properties == {"qux": 5, "temp_key_3": 1, "name": "def"}
         dn_1.properties.update(dict())
         dn_1.properties.update(dict())
-        assert dn_1.properties == {
-            "qux": 5,
-            "temp_key_3": 1,
-            "name": "def",
-        }
-        assert dn_2.properties == {
-            "qux": 5,
-            "temp_key_3": 1,
-            "name": "def",
-        }
+        assert dn_1.properties == {"qux": 5, "temp_key_3": 1, "name": "def"}
+        assert dn_2.properties == {"qux": 5, "temp_key_3": 1, "name": "def"}
         dn_1.properties["temp_key_4"] = 0
         dn_1.properties["temp_key_4"] = 0
         dn_1.properties["temp_key_5"] = 0
         dn_1.properties["temp_key_5"] = 0
 
 
-        dn_1.last_edit_date = new_datetime
-
-        assert len(dn_1.job_ids) == 1
-        assert len(dn_2.job_ids) == 1
-
         with dn_1 as dn:
         with dn_1 as dn:
-            assert dn.config_id == "foo"
-            assert dn.owner_id is None
-            assert dn.scope == Scope.SCENARIO
-            assert dn.last_edit_date == new_datetime
-            assert dn.name == "def"
-            assert dn.edit_in_progress
-            assert dn.validity_period == time_period_2
-            assert len(dn.job_ids) == 1
             assert dn._is_in_context
             assert dn._is_in_context
             assert dn.properties["qux"] == 5
             assert dn.properties["qux"] == 5
             assert dn.properties["temp_key_3"] == 1
             assert dn.properties["temp_key_3"] == 1
             assert dn.properties["temp_key_4"] == 0
             assert dn.properties["temp_key_4"] == 0
             assert dn.properties["temp_key_5"] == 0
             assert dn.properties["temp_key_5"] == 0
 
 
-            new_datetime_2 = new_datetime + timedelta(5)
-
-            dn.scope = Scope.CYCLE
-            dn.last_edit_date = new_datetime_2
-            dn.name = "abc"
-            dn.edit_in_progress = False
-            dn.validity_period = None
             dn.properties["qux"] = 9
             dn.properties["qux"] = 9
             dn.properties.pop("temp_key_3")
             dn.properties.pop("temp_key_3")
             dn.properties.pop("temp_key_4")
             dn.properties.pop("temp_key_4")
@@ -621,28 +609,12 @@ class TestDataNode:
             dn.properties.pop("temp_key_5")
             dn.properties.pop("temp_key_5")
             dn.properties.update(dict())
             dn.properties.update(dict())
 
 
-            assert dn.config_id == "foo"
-            assert dn.owner_id is None
-            assert dn.scope == Scope.SCENARIO
-            assert dn.last_edit_date == new_datetime
-            assert dn.name == "def"
-            assert dn.edit_in_progress
-            assert dn.validity_period == time_period_2
-            assert len(dn.job_ids) == 1
             assert dn.properties["qux"] == 5
             assert dn.properties["qux"] == 5
             assert dn.properties["temp_key_3"] == 1
             assert dn.properties["temp_key_3"] == 1
             assert dn.properties["temp_key_4"] == 0
             assert dn.properties["temp_key_4"] == 0
             assert dn.properties["temp_key_5"] == 0
             assert dn.properties["temp_key_5"] == 0
 
 
-        assert dn_1.config_id == "foo"
-        assert dn_1.owner_id is None
-        assert dn_1.scope == Scope.CYCLE
-        assert dn_1.last_edit_date == new_datetime_2
-        assert dn_1.name == "abc"
-        assert not dn_1.edit_in_progress
-        assert dn_1.validity_period is None
         assert not dn_1._is_in_context
         assert not dn_1._is_in_context
-        assert len(dn_1.job_ids) == 1
         assert dn_1.properties["qux"] == 9
         assert dn_1.properties["qux"] == 9
         assert "temp_key_3" not in dn_1.properties.keys()
         assert "temp_key_3" not in dn_1.properties.keys()
         assert dn_1.properties["temp_key_4"] == 1
         assert dn_1.properties["temp_key_4"] == 1

+ 4 - 3
tests/core/notification/test_events_published.py

@@ -174,16 +174,16 @@ def test_events_published_for_scenario_submission():
     # 3 submission update events (for status: PENDING, RUNNING and COMPLETED)
     # 3 submission update events (for status: PENDING, RUNNING and COMPLETED)
     scenario.submit()
     scenario.submit()
     snapshot = all_evts.capture()
     snapshot = all_evts.capture()
-    assert len(snapshot.collected_events) == 17
+    assert len(snapshot.collected_events) == 18
     assert snapshot.entity_type_collected.get(EventEntityType.CYCLE, 0) == 0
     assert snapshot.entity_type_collected.get(EventEntityType.CYCLE, 0) == 0
     assert snapshot.entity_type_collected.get(EventEntityType.DATA_NODE, 0) == 7
     assert snapshot.entity_type_collected.get(EventEntityType.DATA_NODE, 0) == 7
     assert snapshot.entity_type_collected.get(EventEntityType.TASK, 0) == 0
     assert snapshot.entity_type_collected.get(EventEntityType.TASK, 0) == 0
     assert snapshot.entity_type_collected.get(EventEntityType.SEQUENCE, 0) == 0
     assert snapshot.entity_type_collected.get(EventEntityType.SEQUENCE, 0) == 0
     assert snapshot.entity_type_collected.get(EventEntityType.SCENARIO, 0) == 1
     assert snapshot.entity_type_collected.get(EventEntityType.SCENARIO, 0) == 1
     assert snapshot.entity_type_collected.get(EventEntityType.JOB, 0) == 4
     assert snapshot.entity_type_collected.get(EventEntityType.JOB, 0) == 4
-    assert snapshot.entity_type_collected.get(EventEntityType.SUBMISSION, 0) == 5
+    assert snapshot.entity_type_collected.get(EventEntityType.SUBMISSION, 0) == 6
     assert snapshot.operation_collected.get(EventOperation.CREATION, 0) == 2
     assert snapshot.operation_collected.get(EventOperation.CREATION, 0) == 2
-    assert snapshot.operation_collected.get(EventOperation.UPDATE, 0) == 14
+    assert snapshot.operation_collected.get(EventOperation.UPDATE, 0) == 15
     assert snapshot.operation_collected.get(EventOperation.SUBMISSION, 0) == 1
     assert snapshot.operation_collected.get(EventOperation.SUBMISSION, 0) == 1
 
 
     assert snapshot.attr_name_collected["last_edit_date"] == 1
     assert snapshot.attr_name_collected["last_edit_date"] == 1
@@ -193,6 +193,7 @@ def test_events_published_for_scenario_submission():
     assert snapshot.attr_name_collected["status"] == 3
     assert snapshot.attr_name_collected["status"] == 3
     assert snapshot.attr_name_collected["jobs"] == 1
     assert snapshot.attr_name_collected["jobs"] == 1
     assert snapshot.attr_name_collected["submission_status"] == 3
     assert snapshot.attr_name_collected["submission_status"] == 3
+    assert snapshot.attr_name_collected["is_completed"] == 1
 
 
     all_evts.stop()
     all_evts.stop()
 
 

+ 69 - 49
tests/core/scenario/test_scenario.py

@@ -549,6 +549,75 @@ def test_auto_set_and_reload(cycle, current_datetime, task, data_node):
     assert len(scenario_1.tags) == 1
     assert len(scenario_1.tags) == 1
     assert len(scenario_2.tags) == 1
     assert len(scenario_2.tags) == 1
 
 
+    with scenario_1 as scenario:
+        assert scenario.config_id == "foo"
+        assert len(scenario.tasks) == 1
+        assert len(scenario.sequences) == 1
+        assert scenario.sequences["sequence_1"] == sequence_1
+        assert scenario.tasks[task.config_id] == task
+        assert len(scenario.additional_data_nodes) == 1
+        assert scenario.additional_data_nodes[additional_dn.config_id] == additional_dn
+        assert scenario.creation_date == new_datetime
+        assert scenario.cycle == cycle
+        assert scenario.is_primary
+        assert len(scenario.subscribers) == 0
+        assert len(scenario.tags) == 1
+        assert scenario._is_in_context
+        assert scenario.name == "baz"
+
+        new_datetime_2 = new_datetime + timedelta(5)
+        scenario._config_id = "foo"
+        scenario.tasks = set()
+        scenario.additional_data_nodes = set()
+        scenario.remove_sequences([sequence_1_name])
+        scenario.creation_date = new_datetime_2
+        scenario.cycle = None
+        scenario.is_primary = False
+        scenario.subscribers = [print]
+        scenario.tags = None
+        scenario.name = "qux"
+
+        assert scenario.config_id == "foo"
+        assert len(scenario.sequences) == 1
+        assert scenario.sequences[sequence_1_name] == sequence_1
+        assert len(scenario.tasks) == 1
+        assert scenario.tasks[task.config_id] == task
+        assert len(scenario.additional_data_nodes) == 1
+        assert scenario.additional_data_nodes[additional_dn.config_id] == additional_dn
+        assert scenario.creation_date == new_datetime
+        assert scenario.cycle == cycle
+        assert scenario.is_primary
+        assert len(scenario.subscribers) == 0
+        assert len(scenario.tags) == 1
+        assert scenario._is_in_context
+        assert scenario.name == "baz"
+
+    assert scenario_1.config_id == "foo"
+    assert len(scenario_1.sequences) == 0
+    assert len(scenario_1.tasks) == 0
+    assert len(scenario_1.additional_data_nodes) == 0
+    assert scenario_1.tasks == {}
+    assert scenario_1.additional_data_nodes == {}
+    assert scenario_1.creation_date == new_datetime_2
+    assert scenario_1.cycle is None
+    assert not scenario_1.is_primary
+    assert len(scenario_1.subscribers) == 1
+    assert len(scenario_1.tags) == 0
+    assert not scenario_1._is_in_context
+
+
+def test_auto_set_and_reload_properties():
+    scenario_1 = Scenario(
+        "foo",
+        set(),
+        {"name": "baz"},
+    )
+
+    scenario_manager = _ScenarioManagerFactory._build_manager()
+    scenario_manager._set(scenario_1)
+
+    scenario_2 = scenario_manager._get(scenario_1)
+
     # auto set & reload on properties attribute
     # auto set & reload on properties attribute
     assert scenario_1.properties == {"name": "baz"}
     assert scenario_1.properties == {"name": "baz"}
     assert scenario_2.properties == {"name": "baz"}
     assert scenario_2.properties == {"name": "baz"}
@@ -608,36 +677,11 @@ def test_auto_set_and_reload(cycle, current_datetime, task, data_node):
     scenario_1.properties["temp_key_5"] = 0
     scenario_1.properties["temp_key_5"] = 0
 
 
     with scenario_1 as scenario:
     with scenario_1 as scenario:
-        assert scenario.config_id == "foo"
-        assert len(scenario.tasks) == 1
-        assert len(scenario.sequences) == 1
-        assert scenario.sequences["sequence_1"] == sequence_1
-        assert scenario.tasks[task.config_id] == task
-        assert len(scenario.additional_data_nodes) == 1
-        assert scenario.additional_data_nodes[additional_dn.config_id] == additional_dn
-        assert scenario.creation_date == new_datetime
-        assert scenario.cycle == cycle
-        assert scenario.is_primary
-        assert len(scenario.subscribers) == 0
-        assert len(scenario.tags) == 1
-        assert scenario._is_in_context
-        assert scenario.name == "baz"
         assert scenario.properties["qux"] == 5
         assert scenario.properties["qux"] == 5
         assert scenario.properties["temp_key_3"] == 1
         assert scenario.properties["temp_key_3"] == 1
         assert scenario.properties["temp_key_4"] == 0
         assert scenario.properties["temp_key_4"] == 0
         assert scenario.properties["temp_key_5"] == 0
         assert scenario.properties["temp_key_5"] == 0
 
 
-        new_datetime_2 = new_datetime + timedelta(5)
-        scenario._config_id = "foo"
-        scenario.tasks = set()
-        scenario.additional_data_nodes = set()
-        scenario.remove_sequences([sequence_1_name])
-        scenario.creation_date = new_datetime_2
-        scenario.cycle = None
-        scenario.is_primary = False
-        scenario.subscribers = [print]
-        scenario.tags = None
-        scenario.name = "qux"
         scenario.properties["qux"] = 9
         scenario.properties["qux"] = 9
         scenario.properties.pop("temp_key_3")
         scenario.properties.pop("temp_key_3")
         scenario.properties.pop("temp_key_4")
         scenario.properties.pop("temp_key_4")
@@ -646,36 +690,12 @@ def test_auto_set_and_reload(cycle, current_datetime, task, data_node):
         scenario.properties.pop("temp_key_5")
         scenario.properties.pop("temp_key_5")
         scenario.properties.update(dict())
         scenario.properties.update(dict())
 
 
-        assert scenario.config_id == "foo"
-        assert len(scenario.sequences) == 1
-        assert scenario.sequences[sequence_1_name] == sequence_1
-        assert len(scenario.tasks) == 1
-        assert scenario.tasks[task.config_id] == task
-        assert len(scenario.additional_data_nodes) == 1
-        assert scenario.additional_data_nodes[additional_dn.config_id] == additional_dn
-        assert scenario.creation_date == new_datetime
-        assert scenario.cycle == cycle
-        assert scenario.is_primary
-        assert len(scenario.subscribers) == 0
-        assert len(scenario.tags) == 1
         assert scenario._is_in_context
         assert scenario._is_in_context
-        assert scenario.name == "baz"
         assert scenario.properties["qux"] == 5
         assert scenario.properties["qux"] == 5
         assert scenario.properties["temp_key_3"] == 1
         assert scenario.properties["temp_key_3"] == 1
         assert scenario.properties["temp_key_4"] == 0
         assert scenario.properties["temp_key_4"] == 0
         assert scenario.properties["temp_key_5"] == 0
         assert scenario.properties["temp_key_5"] == 0
 
 
-    assert scenario_1.config_id == "foo"
-    assert len(scenario_1.sequences) == 0
-    assert len(scenario_1.tasks) == 0
-    assert len(scenario_1.additional_data_nodes) == 0
-    assert scenario_1.tasks == {}
-    assert scenario_1.additional_data_nodes == {}
-    assert scenario_1.creation_date == new_datetime_2
-    assert scenario_1.cycle is None
-    assert not scenario_1.is_primary
-    assert len(scenario_1.subscribers) == 1
-    assert len(scenario_1.tags) == 0
     assert not scenario_1._is_in_context
     assert not scenario_1._is_in_context
     assert scenario_1.properties["qux"] == 9
     assert scenario_1.properties["qux"] == 9
     assert "temp_key_3" not in scenario_1.properties.keys()
     assert "temp_key_3" not in scenario_1.properties.keys()

+ 26 - 10
tests/core/sequence/test_sequence.py

@@ -582,6 +582,32 @@ def test_auto_set_and_reload(task):
     assert len(sequence_1.subscribers) == 0
     assert len(sequence_1.subscribers) == 0
     assert len(sequence_2.subscribers) == 0
     assert len(sequence_2.subscribers) == 0
 
 
+    with sequence_1 as sequence:
+        assert len(sequence.tasks) == 1
+        assert sequence.tasks[task.config_id].id == task.id
+        assert len(sequence.subscribers) == 0
+        assert sequence._is_in_context
+
+        sequence.tasks = []
+        sequence.subscribers = [print]
+        assert len(sequence.tasks) == 1
+        assert sequence.tasks[task.config_id].id == task.id
+        assert len(sequence.subscribers) == 0
+        assert sequence._is_in_context
+
+    assert len(sequence_1.tasks) == 0
+    assert len(sequence_1.subscribers) == 1
+    assert not sequence_1._is_in_context
+
+
+def test_auto_set_and_reload_properties():
+    scenario = Scenario("scenario", [], {}, sequences={"foo": {}})
+
+    _ScenarioManager._set(scenario)
+
+    sequence_1 = scenario.sequences["foo"]
+    sequence_2 = _SequenceManager._get(sequence_1)
+
     # auto set & reload on properties attribute
     # auto set & reload on properties attribute
     assert sequence_1.properties == {"name": "foo"}
     assert sequence_1.properties == {"name": "foo"}
     assert sequence_2.properties == {"name": "foo"}
     assert sequence_2.properties == {"name": "foo"}
@@ -638,17 +664,12 @@ def test_auto_set_and_reload(task):
     sequence_1.properties["temp_key_5"] = 0
     sequence_1.properties["temp_key_5"] = 0
 
 
     with sequence_1 as sequence:
     with sequence_1 as sequence:
-        assert len(sequence.tasks) == 1
-        assert sequence.tasks[task.config_id].id == task.id
-        assert len(sequence.subscribers) == 0
         assert sequence._is_in_context
         assert sequence._is_in_context
         assert sequence.properties["qux"] == 5
         assert sequence.properties["qux"] == 5
         assert sequence.properties["temp_key_3"] == 1
         assert sequence.properties["temp_key_3"] == 1
         assert sequence.properties["temp_key_4"] == 0
         assert sequence.properties["temp_key_4"] == 0
         assert sequence.properties["temp_key_5"] == 0
         assert sequence.properties["temp_key_5"] == 0
 
 
-        sequence.tasks = []
-        sequence.subscribers = [print]
         sequence.properties["qux"] = 9
         sequence.properties["qux"] = 9
         sequence.properties.pop("temp_key_3")
         sequence.properties.pop("temp_key_3")
         sequence.properties.pop("temp_key_4")
         sequence.properties.pop("temp_key_4")
@@ -657,17 +678,12 @@ def test_auto_set_and_reload(task):
         sequence.properties.pop("temp_key_5")
         sequence.properties.pop("temp_key_5")
         sequence.properties.update(dict())
         sequence.properties.update(dict())
 
 
-        assert len(sequence.tasks) == 1
-        assert sequence.tasks[task.config_id].id == task.id
-        assert len(sequence.subscribers) == 0
         assert sequence._is_in_context
         assert sequence._is_in_context
         assert sequence.properties["qux"] == 5
         assert sequence.properties["qux"] == 5
         assert sequence.properties["temp_key_3"] == 1
         assert sequence.properties["temp_key_3"] == 1
         assert sequence.properties["temp_key_4"] == 0
         assert sequence.properties["temp_key_4"] == 0
         assert sequence.properties["temp_key_5"] == 0
         assert sequence.properties["temp_key_5"] == 0
 
 
-    assert len(sequence_1.tasks) == 0
-    assert len(sequence_1.subscribers) == 1
     assert not sequence_1._is_in_context
     assert not sequence_1._is_in_context
     assert sequence_1.properties["qux"] == 9
     assert sequence_1.properties["qux"] == 9
     assert "temp_key_3" not in sequence_1.properties.keys()
     assert "temp_key_3" not in sequence_1.properties.keys()

+ 125 - 28
tests/core/submission/test_submission.py

@@ -297,6 +297,36 @@ def test_auto_set_and_reload():
     assert submission_1.jobs == [job_2, job_1]
     assert submission_1.jobs == [job_2, job_1]
     assert submission_2.jobs == [job_2, job_1]
     assert submission_2.jobs == [job_2, job_1]
 
 
+    # auto set & reload on is_canceled attribute
+    assert not submission_1.is_canceled
+    assert not submission_2.is_canceled
+    submission_1.is_canceled = True
+    assert submission_1.is_canceled
+    assert submission_2.is_canceled
+    submission_2.is_canceled = False
+    assert not submission_1.is_canceled
+    assert not submission_2.is_canceled
+
+    # auto set & reload on is_completed attribute
+    assert not submission_1.is_completed
+    assert not submission_2.is_completed
+    submission_1.is_completed = True
+    assert submission_1.is_completed
+    assert submission_2.is_completed
+    submission_2.is_completed = False
+    assert not submission_1.is_completed
+    assert not submission_2.is_completed
+
+    # auto set & reload on is_abandoned attribute
+    assert not submission_1.is_abandoned
+    assert not submission_2.is_abandoned
+    submission_1.is_abandoned = True
+    assert submission_1.is_abandoned
+    assert submission_2.is_abandoned
+    submission_2.is_abandoned = False
+    assert not submission_1.is_abandoned
+    assert not submission_2.is_abandoned
+
     # auto set & reload on submission_status attribute
     # auto set & reload on submission_status attribute
     assert submission_1.submission_status == SubmissionStatus.SUBMITTED
     assert submission_1.submission_status == SubmissionStatus.SUBMITTED
     assert submission_2.submission_status == SubmissionStatus.SUBMITTED
     assert submission_2.submission_status == SubmissionStatus.SUBMITTED
@@ -307,6 +337,97 @@ def test_auto_set_and_reload():
     assert submission_1.submission_status == SubmissionStatus.COMPLETED
     assert submission_1.submission_status == SubmissionStatus.COMPLETED
     assert submission_2.submission_status == SubmissionStatus.COMPLETED
     assert submission_2.submission_status == SubmissionStatus.COMPLETED
 
 
+    with submission_1 as submission:
+        assert submission.jobs == [job_2, job_1]
+        assert submission.submission_status == SubmissionStatus.COMPLETED
+
+        submission.jobs = [job_1]
+        submission.submission_status = SubmissionStatus.PENDING
+
+        assert submission.jobs == [job_2, job_1]
+        assert submission.submission_status == SubmissionStatus.COMPLETED
+
+    assert submission_1.jobs == [job_1]
+    assert submission_1.submission_status == SubmissionStatus.PENDING
+    assert submission_2.jobs == [job_1]
+    assert submission_2.submission_status == SubmissionStatus.PENDING
+
+
+def test_auto_set_and_reload_job_sets():
+    # pending_jobs, running_jobs, blocked_jobs have the same behavior
+    # so we will only test 1 attribute (pending_jobs) as representative
+
+    task = Task(config_id="name_1", properties={}, function=print, id=TaskId("task_1"))
+    submission_1 = Submission(task.id, task._ID_PREFIX, task.config_id, properties={})
+
+    _TaskManagerFactory._build_manager()._set(task)
+    _SubmissionManagerFactory._build_manager()._set(submission_1)
+
+    submission_2 = _SubmissionManagerFactory._build_manager()._get(submission_1)
+
+    # auto set & reload on pending_jobs attribute
+    assert len(submission_1.pending_jobs) == 0
+    assert len(submission_1.pending_jobs) == 0
+    submission_1.pending_jobs.add("job_id_1")
+    assert submission_1.pending_jobs.data == set(["job_id_1"])
+    assert submission_2.pending_jobs.data == set(["job_id_1"])
+
+    assert len(submission_1.pending_jobs) == 1
+    assert len(submission_1.pending_jobs) == 1
+    submission_2.pending_jobs.add("job_id_2")
+    assert submission_1.pending_jobs.data == set(["job_id_1", "job_id_2"])
+    assert submission_2.pending_jobs.data == set(["job_id_1", "job_id_2"])
+
+    submission_1.pending_jobs.add("job_id_tmp_1")
+    submission_2.pending_jobs.add("job_id_tmp_2")
+    assert submission_1.pending_jobs.data == set(["job_id_1", "job_id_2", "job_id_tmp_1", "job_id_tmp_2"])
+    assert submission_2.pending_jobs.data == set(["job_id_1", "job_id_2", "job_id_tmp_1", "job_id_tmp_2"])
+
+    submission_1.pending_jobs.remove("job_id_tmp_1")
+    assert submission_1.pending_jobs.data == set(["job_id_1", "job_id_2", "job_id_tmp_2"])
+    assert submission_2.pending_jobs.data == set(["job_id_1", "job_id_2", "job_id_tmp_2"])
+    submission_2.pending_jobs.remove("job_id_tmp_2")
+    assert submission_1.pending_jobs.data == set(["job_id_1", "job_id_2"])
+    assert submission_2.pending_jobs.data == set(["job_id_1", "job_id_2"])
+
+    submission_1.pending_jobs.add("job_id_tmp_1")
+    submission_2.pending_jobs.add("job_id_tmp_2")
+    assert submission_1.pending_jobs.data == set(["job_id_1", "job_id_2", "job_id_tmp_1", "job_id_tmp_2"])
+    assert submission_2.pending_jobs.data == set(["job_id_1", "job_id_2", "job_id_tmp_1", "job_id_tmp_2"])
+
+    submission_1.pending_jobs.discard("job_id_tmp_1")
+    assert submission_1.pending_jobs.data == set(["job_id_1", "job_id_2", "job_id_tmp_2"])
+    assert submission_2.pending_jobs.data == set(["job_id_1", "job_id_2", "job_id_tmp_2"])
+    submission_2.pending_jobs.discard("job_id_tmp_2")
+    assert submission_1.pending_jobs.data == set(["job_id_1", "job_id_2"])
+    assert submission_2.pending_jobs.data == set(["job_id_1", "job_id_2"])
+
+    submission_1.pending_jobs.add("job_id_tmp_1")
+    submission_2.pending_jobs.add("job_id_tmp_2")
+    assert submission_1.pending_jobs.data == set(["job_id_1", "job_id_2", "job_id_tmp_1", "job_id_tmp_2"])
+    assert submission_2.pending_jobs.data == set(["job_id_1", "job_id_2", "job_id_tmp_1", "job_id_tmp_2"])
+
+    submission_1.pending_jobs.pop()
+    assert len(submission_1.pending_jobs.data) == 3
+    assert len(submission_2.pending_jobs.data) == 3
+    submission_2.pending_jobs.pop()
+    assert len(submission_1.pending_jobs.data) == 2
+    assert len(submission_2.pending_jobs.data) == 2
+
+    submission_1.pending_jobs.clear()
+    assert len(submission_1.pending_jobs.data) == 0
+    assert len(submission_2.pending_jobs.data) == 0
+
+
+def test_auto_set_and_reload_properties():
+    task = Task(config_id="name_1", properties={}, function=print, id=TaskId("task_1"))
+    submission_1 = Submission(task.id, task._ID_PREFIX, task.config_id, properties={})
+
+    _TaskManagerFactory._build_manager()._set(task)
+    _SubmissionManagerFactory._build_manager()._set(submission_1)
+
+    submission_2 = _SubmissionManagerFactory._build_manager()._get(submission_1)
+
     # auto set & reload on properties attribute
     # auto set & reload on properties attribute
     assert submission_1.properties == {}
     assert submission_1.properties == {}
     assert submission_2.properties == {}
     assert submission_2.properties == {}
@@ -322,27 +443,13 @@ def test_auto_set_and_reload():
 
 
     submission_1.properties["temp_key_1"] = "temp_value_1"
     submission_1.properties["temp_key_1"] = "temp_value_1"
     submission_1.properties["temp_key_2"] = "temp_value_2"
     submission_1.properties["temp_key_2"] = "temp_value_2"
-    assert submission_1.properties == {
-        "qux": 5,
-        "temp_key_1": "temp_value_1",
-        "temp_key_2": "temp_value_2",
-    }
-    assert submission_2.properties == {
-        "qux": 5,
-        "temp_key_1": "temp_value_1",
-        "temp_key_2": "temp_value_2",
-    }
+    assert submission_1.properties == {"qux": 5, "temp_key_1": "temp_value_1", "temp_key_2": "temp_value_2"}
+    assert submission_2.properties == {"qux": 5, "temp_key_1": "temp_value_1", "temp_key_2": "temp_value_2"}
     submission_1.properties.pop("temp_key_1")
     submission_1.properties.pop("temp_key_1")
     assert "temp_key_1" not in submission_1.properties.keys()
     assert "temp_key_1" not in submission_1.properties.keys()
     assert "temp_key_1" not in submission_1.properties.keys()
     assert "temp_key_1" not in submission_1.properties.keys()
-    assert submission_1.properties == {
-        "qux": 5,
-        "temp_key_2": "temp_value_2",
-    }
-    assert submission_2.properties == {
-        "qux": 5,
-        "temp_key_2": "temp_value_2",
-    }
+    assert submission_1.properties == {"qux": 5, "temp_key_2": "temp_value_2"}
+    assert submission_2.properties == {"qux": 5, "temp_key_2": "temp_value_2"}
     submission_2.properties.pop("temp_key_2")
     submission_2.properties.pop("temp_key_2")
     assert submission_1.properties == {"qux": 5}
     assert submission_1.properties == {"qux": 5}
     assert submission_2.properties == {"qux": 5}
     assert submission_2.properties == {"qux": 5}
@@ -362,15 +469,11 @@ def test_auto_set_and_reload():
     submission_1.properties["temp_key_5"] = 0
     submission_1.properties["temp_key_5"] = 0
 
 
     with submission_1 as submission:
     with submission_1 as submission:
-        assert submission.jobs == [job_2, job_1]
-        assert submission.submission_status == SubmissionStatus.COMPLETED
         assert submission.properties["qux"] == 5
         assert submission.properties["qux"] == 5
         assert submission.properties["temp_key_3"] == 1
         assert submission.properties["temp_key_3"] == 1
         assert submission.properties["temp_key_4"] == 0
         assert submission.properties["temp_key_4"] == 0
         assert submission.properties["temp_key_5"] == 0
         assert submission.properties["temp_key_5"] == 0
 
 
-        submission.jobs = [job_1]
-        submission.submission_status = SubmissionStatus.PENDING
         submission.properties["qux"] = 9
         submission.properties["qux"] = 9
         submission.properties.pop("temp_key_3")
         submission.properties.pop("temp_key_3")
         submission.properties.pop("temp_key_4")
         submission.properties.pop("temp_key_4")
@@ -379,17 +482,11 @@ def test_auto_set_and_reload():
         submission.properties.pop("temp_key_5")
         submission.properties.pop("temp_key_5")
         submission.properties.update(dict())
         submission.properties.update(dict())
 
 
-        assert submission.jobs == [job_2, job_1]
-        assert submission.submission_status == SubmissionStatus.COMPLETED
         assert submission.properties["qux"] == 5
         assert submission.properties["qux"] == 5
         assert submission.properties["temp_key_3"] == 1
         assert submission.properties["temp_key_3"] == 1
         assert submission.properties["temp_key_4"] == 0
         assert submission.properties["temp_key_4"] == 0
         assert submission.properties["temp_key_5"] == 0
         assert submission.properties["temp_key_5"] == 0
 
 
-    assert submission_1.jobs == [job_1]
-    assert submission_1.submission_status == SubmissionStatus.PENDING
-    assert submission_2.jobs == [job_1]
-    assert submission_2.submission_status == SubmissionStatus.PENDING
     assert submission_1.properties["qux"] == 9
     assert submission_1.properties["qux"] == 9
     assert "temp_key_3" not in submission_1.properties.keys()
     assert "temp_key_3" not in submission_1.properties.keys()
     assert submission_1.properties["temp_key_4"] == 1
     assert submission_1.properties["temp_key_4"] == 1

+ 32 - 14
tests/core/task/test_task.py

@@ -196,6 +196,38 @@ def test_auto_set_and_reload(data_node):
     assert task_1.parent_ids == {"sc1"}
     assert task_1.parent_ids == {"sc1"}
     assert task_2.parent_ids == {"sc1"}
     assert task_2.parent_ids == {"sc1"}
 
 
+    with task_1 as task:
+        assert task.config_id == "foo"
+        assert task.owner_id is None
+        assert task.function == mock_func
+        assert not task.skippable
+        assert task._is_in_context
+
+        task.function = print
+        task.skippable = True
+
+        assert task.config_id == "foo"
+        assert task.owner_id is None
+        assert task.function == mock_func
+        assert not task.skippable
+        assert task._is_in_context
+
+    assert task_1.config_id == "foo"
+    assert task_1.owner_id is None
+    assert task_1.function == print
+    assert task.skippable
+    assert not task_1._is_in_context
+
+
+def test_auto_set_and_reload_properties():
+    task_1 = Task(
+        config_id="foo", properties={}, function=print, input=None, output=None, owner_id=None, skippable=False
+    )
+
+    _TaskManager._set(task_1)
+
+    task_2 = _TaskManager._get(task_1)
+
     # auto set & reload on properties attribute
     # auto set & reload on properties attribute
     assert task_1.properties == {}
     assert task_1.properties == {}
     assert task_2.properties == {}
     assert task_2.properties == {}
@@ -251,18 +283,12 @@ def test_auto_set_and_reload(data_node):
     task_1.properties["temp_key_5"] = 0
     task_1.properties["temp_key_5"] = 0
 
 
     with task_1 as task:
     with task_1 as task:
-        assert task.config_id == "foo"
-        assert task.owner_id is None
-        assert task.function == mock_func
-        assert not task.skippable
         assert task._is_in_context
         assert task._is_in_context
         assert task.properties["qux"] == 5
         assert task.properties["qux"] == 5
         assert task.properties["temp_key_3"] == 1
         assert task.properties["temp_key_3"] == 1
         assert task.properties["temp_key_4"] == 0
         assert task.properties["temp_key_4"] == 0
         assert task.properties["temp_key_5"] == 0
         assert task.properties["temp_key_5"] == 0
 
 
-        task.function = print
-        task.skippable = True
         task.properties["qux"] = 9
         task.properties["qux"] = 9
         task.properties.pop("temp_key_3")
         task.properties.pop("temp_key_3")
         task.properties.pop("temp_key_4")
         task.properties.pop("temp_key_4")
@@ -271,20 +297,12 @@ def test_auto_set_and_reload(data_node):
         task.properties.pop("temp_key_5")
         task.properties.pop("temp_key_5")
         task.properties.update(dict())
         task.properties.update(dict())
 
 
-        assert task.config_id == "foo"
-        assert task.owner_id is None
-        assert task.function == mock_func
-        assert not task.skippable
         assert task._is_in_context
         assert task._is_in_context
         assert task.properties["qux"] == 5
         assert task.properties["qux"] == 5
         assert task.properties["temp_key_3"] == 1
         assert task.properties["temp_key_3"] == 1
         assert task.properties["temp_key_4"] == 0
         assert task.properties["temp_key_4"] == 0
         assert task.properties["temp_key_5"] == 0
         assert task.properties["temp_key_5"] == 0
 
 
-    assert task_1.config_id == "foo"
-    assert task_1.owner_id is None
-    assert task_1.function == print
-    assert task.skippable
     assert not task_1._is_in_context
     assert not task_1._is_in_context
     assert task_1.properties["qux"] == 9
     assert task_1.properties["qux"] == 9
     assert "temp_key_3" not in task_1.properties.keys()
     assert "temp_key_3" not in task_1.properties.keys()