test_job.py 15 KB


  1. # Copyright 2021-2025 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. from datetime import datetime, timedelta
  12. from time import sleep
  13. from typing import Union, cast
  14. from unittest import mock
  15. from unittest.mock import MagicMock
  16. import freezegun
  17. import pytest
  18. from taipy import Scope
  19. from taipy.common.config import Config
  20. from taipy.core import JobId, TaskId
  21. from taipy.core._orchestrator._abstract_orchestrator import _AbstractOrchestrator
  22. from taipy.core._orchestrator._dispatcher._development_job_dispatcher import _DevelopmentJobDispatcher
  23. from taipy.core._orchestrator._dispatcher._standalone_job_dispatcher import _StandaloneJobDispatcher
  24. from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
  25. from taipy.core.config.job_config import JobConfig
  26. from taipy.core.data._data_manager_factory import _DataManagerFactory
  27. from taipy.core.data.in_memory import InMemoryDataNode
  28. from taipy.core.job._job_manager import _JobManager
  29. from taipy.core.job._job_manager_factory import _JobManagerFactory
  30. from taipy.core.job.job import Job
  31. from taipy.core.job.status import Status
  32. from taipy.core.scenario.scenario import Scenario
  33. from taipy.core.submission._submission_manager_factory import _SubmissionManagerFactory
  34. from taipy.core.task._task_manager import _TaskManager
  35. from taipy.core.task._task_manager_factory import _TaskManagerFactory
  36. from taipy.core.task.task import Task
  37. @pytest.fixture
  38. def task_id():
  39. return TaskId("task_id1")
  40. @pytest.fixture
  41. def task(task_id):
  42. return Task(config_id="name", properties={}, function=print, input=[], output=[], id=task_id)
  43. @pytest.fixture
  44. def job_id():
  45. return JobId("id1")
  46. @pytest.fixture(scope="class")
  47. def scenario():
  48. return Scenario(
  49. "scenario_config",
  50. [],
  51. {},
  52. [],
  53. "SCENARIO_scenario_config",
  54. version="random_version_number",
  55. )
  56. @pytest.fixture
  57. def job(task, job_id):
  58. return Job(job_id, task, "submit_id", "SCENARIO_scenario_config")
  59. @pytest.fixture
  60. def replace_in_memory_write_fct():
  61. default_write = InMemoryDataNode.write
  62. default__write = InMemoryDataNode._write
  63. InMemoryDataNode.write = _error
  64. InMemoryDataNode._write = _error
  65. yield
  66. InMemoryDataNode.write = default_write
  67. InMemoryDataNode._write = default__write
  68. def _foo():
  69. return 42
  70. def _error():
  71. raise RuntimeError("Something bad has happened")
  72. def test_job_equals(job):
  73. _TaskManagerFactory._build_manager()._repository._save(job.task)
  74. job_manager = _JobManagerFactory()._build_manager()
  75. job_id = job.id
  76. job_manager._repository._save(job)
  77. # To test if instance is same type
  78. task = Task("task", {}, print, [], [], job_id)
  79. job_2 = job_manager._get(job_id)
  80. assert job == job_2
  81. assert job != job_id
  82. assert job != task
  83. def test_create_job(scenario, task, job):
  84. from taipy.core.scenario._scenario_manager_factory import _ScenarioManagerFactory
  85. _ScenarioManagerFactory._build_manager()._repository._save(scenario)
  86. assert job.id == "id1"
  87. assert task in job
  88. assert job.is_submitted()
  89. assert job.submit_id is not None
  90. assert job.submit_entity_id == "SCENARIO_scenario_config"
  91. assert job.submit_entity == scenario
  92. with mock.patch("taipy.core.get") as get_mck:
  93. get_mck.return_value = task
  94. assert job.get_label() == "name > " + job.id
  95. assert job.get_simple_label() == job.id
  96. def test_comparison(task):
  97. job_id_1 = JobId("id1")
  98. job_id_2 = JobId("id2")
  99. job_1 = Job(job_id_1, task, "submit_id", "scenario_entity_id")
  100. sleep(0.01) # Comparison is based on time, precision on Windows is not enough important
  101. job_2 = Job(job_id_2, task, "submit_id", "scenario_entity_id")
  102. assert job_1 < job_2
  103. assert job_2 > job_1
  104. assert job_1 <= job_2
  105. assert job_1 <= job_1
  106. assert job_2 >= job_1
  107. assert job_1 >= job_1
  108. assert job_1 == job_1
  109. assert job_1 != job_2
  110. def test_status_job(task):
  111. submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX, task.config_id)
  112. job = Job("job_id", task, submission.id, "SCENARIO_scenario_config")
  113. _JobManagerFactory._build_manager()._repository._save(job)
  114. submission.jobs = [job]
  115. assert job.is_submitted()
  116. assert job.is_skipped() is False
  117. assert job.is_pending() is False
  118. assert job.is_blocked() is False
  119. assert job.is_canceled() is False
  120. assert job.is_failed() is False
  121. assert job.is_completed() is False
  122. assert job.is_running() is False
  123. job.canceled()
  124. assert job.is_canceled()
  125. job.failed()
  126. assert job.is_failed()
  127. job.running()
  128. assert job.is_running()
  129. job.completed()
  130. assert job.is_completed()
  131. job.pending()
  132. assert job.is_pending()
  133. job.blocked()
  134. assert job.is_blocked()
  135. job.skipped()
  136. assert job.is_skipped()
  137. def test_stacktrace_job(task):
  138. submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX, task.config_id)
  139. job = Job("job_id", task, submission.id, "SCENARIO_scenario_config")
  140. _JobManagerFactory._build_manager()._repository._save(job)
  141. fake_stacktraces = [
  142. """Traceback (most recent call last):
  143. File "<stdin>", line 1, in <module>
  144. ZeroDivisionError: division by zero""",
  145. "Another error",
  146. "yet\nAnother\nError",
  147. ]
  148. job.stacktrace = fake_stacktraces
  149. assert job.stacktrace == fake_stacktraces
  150. def test_notification_job(task):
  151. subscribe = MagicMock()
  152. submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX, task.config_id)
  153. job = Job("job_id", task, submission.id, "SCENARIO_scenario_config")
  154. _JobManagerFactory._build_manager()._repository._save(job)
  155. submission.jobs = [job]
  156. job._on_status_change(subscribe)
  157. job.running()
  158. subscribe.assert_called_once_with(job)
  159. subscribe.reset_mock()
  160. job.completed()
  161. subscribe.assert_called_once_with(job)
  162. subscribe.reset_mock()
  163. job.skipped()
  164. subscribe.assert_called_once_with(job)
  165. def test_handle_exception_in_user_function(task_id, job_id):
  166. task = Task(config_id="name", properties={}, input=[], function=_error, output=[], id=task_id)
  167. submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX, task.config_id)
  168. job = Job(job_id, task, submission.id, "scenario_entity_id")
  169. submission.jobs = [job]
  170. _dispatch(task, job)
  171. job = _JobManager._get(job_id)
  172. assert job.is_failed()
  173. assert 'raise RuntimeError("Something bad has happened")' in str(job.stacktrace[0])
  174. def test_handle_exception_in_input_data_node(task_id, job_id):
  175. data_node = InMemoryDataNode("data_node", scope=Scope.SCENARIO)
  176. _DataManagerFactory._build_manager()._repository._save(data_node)
  177. task = Task(config_id="name", properties={}, input=[data_node], function=print, output=[], id=task_id)
  178. _TaskManagerFactory._build_manager()._repository._save(task)
  179. submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX, task.config_id)
  180. job = Job(job_id, task, submission.id, "scenario_entity_id")
  181. _JobManagerFactory._build_manager()._repository._save(job)
  182. submission.jobs = [job]
  183. _dispatch(task, job)
  184. job = _JobManager._get(job_id)
  185. assert job.is_failed()
  186. assert "taipy.core.exceptions.exceptions.NoData" in str(job.stacktrace[0])
  187. def test_handle_exception_in_ouptut_data_node(replace_in_memory_write_fct, task_id, job_id):
  188. data_node = InMemoryDataNode("data_node", scope=Scope.SCENARIO)
  189. _DataManagerFactory._build_manager()._repository._save(data_node)
  190. task = Task(config_id="name", properties={}, input=[], function=_foo, output=[data_node], id=task_id)
  191. _TaskManagerFactory._build_manager()._repository._save(task)
  192. submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX, task.config_id)
  193. job = Job(job_id, task, submission.id, "scenario_entity_id")
  194. _JobManagerFactory._build_manager()._repository._save(job)
  195. submission.jobs = [job]
  196. _dispatch(task, job)
  197. job = _JobManager._get(job_id)
  198. assert job.is_failed()
  199. assert "taipy.core.exceptions.exceptions.DataNodeWritingError" in str(job.stacktrace[0])
  200. def test_auto_update_and_reload(current_datetime, job_id):
  201. task_1 = Task(config_id="name_1", properties={}, function=_foo, id=TaskId("task_1"))
  202. task_2 = Task(config_id="name_2", properties={}, function=_foo, id=TaskId("task_2"))
  203. submission = _SubmissionManagerFactory._build_manager()._create(task_1.id, task_1._ID_PREFIX, task_1.config_id)
  204. job_1 = Job(job_id, task_1, submission.id, "scenario_entity_id")
  205. submission.jobs = [job_1]
  206. _TaskManager._repository._save(task_1)
  207. _TaskManager._repository._save(task_2)
  208. _JobManager._repository._save(job_1)
  209. job_2 = _JobManager._get(job_1, "submit_id_2")
  210. # auto set & reload on task attribute
  211. assert job_1.task.id == task_1.id
  212. assert job_2.task.id == task_1.id
  213. job_1.task = task_2
  214. assert job_1.task.id == task_2.id
  215. assert job_2.task.id == task_2.id
  216. job_2.task = task_1
  217. assert job_1.task.id == task_1.id
  218. assert job_2.task.id == task_1.id
  219. # auto set & reload on force attribute
  220. assert not job_1.force
  221. assert not job_2.force
  222. job_1.force = True
  223. assert job_1.force
  224. assert job_2.force
  225. job_2.force = False
  226. assert not job_1.force
  227. assert not job_2.force
  228. # auto set & reload on status attribute
  229. assert job_1.status == Status.SUBMITTED
  230. assert job_2.status == Status.SUBMITTED
  231. job_1.status = Status.CANCELED
  232. assert job_1.status == Status.CANCELED
  233. assert job_2.status == Status.CANCELED
  234. job_2.status = Status.BLOCKED
  235. assert job_1.status == Status.BLOCKED
  236. assert job_2.status == Status.BLOCKED
  237. # auto set & reload on creation_date attribute
  238. new_datetime = current_datetime + timedelta(1)
  239. new_datetime_1 = current_datetime + timedelta(1)
  240. job_1.creation_date = new_datetime_1
  241. assert job_1.creation_date == new_datetime_1
  242. assert job_2.creation_date == new_datetime_1
  243. job_2.creation_date = new_datetime
  244. assert job_1.creation_date == new_datetime
  245. assert job_2.creation_date == new_datetime
  246. with job_1 as job:
  247. assert job.task.id == task_1.id
  248. assert not job.force
  249. assert job.status == Status.BLOCKED
  250. assert job.creation_date == new_datetime
  251. assert job._is_in_context
  252. new_datetime_2 = new_datetime + timedelta(1)
  253. job.task = task_2
  254. job.force = True
  255. job.status = Status.COMPLETED
  256. job.creation_date = new_datetime_2
  257. assert job.task.id == task_1.id
  258. assert not job.force
  259. assert job.status == Status.BLOCKED
  260. assert job.creation_date == new_datetime
  261. assert job._is_in_context
  262. assert job_1.task.id == task_2.id
  263. assert job_1.force
  264. assert job_1.status == Status.COMPLETED
  265. assert job_1.creation_date == new_datetime_2
  266. assert not job_1._is_in_context
  267. def test_status_records(job_id):
  268. task_1 = Task(config_id="name_1", properties={}, function=_foo, id=TaskId("task_1"))
  269. submission = _SubmissionManagerFactory._build_manager()._create(task_1.id, task_1._ID_PREFIX, task_1.config_id)
  270. with freezegun.freeze_time("2024-09-25 13:30:30"):
  271. job_1 = Job(job_id, task_1, submission.id, "scenario_entity_id")
  272. submission.jobs = [job_1]
  273. _TaskManager._repository._save(task_1)
  274. _JobManager._repository._save(job_1)
  275. assert job_1._status_change_records == {"SUBMITTED": datetime(2024, 9, 25, 13, 30, 30)}
  276. assert job_1.submitted_at == datetime(2024, 9, 25, 13, 30, 30)
  277. assert job_1.execution_duration is None
  278. with freezegun.freeze_time("2024-09-25 13:35:30"):
  279. job_1.blocked()
  280. assert job_1._status_change_records == {
  281. "SUBMITTED": datetime(2024, 9, 25, 13, 30, 30),
  282. "BLOCKED": datetime(2024, 9, 25, 13, 35, 30),
  283. }
  284. assert job_1.execution_duration is None
  285. with freezegun.freeze_time("2024-09-25 13:36:00"):
  286. assert job_1.blocked_duration == 30 # = 13:36:00 - 13:35:30
  287. with freezegun.freeze_time("2024-09-25 13:40:30"):
  288. job_1.pending()
  289. assert job_1._status_change_records == {
  290. "SUBMITTED": datetime(2024, 9, 25, 13, 30, 30),
  291. "BLOCKED": datetime(2024, 9, 25, 13, 35, 30),
  292. "PENDING": datetime(2024, 9, 25, 13, 40, 30),
  293. }
  294. assert job_1.execution_duration is None
  295. with freezegun.freeze_time("2024-09-25 13:41:00"):
  296. assert job_1.pending_duration == 30 # = 13:41:00 - 13:40:30
  297. with freezegun.freeze_time("2024-09-25 13:50:30"):
  298. job_1.running()
  299. assert job_1._status_change_records == {
  300. "SUBMITTED": datetime(2024, 9, 25, 13, 30, 30),
  301. "BLOCKED": datetime(2024, 9, 25, 13, 35, 30),
  302. "PENDING": datetime(2024, 9, 25, 13, 40, 30),
  303. "RUNNING": datetime(2024, 9, 25, 13, 50, 30),
  304. }
  305. assert job_1.run_at == datetime(2024, 9, 25, 13, 50, 30)
  306. assert job_1.blocked_duration == 300 # = 13:40:30 - 13:35:30
  307. assert job_1.pending_duration == 600 # = 13:50:30 - 13:40:30
  308. assert job_1.execution_duration > 0
  309. with freezegun.freeze_time("2024-09-25 13:56:35"):
  310. job_1.completed()
  311. assert job_1._status_change_records == {
  312. "SUBMITTED": datetime(2024, 9, 25, 13, 30, 30),
  313. "BLOCKED": datetime(2024, 9, 25, 13, 35, 30),
  314. "PENDING": datetime(2024, 9, 25, 13, 40, 30),
  315. "RUNNING": datetime(2024, 9, 25, 13, 50, 30),
  316. "COMPLETED": datetime(2024, 9, 25, 13, 56, 35),
  317. }
  318. assert job_1.execution_duration == 365 # = 13:56:35 - 13:50:30
  319. def test_is_deletable():
  320. with mock.patch("taipy.core.job._job_manager._JobManager._is_deletable") as mock_submit:
  321. task = Task(config_id="name_1", properties={}, function=_foo, id=TaskId("task_1"))
  322. job = Job(job_id, task, "submit_id_1", "scenario_entity_id")
  323. job.is_deletable()
  324. mock_submit.assert_called_once_with(job)
  325. def _dispatch(task: Task, job: Job, mode=JobConfig._DEVELOPMENT_MODE):
  326. Config.configure_job_executions(mode=mode)
  327. _TaskManager._repository._save(task)
  328. _JobManager._repository._save(job)
  329. dispatcher: Union[_StandaloneJobDispatcher, _DevelopmentJobDispatcher] = _StandaloneJobDispatcher(
  330. cast(_AbstractOrchestrator, _OrchestratorFactory._orchestrator)
  331. )
  332. if mode == JobConfig._DEVELOPMENT_MODE:
  333. dispatcher = _DevelopmentJobDispatcher(cast(_AbstractOrchestrator, _OrchestratorFactory._orchestrator))
  334. dispatcher._dispatch(job)