|
- # Copyright 2023 Avaiga Private Limited
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations under the License.
- from datetime import datetime
- from functools import partial
- from typing import Union
- from unittest import mock
- from unittest.mock import patch
- import pytest
- from src.taipy.core import TaskId
- from src.taipy.core.job._job_manager_factory import _JobManagerFactory
- from src.taipy.core.job.job import Job
- from src.taipy.core.job.status import Status
- from src.taipy.core.submission._submission_manager_factory import _SubmissionManagerFactory
- from src.taipy.core.submission.submission import Submission
- from src.taipy.core.submission.submission_status import SubmissionStatus
- from src.taipy.core.task._task_manager_factory import _TaskManagerFactory
- from src.taipy.core.task.task import Task
def test_create_submission(scenario, job, current_datetime):
    """Check the attributes of a Submission built with and without optional arguments."""
    # Construction with only the two leading positional arguments.
    minimal = Submission(scenario.id, scenario._ID_PREFIX)

    assert minimal.id is not None
    assert minimal.entity_id == scenario.id
    assert minimal.jobs == []
    assert isinstance(minimal.creation_date, datetime)
    assert minimal._submission_status == SubmissionStatus.SUBMITTED
    assert minimal._version is not None

    # Construction with every positional argument supplied explicitly.
    explicit_args = (
        scenario.id,
        scenario._ID_PREFIX,
        "submission_id",
        [job],
        current_datetime,
        SubmissionStatus.COMPLETED,
        "version_id",
    )
    explicit = Submission(*explicit_args)

    assert explicit.id == "submission_id"
    assert explicit.entity_id == scenario.id
    assert explicit._jobs == [job]
    assert explicit.creation_date == current_datetime
    assert explicit._submission_status == SubmissionStatus.COMPLETED
    assert explicit._version == "version_id"
class MockJob:
    """Lightweight job stand-in holding an id and a status.

    Each ``is_*`` predicate reports whether the stored status equals the
    matching ``Status`` member.
    """

    def __init__(self, id: str, status):
        self.id = id
        self.status = status

    def _has_status(self, expected):
        # Single comparison point shared by every predicate below.
        return self.status == expected

    def is_failed(self):
        return self._has_status(Status.FAILED)

    def is_canceled(self):
        return self._has_status(Status.CANCELED)

    def is_blocked(self):
        return self._has_status(Status.BLOCKED)

    def is_pending(self):
        return self._has_status(Status.PENDING)

    def is_running(self):
        return self._has_status(Status.RUNNING)

    def is_completed(self):
        return self._has_status(Status.COMPLETED)

    def is_skipped(self):
        return self._has_status(Status.SKIPPED)

    def is_abandoned(self):
        return self._has_status(Status.ABANDONED)

    def is_submitted(self):
        return self._has_status(Status.SUBMITTED)
def mock_get_jobs(job_ids):
    """Return a list of MockJob objects, one per entry of ``job_ids``, in the same order.

    The ids are looked up in a fixed catalogue rebuilt on every call; an id
    that is not in the catalogue raises ``KeyError``.
    """
    status_by_job_id = (
        ("job0_submitted", Status.SUBMITTED),
        ("job1_failed", Status.FAILED),
        ("job2_canceled", Status.CANCELED),
        ("job3_blocked", Status.BLOCKED),
        ("job4_pending", Status.PENDING),
        ("job5_running", Status.RUNNING),
        ("job6_completed", Status.COMPLETED),
        ("job7_skipped", Status.SKIPPED),
        ("job8_abandoned", Status.ABANDONED),
    )
    catalogue = {known_id: MockJob(known_id, status) for known_id, status in status_by_job_id}
    return [catalogue[requested_id] for requested_id in job_ids]
