|
@@ -18,7 +18,6 @@ from functools import partial
|
|
|
from time import sleep
|
|
|
|
|
|
import pytest
|
|
|
-from tests.core.utils import assert_true_after_time
|
|
|
|
|
|
from taipy.config import Config
|
|
|
from taipy.config.common.scope import Scope
|
|
@@ -28,6 +27,7 @@ from taipy.core._orchestrator._orchestrator import _Orchestrator
|
|
|
from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
|
|
|
from taipy.core.config.job_config import JobConfig
|
|
|
from taipy.core.data._data_manager import _DataManager
|
|
|
+from taipy.core.data.pickle import PickleDataNode
|
|
|
from taipy.core.scenario._scenario_manager import _ScenarioManager
|
|
|
from taipy.core.scenario.scenario import Scenario
|
|
|
from taipy.core.sequence.sequence import Sequence
|
|
@@ -35,8 +35,7 @@ from taipy.core.submission._submission_manager import _SubmissionManager
|
|
|
from taipy.core.submission.submission_status import SubmissionStatus
|
|
|
from taipy.core.task._task_manager import _TaskManager
|
|
|
from taipy.core.task.task import Task
|
|
|
-from taipy.core.data.pickle import PickleDataNode
|
|
|
-
|
|
|
+from tests.core.utils import assert_true_after_time
|
|
|
|
|
|
# ################################ USER FUNCTIONS ##################################
|
|
|
|
|
@@ -261,19 +260,19 @@ def test_scenario_only_submit_same_task_once():
|
|
|
|
|
|
jobs = _Orchestrator.submit(scenario_1)
|
|
|
assert len(jobs) == 3
|
|
|
- assert all([job.is_completed() for job in jobs])
|
|
|
+ assert all(job.is_completed() for job in jobs)
|
|
|
assert all(not _Orchestrator._is_blocked(job) for job in jobs)
|
|
|
assert _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.COMPLETED
|
|
|
|
|
|
jobs = _Orchestrator.submit(sequence_1)
|
|
|
assert len(jobs) == 2
|
|
|
- assert all([job.is_completed() for job in jobs])
|
|
|
+ assert all(job.is_completed() for job in jobs)
|
|
|
assert all(not _Orchestrator._is_blocked(job) for job in jobs)
|
|
|
assert _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.COMPLETED
|
|
|
|
|
|
jobs = _Orchestrator.submit(sequence_2)
|
|
|
assert len(jobs) == 2
|
|
|
- assert all([job.is_completed() for job in jobs])
|
|
|
+ assert all(job.is_completed() for job in jobs)
|
|
|
assert all(not _Orchestrator._is_blocked(job) for job in jobs)
|
|
|
assert _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.COMPLETED
|
|
|
|
|
@@ -309,7 +308,7 @@ def test_update_status_fail_job():
|
|
|
jobs = _Orchestrator.submit(scenario_1)
|
|
|
tasks_jobs = {job._task.id: job for job in jobs}
|
|
|
assert tasks_jobs["task_0"].is_failed()
|
|
|
- assert all([job.is_abandoned() for job in [tasks_jobs["task_1"], tasks_jobs["task_2"]]])
|
|
|
+ assert all(job.is_abandoned() for job in [tasks_jobs["task_1"], tasks_jobs["task_2"]])
|
|
|
assert tasks_jobs["task_3"].is_completed()
|
|
|
assert all(not _Orchestrator._is_blocked(job) for job in jobs)
|
|
|
assert _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.FAILED
|
|
@@ -317,7 +316,7 @@ def test_update_status_fail_job():
|
|
|
jobs = _Orchestrator.submit(scenario_2)
|
|
|
tasks_jobs = {job._task.id: job for job in jobs}
|
|
|
assert tasks_jobs["task_0"].is_failed()
|
|
|
- assert all([job.is_abandoned() for job in [tasks_jobs["task_1"], tasks_jobs["task_2"]]])
|
|
|
+ assert all(job.is_abandoned() for job in [tasks_jobs["task_1"], tasks_jobs["task_2"]])
|
|
|
assert tasks_jobs["task_3"].is_completed()
|
|
|
assert all(not _Orchestrator._is_blocked(job) for job in jobs)
|
|
|
assert _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.FAILED
|
|
@@ -368,7 +367,7 @@ def test_update_status_fail_job_in_parallel_one_sequence():
|
|
|
|
|
|
tasks_jobs = {job._task.id: job for job in jobs}
|
|
|
assert_true_after_time(tasks_jobs["task_0"].is_failed)
|
|
|
- assert_true_after_time(lambda: all([job.is_abandoned() for job in [tasks_jobs["task_1"], tasks_jobs["task_2"]]]))
|
|
|
+ assert_true_after_time(lambda: all(job.is_abandoned() for job in [tasks_jobs["task_1"], tasks_jobs["task_2"]]))
|
|
|
assert_true_after_time(lambda: all(not _Orchestrator._is_blocked(job) for job in jobs))
|
|
|
submit_id = jobs[0].submit_id
|
|
|
submission = _SubmissionManager._get(submit_id)
|
|
@@ -402,7 +401,7 @@ def test_update_status_fail_job_in_parallel_one_scenario():
|
|
|
tasks_jobs = {job._task.id: job for job in jobs}
|
|
|
assert_true_after_time(tasks_jobs["task_0"].is_failed)
|
|
|
assert_true_after_time(tasks_jobs["task_3"].is_completed)
|
|
|
- assert_true_after_time(lambda: all([job.is_abandoned() for job in [tasks_jobs["task_1"], tasks_jobs["task_2"]]]))
|
|
|
+ assert_true_after_time(lambda: all(job.is_abandoned() for job in [tasks_jobs["task_1"], tasks_jobs["task_2"]]))
|
|
|
assert_true_after_time(lambda: all(not _Orchestrator._is_blocked(job) for job in jobs))
|
|
|
submit_id = jobs[0].submit_id
|
|
|
submission = _SubmissionManager._get(submit_id)
|