123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348 |
- # Copyright 2021-2025 Avaiga Private Limited
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations under the License.
- import os
- import shutil
- import uuid
- from datetime import datetime, timedelta
- from queue import Queue
- import pandas as pd
- import pytest
- from dotenv import load_dotenv
- from taipy.common.config import Config
- from taipy.common.config.checker._checker import _Checker
- from taipy.core import Cycle, DataNodeId, Job, JobId, Scenario, Sequence, Task
- from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
- from taipy.core.common.frequency import Frequency
- from taipy.core.common.scope import Scope
- from taipy.core.cycle._cycle_manager import _CycleManager
- from taipy.core.data._data_manager_factory import _DataManagerFactory
- from taipy.core.data.pickle import PickleDataNode
- from taipy.core.job._job_manager import _JobManager
- from taipy.core.task._task_manager import _TaskManager
- from taipy.rest.app import create_app
- from taipy.rest.config import _RestConfigChecker
- from .setup.shared.algorithms import evaluate, forecast
@pytest.fixture
def setup_end_to_end():
    """Register the forecast/evaluate scenario configuration used by the end-to-end tests."""
    model_node_cfg = Config.configure_data_node("model", path="setup/my_model.p", storage_type="pickle")
    day_node_cfg = Config.configure_data_node(id="day")
    forecasts_node_cfg = Config.configure_data_node(id="forecasts")
    forecast_task_cfg = Config.configure_task(
        id="forecast_task",
        input=[model_node_cfg, day_node_cfg],
        function=forecast,
        output=forecasts_node_cfg,
    )

    history_node_cfg = Config.configure_data_node(
        "historical_temperature",
        storage_type="csv",
        path="setup/historical_temperature.csv",
        has_header=True,
    )
    evaluation_node_cfg = Config.configure_data_node("evaluation")
    evaluate_task_cfg = Config.configure_task(
        "evaluate_task",
        input=[history_node_cfg, forecasts_node_cfg, day_node_cfg],
        function=evaluate,
        output=evaluation_node_cfg,
    )

    # Daily scenario chaining both tasks, exposed as a single "sequence".
    scenario_cfg = Config.configure_scenario(
        "scenario", [forecast_task_cfg, evaluate_task_cfg], frequency=Frequency.DAILY
    )
    scenario_cfg.add_sequences({"sequence": [forecast_task_cfg, evaluate_task_cfg]})
@pytest.fixture()
def app():
    """Yield a Flask test application with an app context and a request context pushed."""
    load_dotenv(".testenv")
    flask_app = create_app(testing=True)
    flask_app.config.update({"TESTING": True})
    with flask_app.app_context(), flask_app.test_request_context():
        yield flask_app
@pytest.fixture()
def client(app):
    """Return a Flask test client bound to the `app` fixture."""
    return app.test_client()
@pytest.fixture
def datanode_data():
    """JSON-style payload for creating a data node through the REST API."""
    return dict(
        name="foo",
        storage_type="in_memory",
        scope="scenario",
        default_data=["1991-01-01T00:00:00"],
    )
@pytest.fixture
def task_data():
    """JSON-style payload for creating a task (builtins.print) through the REST API."""
    return dict(
        config_id="foo",
        input_ids=["DATASOURCE_foo_3b888e17-1974-4a56-a42c-c7c96bc9cd54"],
        function_name="print",
        function_module="builtins",
        output_ids=["DATASOURCE_foo_4d9923b8-eb9f-4f3c-8055-3a1ce8bee309"],
    )
@pytest.fixture
def sequence_data():
    """JSON-style payload for creating a sequence through the REST API."""
    return dict(
        name="foo",
        task_ids=["TASK_foo_3b888e17-1974-4a56-a42c-c7c96bc9cd54"],
    )
@pytest.fixture
def scenario_data():
    """JSON-style payload for creating a scenario through the REST API."""
    return dict(
        name="foo",
        sequence_ids=["SEQUENCE_foo_3b888e17-1974-4a56-a42c-c7c96bc9cd54"],
        properties={},
    )