def __test_update_submission_status(job_ids, expected_submission_status):
    """Shared check used by the parametrized status tests.

    Patches the ``Submission.jobs`` property so it returns the mock jobs for
    ``job_ids``, calls ``_update_submission_status(None)`` on a fresh
    Submission, and asserts the resulting ``submission_status``.
    """
    mocked_jobs = mock_get_jobs(job_ids)
    jobs_property_patch = patch(
        "src.taipy.core.submission.submission.Submission.jobs",
        new_callable=mock.PropertyMock,
        return_value=mocked_jobs,
    )
    with jobs_property_patch:
        submission = Submission("submission_id", "ENTITY_TYPE")
        submission._update_submission_status(None)
        assert submission.submission_status == expected_submission_status
@pytest.mark.parametrize(
    ("job_ids", "expected_submission_status"),
    [
        ([single_job_id], status)
        for single_job_id, status in (
            ("job1_failed", SubmissionStatus.FAILED),
            ("job2_canceled", SubmissionStatus.CANCELED),
            ("job3_blocked", SubmissionStatus.BLOCKED),
            ("job4_pending", SubmissionStatus.PENDING),
            ("job5_running", SubmissionStatus.RUNNING),
            ("job6_completed", SubmissionStatus.COMPLETED),
            ("job7_skipped", SubmissionStatus.COMPLETED),
            ("job8_abandoned", SubmissionStatus.UNDEFINED),
        )
    ],
)
def test_update_single_submission_status(job_ids, expected_submission_status):
    """Check the submission status produced when the submission holds a single job."""
    __test_update_submission_status(
        job_ids=job_ids,
        expected_submission_status=expected_submission_status,
    )
@pytest.mark.parametrize(
    ("job_ids", "expected_submission_status"),
    [
        # Every listed pair contains a failed job and is expected to be FAILED.
        (pair, SubmissionStatus.FAILED)
        for pair in (
            ["job1_failed", "job1_failed"],
            ["job1_failed", "job2_canceled"],
            ["job1_failed", "job3_blocked"],
            ["job1_failed", "job4_pending"],
            ["job1_failed", "job5_running"],
            ["job1_failed", "job6_completed"],
            ["job1_failed", "job7_skipped"],
            ["job1_failed", "job8_abandoned"],
            ["job2_canceled", "job1_failed"],
            ["job3_blocked", "job1_failed"],
            ["job4_pending", "job1_failed"],
            ["job5_running", "job1_failed"],
            ["job6_completed", "job1_failed"],
            ["job7_skipped", "job1_failed"],
            ["job8_abandoned", "job1_failed"],
        )
    ],
)
def test_update_submission_status_with_one_failed_job_in_jobs(job_ids, expected_submission_status):
    """Check that pairs of jobs including a failed one yield a FAILED submission, in either order."""
    __test_update_submission_status(
        job_ids=job_ids,
        expected_submission_status=expected_submission_status,
    )
@pytest.mark.parametrize(
    ("job_ids", "expected_submission_status"),
    [
        # Every listed pair contains a canceled job (and no failed job).
        (pair, SubmissionStatus.CANCELED)
        for pair in (
            ["job2_canceled", "job2_canceled"],
            ["job2_canceled", "job3_blocked"],
            ["job2_canceled", "job4_pending"],
            ["job2_canceled", "job5_running"],
            ["job2_canceled", "job6_completed"],
            ["job2_canceled", "job7_skipped"],
            ["job2_canceled", "job8_abandoned"],
            ["job3_blocked", "job2_canceled"],
            ["job4_pending", "job2_canceled"],
            ["job5_running", "job2_canceled"],
            ["job6_completed", "job2_canceled"],
            ["job7_skipped", "job2_canceled"],
            ["job8_abandoned", "job2_canceled"],
        )
    ],
)
def test_update_submission_status_with_one_canceled_job_in_jobs(job_ids, expected_submission_status):
    """Check that pairs of jobs including a canceled one (none failed) yield a CANCELED submission."""
    __test_update_submission_status(
        job_ids=job_ids,
        expected_submission_status=expected_submission_status,
    )
