|
@@ -81,23 +81,22 @@ class _Orchestrator(_AbstractOrchestrator):
|
|
|
tasks = submittable._get_sorted_tasks()
|
|
|
with cls.lock:
|
|
|
for ts in tasks:
|
|
|
- for task in ts:
|
|
|
- jobs.append(
|
|
|
- cls._lock_dn_output_and_create_job(
|
|
|
- task,
|
|
|
- submission.id,
|
|
|
- submission.entity_id,
|
|
|
- callbacks=itertools.chain([cls._update_submission_status], callbacks or []),
|
|
|
- force=force, # type: ignore
|
|
|
- )
|
|
|
+ jobs.extend(
|
|
|
+ cls._lock_dn_output_and_create_job(
|
|
|
+ task,
|
|
|
+ submission.id,
|
|
|
+ submission.entity_id,
|
|
|
+ callbacks=itertools.chain([cls._update_submission_status], callbacks or []),
|
|
|
+ force=force, # type: ignore
|
|
|
)
|
|
|
+ for task in ts
|
|
|
+ )
|
|
|
submission.jobs = jobs # type: ignore
|
|
|
cls._orchestrate_job_to_run_or_block(jobs)
|
|
|
if Config.job_config.is_development:
|
|
|
cls._check_and_execute_jobs_if_development_mode()
|
|
|
- else:
|
|
|
- if wait:
|
|
|
- cls._wait_until_job_finished(jobs, timeout=timeout or 0)
|
|
|
+ elif wait:
|
|
|
+ cls._wait_until_job_finished(jobs, timeout=timeout or 0)
|
|
|
return submission
|
|
|
|
|
|
@classmethod
|
|
@@ -158,11 +157,14 @@ class _Orchestrator(_AbstractOrchestrator):
|
|
|
) -> Job:
|
|
|
for dn in task.output.values():
|
|
|
dn.lock_edit()
|
|
|
- job = _JobManagerFactory._build_manager()._create(
|
|
|
- task, itertools.chain([cls._on_status_change], callbacks or []), submit_id, submit_entity_id, force=force
|
|
|
+ return _JobManagerFactory._build_manager()._create(
|
|
|
+ task,
|
|
|
+ itertools.chain([cls._on_status_change], callbacks or []),
|
|
|
+ submit_id,
|
|
|
+ submit_entity_id,
|
|
|
+ force=force
|
|
|
)
|
|
|
|
|
|
- return job
|
|
|
|
|
|
@classmethod
|
|
|
def _update_submission_status(cls, job: Job):
|
|
@@ -197,7 +199,7 @@ class _Orchestrator(_AbstractOrchestrator):
|
|
|
while __check_if_timeout(start, timeout) and index < len(jobs):
|
|
|
try:
|
|
|
if jobs[index]._is_finished():
|
|
|
- index = index + 1
|
|
|
+ index += 1
|
|
|
else:
|
|
|
sleep(0.5) # Limit CPU usage
|
|
|
except Exception:
|
|
@@ -308,7 +310,6 @@ class _Orchestrator(_AbstractOrchestrator):
|
|
|
|
|
|
@classmethod
|
|
|
def _cancel_jobs(cls, job_id_to_cancel: JobId, jobs: Set[Job]):
|
|
|
-
|
|
|
for job in jobs:
|
|
|
if job.is_running():
|
|
|
cls.__logger.info(f"{job.id} is running and cannot be canceled.")
|
|
@@ -316,11 +317,10 @@ class _Orchestrator(_AbstractOrchestrator):
|
|
|
cls.__logger.info(f"{job.id} has already been completed and cannot be canceled.")
|
|
|
elif job.is_skipped():
|
|
|
cls.__logger.info(f"{job.id} has already been skipped and cannot be canceled.")
|
|
|
+ elif job_id_to_cancel == job.id:
|
|
|
+ job.canceled()
|
|
|
else:
|
|
|
- if job_id_to_cancel == job.id:
|
|
|
- job.canceled()
|
|
|
- else:
|
|
|
- job.abandoned()
|
|
|
+ job.abandoned()
|
|
|
|
|
|
@staticmethod
|
|
|
def _check_and_execute_jobs_if_development_mode():
|