@pytest.fixture
def default_datanode():
    """A scenario-scoped pickle data node preloaded with a small integer list."""
    default_properties = {"default_data": [1, 2, 3, 4, 5, 6]}
    return PickleDataNode(
        "input_ds",
        Scope.SCENARIO,
        DataNodeId("f"),
        "owner_id",
        None,
        properties=default_properties,
    )
@pytest.fixture
def default_df_datanode():
    """A scenario-scoped pickle data node preloaded with a small two-column DataFrame."""
    # Column-oriented literal; yields the same frame as the row-of-dicts form.
    frame = pd.DataFrame({"a": [1, 3, 5], "b": [2, 4, 6]})
    return PickleDataNode(
        "input_ds",
        Scope.SCENARIO,
        DataNodeId("id_uio2"),
        "owner_id",
        None,
        properties={"default_data": frame},
    )
@pytest.fixture
def default_datanode_config():
    """A fresh in-memory, scenario-scoped data node config with a unique id."""
    unique_id = f"taipy_{uuid.uuid4().hex}"
    return Config.configure_data_node(unique_id, "in_memory", Scope.SCENARIO)
@pytest.fixture
def default_datanode_config_list():
    """Ten in-memory, scenario-scoped data node configs with ids ds_0 .. ds_9."""
    # Comprehension instead of the manual append loop — same objects, same order.
    return [
        Config.configure_data_node(id=f"ds_{i}", storage_type="in_memory", scope=Scope.SCENARIO)
        for i in range(10)
    ]
def __default_task():
    """Build and persist a `print` task with one pickle input and one pickle output.

    Both data nodes are saved in the data repository before the task is
    constructed, so REST endpoints can resolve them by id.
    """
    input_ds = PickleDataNode(
        "input_ds",
        Scope.SCENARIO,
        DataNodeId("id_uio"),
        "owner_id",
        {"TASK_task_id"},
        properties={"default_data": "In memory Data Source"},
    )
    _DataManagerFactory._build_manager()._repository._save(input_ds)
    # NOTE(fix): the output node previously reused DataNodeId("id_uio"), so the
    # second _save overwrote the input node in the repository. Use a distinct id.
    output_ds = PickleDataNode(
        "output_ds",
        Scope.SCENARIO,
        DataNodeId("id_uio_2"),
        "owner_id",
        {"TASK_task_id"},
        properties={"default_data": "In memory Data Source"},
    )
    _DataManagerFactory._build_manager()._repository._save(output_ds)
    return Task(
        config_id="foo",
        properties={},
        function=print,
        input=[input_ds],
        output=[output_ds],
        id="TASK_task_id",
    )
@pytest.fixture
def default_task():
    """A persisted default `print` task with one input and one output data node."""
    return __default_task()
@pytest.fixture
def default_task_config():
    """A task config named "task1" running `print` with no inputs or outputs."""
    return Config.configure_task("task1", print, [], [])
@pytest.fixture
def default_task_config_list():
    """Ten `print` task configs named task_0 .. task_9."""
    # Comprehension instead of the manual append loop — same objects, same order.
    return [Config.configure_task(f"task_{i}", print, [], []) for i in range(10)]
def __default_sequence():
    """Build a one-task sequence wrapping the persisted default task."""
    task_list = [__default_task()]
    return Sequence(properties={"name": "foo"}, tasks=task_list, sequence_id="SEQUENCE_foo_SCENARIO_acb")
def __task_config():
    """A minimal "task1" config running `print` with no inputs or outputs."""
    return Config.configure_task("task1", print, [], [])
@pytest.fixture
def default_sequence():
    """A one-task sequence built around the persisted default task."""
    return __default_sequence()
@pytest.fixture
def default_scenario_config():
    """A uniquely-named scenario config with one task exposed as a "sequence"."""
    task_cfg = __task_config()
    scenario_cfg = Config.configure_scenario(f"taipy_{uuid.uuid4().hex}", [task_cfg])
    scenario_cfg.add_sequences({"sequence": [task_cfg]})
    return scenario_cfg