@pytest.mark.parametrize(
    ("job_ids", "expected_submission_status"),
    [
        # Pending combined with blocked, pending, completed or skipped jobs.
        (pair, SubmissionStatus.PENDING)
        for pair in (
            ["job4_pending", "job3_blocked"],
            ["job4_pending", "job4_pending"],
            ["job4_pending", "job6_completed"],
            ["job4_pending", "job7_skipped"],
            ["job3_blocked", "job4_pending"],
            ["job6_completed", "job4_pending"],
            ["job7_skipped", "job4_pending"],
        )
    ],
)
def test_update_submission_status_with_no_failed_or_cancel_one_pending_in_jobs(job_ids, expected_submission_status):
    """Check that the listed pairs containing a pending job yield a PENDING submission."""
    __test_update_submission_status(
        job_ids=job_ids,
        expected_submission_status=expected_submission_status,
    )
@pytest.mark.parametrize(
    ("job_ids", "expected_submission_status"),
    [
        # Running combined with blocked, pending, running, completed or skipped jobs.
        (pair, SubmissionStatus.RUNNING)
        for pair in (
            ["job5_running", "job3_blocked"],
            ["job5_running", "job4_pending"],
            ["job5_running", "job5_running"],
            ["job5_running", "job6_completed"],
            ["job5_running", "job7_skipped"],
            ["job3_blocked", "job5_running"],
            ["job4_pending", "job5_running"],
            ["job6_completed", "job5_running"],
            ["job7_skipped", "job5_running"],
        )
    ],
)
def test_update_submission_status_with_no_failed_cancel_nor_pending_one_running_in_jobs(
    job_ids, expected_submission_status
):
    """Check that the listed pairs containing a running job yield a RUNNING submission."""
    __test_update_submission_status(
        job_ids=job_ids,
        expected_submission_status=expected_submission_status,
    )
@pytest.mark.parametrize(
    ("job_ids", "expected_submission_status"),
    [
        # Blocked combined with blocked, completed or skipped jobs.
        (pair, SubmissionStatus.BLOCKED)
        for pair in (
            ["job3_blocked", "job3_blocked"],
            ["job3_blocked", "job6_completed"],
            ["job3_blocked", "job7_skipped"],
            ["job6_completed", "job3_blocked"],
            ["job7_skipped", "job3_blocked"],
        )
    ],
)
def test_update_submission_status_with_no_failed_cancel_pending_nor_running_one_blocked_in_jobs(
    job_ids, expected_submission_status
):
    """Check that the listed pairs containing a blocked job yield a BLOCKED submission."""
    __test_update_submission_status(
        job_ids=job_ids,
        expected_submission_status=expected_submission_status,
    )
@pytest.mark.parametrize(
    ("job_ids", "expected_submission_status"),
    [
        # Every combination of completed and skipped jobs.
        (pair, SubmissionStatus.COMPLETED)
        for pair in (
            ["job6_completed", "job6_completed"],
            ["job6_completed", "job7_skipped"],
            ["job7_skipped", "job6_completed"],
            ["job7_skipped", "job7_skipped"],
        )
    ],
)
def test_update_submission_status_with_only_completed_or_skipped_in_jobs(job_ids, expected_submission_status):
    """Check that pairs made only of completed or skipped jobs yield a COMPLETED submission."""
    __test_update_submission_status(
        job_ids=job_ids,
        expected_submission_status=expected_submission_status,
    )
@pytest.mark.parametrize(
    ("job_ids", "expected_submission_status"),
    [
        # Abandoned job paired with anything other than a failed or canceled job.
        (pair, SubmissionStatus.UNDEFINED)
        for pair in (
            ["job3_blocked", "job8_abandoned"],
            ["job4_pending", "job8_abandoned"],
            ["job5_running", "job8_abandoned"],
            ["job6_completed", "job8_abandoned"],
            ["job7_skipped", "job8_abandoned"],
            ["job8_abandoned", "job8_abandoned"],
            ["job8_abandoned", "job3_blocked"],
            ["job8_abandoned", "job4_pending"],
            ["job8_abandoned", "job5_running"],
            ["job8_abandoned", "job6_completed"],
            ["job8_abandoned", "job7_skipped"],
        )
    ],
)
def test_update_submission_status_with_wrong_case_abandoned_without_cancel_or_failed_in_jobs(
    job_ids, expected_submission_status
):
    """Check that the listed pairs containing an abandoned job yield an UNDEFINED submission."""
    __test_update_submission_status(
        job_ids=job_ids,
        expected_submission_status=expected_submission_status,
    )
def test_auto_set_and_reload():
    """Check that attribute writes on one Submission object are visible through another.

    ``submission_1`` is stored through the submission manager and
    ``submission_2`` is fetched back from it; the test then alternates writes
    between the two objects and asserts that both report the same values for
    ``jobs`` and ``submission_status``. It finishes by checking the behaviour
    of writes made inside a ``with submission_1`` block.
    """
    task = Task(config_id="name_1", properties={}, function=print, id=TaskId("task_1"))
    submission_1 = Submission(task.id, task._ID_PREFIX)
    job_1 = Job("job_1", task, submission_1.id, submission_1.entity_id)
    job_2 = Job("job_2", task, submission_1.id, submission_1.entity_id)

    # Store every entity through its manager before reading anything back.
    _TaskManagerFactory._build_manager()._set(task)
    _SubmissionManagerFactory._build_manager()._set(submission_1)
    _JobManagerFactory._build_manager()._set(job_1)
    _JobManagerFactory._build_manager()._set(job_2)

    # Second object for the same submission, obtained from the manager.
    submission_2 = _SubmissionManagerFactory._build_manager()._get(submission_1)

    assert submission_1.id == submission_2.id
    assert submission_1.entity_id == submission_2.entity_id
    assert submission_1.creation_date == submission_2.creation_date
    assert submission_1.submission_status == submission_2.submission_status

    # auto set & reload on jobs attribute
    assert submission_1.jobs == []
    assert submission_2.jobs == []
    submission_1.jobs = [job_1]
    assert submission_1.jobs == [job_1]
    assert submission_2.jobs == [job_1]
    submission_2.jobs = [job_2]
    assert submission_1.jobs == [job_2]
    assert submission_2.jobs == [job_2]
    submission_1.jobs = [job_1, job_2]
    assert submission_1.jobs == [job_1, job_2]
    assert submission_2.jobs == [job_1, job_2]
    # The order of the assigned list is expected to be preserved.
    submission_2.jobs = [job_2, job_1]
    assert submission_1.jobs == [job_2, job_1]
    assert submission_2.jobs == [job_2, job_1]

    # auto set & reload on submission_status attribute
    assert submission_1.submission_status == SubmissionStatus.SUBMITTED
    assert submission_2.submission_status == SubmissionStatus.SUBMITTED
    submission_1.submission_status = SubmissionStatus.BLOCKED
    assert submission_1.submission_status == SubmissionStatus.BLOCKED
    assert submission_2.submission_status == SubmissionStatus.BLOCKED
    submission_2.submission_status = SubmissionStatus.COMPLETED
    assert submission_1.submission_status == SubmissionStatus.COMPLETED
    assert submission_2.submission_status == SubmissionStatus.COMPLETED

    with submission_1 as submission:
        assert submission.jobs == [job_2, job_1]
        assert submission.submission_status == SubmissionStatus.COMPLETED

        submission.jobs = [job_1]
        submission.submission_status = SubmissionStatus.PENDING

        # Inside the block, reads are still expected to return the values from before the writes.
        assert submission.jobs == [job_2, job_1]
        assert submission.submission_status == SubmissionStatus.COMPLETED

    # After the block exits, the new values are expected on both objects.
    assert submission_1.jobs == [job_1]
    assert submission_1.submission_status == SubmissionStatus.PENDING
    assert submission_2.jobs == [job_1]
    assert submission_2.submission_status == SubmissionStatus.PENDING
|