@pytest.fixture
def default_scenario_config_list():
    """Ten uniquely-named one-task scenario configs, each exposing a "sequence"."""

    def _build_one():
        # Fresh task + scenario config pair, each with a random unique id.
        task_cfg = Config.configure_task(f"taipy_{uuid.uuid4().hex}", print)
        scenario_cfg = Config.configure_scenario(f"taipy_{uuid.uuid4().hex}", [task_cfg])
        scenario_cfg.add_sequences({"sequence": [task_cfg]})
        return scenario_cfg

    return [_build_one() for _ in range(10)]
@pytest.fixture
def default_scenario():
    """A scenario wrapping the persisted default task, with a fixed id."""
    task_list = [__default_task()]
    return Scenario(config_id="foo", properties={}, tasks=task_list, scenario_id="SCENARIO_scenario_id")
def __create_cycle(name="foo"):
    """Build a daily cycle created/starting now and ending five days later."""
    moment = datetime.now()
    return Cycle(
        name=name,
        frequency=Frequency.DAILY,
        properties={},
        creation_date=moment,
        start_date=moment,
        end_date=moment + timedelta(days=5),
    )
@pytest.fixture
def create_cycle_list():
    """Persist ten cycles named cycle_0 .. cycle_9 and return them.

    NOTE(fix): the original saved each cycle but never appended it to the
    list, so the fixture always returned an empty list.
    """
    cycles = []
    manager = _CycleManager
    for i in range(10):
        cycle = __create_cycle(f"cycle_{i}")
        manager._repository._save(cycle)
        cycles.append(cycle)
    return cycles
@pytest.fixture
def cycle_data():
    """JSON-style payload for creating a daily cycle through the REST API."""
    return dict(
        name="foo",
        frequency="daily",
        properties={},
        creation_date="2022-02-03T22:17:27.317114",
        start_date="2022-02-03T22:17:27.317114",
        end_date="2022-02-08T22:17:27.317114",
    )
@pytest.fixture
def default_cycle():
    """A daily cycle named "foo" spanning now to five days from now."""
    return __create_cycle()
def __create_job():
    """Persist the default task and build a job submitted for it."""
    task = __default_task()
    _TaskManager._repository._save(task)
    # f-string formatting already stringifies the UUID.
    submission_id = f"SUBMISSION_{uuid.uuid4()}"
    job_id = JobId(f"JOB_{uuid.uuid4()}")
    return Job(id=job_id, task=task, submit_id=submission_id, submit_entity_id=task.id)
@pytest.fixture
def default_job():
    """A job submitted for the persisted default task."""
    return __create_job()
@pytest.fixture
def create_job_list():
    """Persist ten jobs and return them.

    NOTE(fix): the original saved each job but never appended it to the
    list, so the fixture always returned an empty list.
    """
    jobs = []
    manager = _JobManager
    for _ in range(10):
        job = __create_job()
        manager._repository._save(job)
        jobs.append(job)
    return jobs
@pytest.fixture
def init_orchestrator():
    """Return a callable that resets the orchestrator's dispatcher and job queues."""

    def _init_orchestrator():
        factory = _OrchestratorFactory
        factory._remove_dispatcher()
        # (Re)build the orchestrator if a previous test tore it down.
        if factory._orchestrator is None:
            factory._build_orchestrator()
        factory._build_dispatcher(force_restart=True)
        factory._orchestrator.jobs_to_run = Queue()
        factory._orchestrator.blocked_jobs = []

    return _init_orchestrator
@pytest.fixture(scope="function", autouse=True)
def cleanup_files(reset_configuration_singleton, inject_rest_sections, inject_core_sections):
    """Reset configuration, register the REST checker, and scrub data directories around each test."""
    reset_configuration_singleton()
    inject_core_sections()
    inject_rest_sections()
    _Checker.add_checker(_RestConfigChecker)
    Config.configure_core(repository_type="filesystem")
    # Pre-clean leftovers from a previous run.
    for leftover in (".data", ".my_data"):
        if os.path.exists(leftover):
            shutil.rmtree(leftover, ignore_errors=True)
    yield
    # Post-clean everything the test may have produced.
    for leftover in (".data", ".my_data", "user_data", ".taipy"):
        if os.path.exists(leftover):
            shutil.rmtree(leftover, ignore_errors=True)
|