test_job_manager.py 19 KB


  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import multiprocessing
  12. import random
  13. import string
  14. from functools import partial
  15. from time import sleep
  16. from unittest import mock
  17. import pytest
  18. from taipy.config.common.scope import Scope
  19. from taipy.config.config import Config
  20. from taipy.core._orchestrator._dispatcher._job_dispatcher import _JobDispatcher
  21. from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
  22. from taipy.core.config.job_config import JobConfig
  23. from taipy.core.data._data_manager import _DataManager
  24. from taipy.core.data._data_manager_factory import _DataManagerFactory
  25. from taipy.core.data.in_memory import InMemoryDataNode
  26. from taipy.core.exceptions.exceptions import JobNotDeletedException
  27. from taipy.core.job._job_manager import _JobManager
  28. from taipy.core.job.job_id import JobId
  29. from taipy.core.job.status import Status
  30. from taipy.core.scenario.scenario import Scenario
  31. from taipy.core.submission._submission_manager_factory import _SubmissionManagerFactory
  32. from taipy.core.task._task_manager import _TaskManager
  33. from taipy.core.task.task import Task
  34. from tests.core.utils import assert_true_after_time
  def multiply(nb1: float, nb2: float):
      """Return ``nb1 * nb2``; the plain task function used throughout these tests."""
      return nb1 * nb2


  def lock_multiply(lock, nb1: float, nb2: float):
      """Return ``nb1 * nb2`` once ``lock`` has been acquired.

      Tests in this file hold ``lock`` from the main process right after submitting a job that runs this function,
      then check the job is running; the job can only finish once the lock is released.

      Args:
          lock: Lock object usable as a context manager (the tests pass ``multiprocessing.Manager().Lock()`` proxies).
          nb1: First factor.
          nb2: Second factor.

      Returns:
          The product of ``nb1`` and ``nb2``.
      """
      with lock:
          return multiply(nb1, nb2)
  def test_create_jobs():
      """``_JobManager._create`` persists submitted, editable jobs carrying the submission metadata it was given."""
      Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
      task = _create_task(multiply, name="get_job")

      # One forced creation followed by one non-forced creation, each under its own submit id.
      for submit_id, force in (("submit_id", True), ("submit_id_1", False)):
          created = _JobManager._create(task, [print], submit_id, "secnario_id", force)
          assert _JobManager._get(created.id) == created
          assert created.is_submitted()
          assert task.config_id in created.id
          assert created.task.id == task.id
          assert created.submit_id == submit_id
          assert created.submit_entity_id == "secnario_id"
          assert bool(created.force) is force
          assert _JobManager._is_editable(created)
  def test_get_job():
      """Jobs returned by ``submit_task`` can be fetched back by id and record the task as their submit entity."""
      Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
      task = _create_task(multiply, name="get_job")

      def submit():
          # Submit the task and keep the single job of the resulting submission.
          return _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]

      earlier = submit()
      assert _JobManager._get(earlier.id) == earlier
      assert _JobManager._get(earlier.id).submit_entity_id == task.id

      later = submit()
      assert earlier != later
      for expected in (earlier, later):
          assert _JobManager._get(expected.id).id == expected.id
      assert _JobManager._get(later.id).submit_entity_id == task.id
  def test_get_latest_job():
      """``_JobManager._get_latest`` returns the most recently created job of each task (or None if it has none)."""
      Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
      task = _create_task(multiply, name="get_latest_job")
      task_2 = _create_task(multiply, name="get_latest_job_2")

      def submit_first_job(target):
          return _OrchestratorFactory._orchestrator.submit_task(target).jobs[0]

      def latest_id(target):
          return _JobManager._get_latest(target).id

      first_job = submit_first_job(task)
      assert _JobManager._get_latest(task) == first_job
      assert _JobManager._get_latest(task_2) is None

      # "Latest" is decided by creation time; pause so consecutive jobs get distinct timestamps
      # (clock precision on Windows is coarse).
      sleep(0.01)
      other_job = submit_first_job(task_2)
      assert latest_id(task) == first_job.id
      assert latest_id(task_2) == other_job.id

      sleep(0.01)
      newer_job = submit_first_job(task)
      assert latest_id(task) == newer_job.id
      assert latest_id(task_2) == other_job.id
  def test_get_job_unknown():
      """Looking up an id that was never created yields None instead of raising."""
      unknown_id = JobId("Unknown")
      assert _JobManager._get(unknown_id) is None
  def test_get_jobs():
      """``_JobManager._get_all`` lists every job created by successive submissions of a task."""
      Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
      task = _create_task(multiply, name="get_all_jobs")

      expected_ids = set()
      for _ in range(2):
          expected_ids.add(_OrchestratorFactory._orchestrator.submit_task(task).jobs[0].id)

      assert {stored.id for stored in _JobManager._get_all()} == expected_ids
  def test_delete_job():
      """Deleting a job removes it from the repository and leaves other jobs untouched."""
      Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
      task = _create_task(multiply, name="delete_job")
      deleted = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
      kept = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]

      _JobManager._delete(deleted)

      remaining_ids = [stored.id for stored in _JobManager._get_all()]
      assert remaining_ids == [kept.id]
      assert _JobManager._get(deleted.id) is None
  # Module-level multiprocessing manager and a lock created from it. The manager ``m`` is also used by
  # test_cancel_subsequent_jobs to create its own lock.
  m = multiprocessing.Manager()
  lock = m.Lock()


  def inner_lock_multiply(nb1: float, nb2: float):
      """Return ``nb1 * nb2`` once the module-level ``lock`` has been acquired.

      Args:
          nb1: First factor.
          nb2: Second factor.

      Returns:
          The product of ``nb1`` and ``nb2``.
      """
      with lock:
          return multiply(nb1, nb2)
  108. def test_raise_when_trying_to_delete_unfinished_job():
  # Deleting a job that is still running raises JobNotDeletedException, both with the default call and with
  # force=False; once the job has completed it can be deleted.
  109. Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2)
  # Local manager lock (shadows the module-level `m`/`lock`); the task function lock_multiply blocks on it.
  110. m = multiprocessing.Manager()
  111. lock = m.Lock()
  # Two input data nodes (default data 1 and 2) and one output data node.
  112. dnm = _DataManagerFactory._build_manager()
  113. dn_1 = InMemoryDataNode("dn_config_1", Scope.SCENARIO, properties={"default_data": 1})
  114. dnm._set(dn_1)
  115. dn_2 = InMemoryDataNode("dn_config_2", Scope.SCENARIO, properties={"default_data": 2})
  116. dnm._set(dn_2)
  117. dn_3 = InMemoryDataNode("dn_config_3", Scope.SCENARIO)
  118. dnm._set(dn_3)
  119. task = Task(
  120. "task_config_1", {}, partial(lock_multiply, lock), [dn_1, dn_2], [dn_3], id="raise_when_delete_unfinished"
  121. )
  122. _OrchestratorFactory._build_dispatcher()
  # Holding the lock keeps the submitted job blocked inside lock_multiply, i.e. running.
  123. with lock:
  124. job = _OrchestratorFactory._orchestrator.submit_task(task)._jobs[0]
  125. assert_true_after_time(lambda: len(_JobDispatcher._dispatched_processes) == 1)
  126. assert_true_after_time(job.is_running)
  # A running job cannot be deleted, neither by default nor with force=False.
  127. with pytest.raises(JobNotDeletedException):
  128. _JobManager._delete(job)
  129. with pytest.raises(JobNotDeletedException):
  130. _JobManager._delete(job, force=False)
  # NOTE(review): indentation was lost in this copy. `job.is_completed` can only become true once `lock` is
  # released, so this line and the next presumably sit outside the `with lock:` block — confirm.
  131. assert_true_after_time(job.is_completed)
  # A completed job can be deleted without force.
  132. _JobManager._delete(job)
  133. def test_force_deleting_unfinished_job():
  # A running job cannot be deleted with force=False, but force=True deletes it anyway.
  134. Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2)
  # Local manager lock (shadows the module-level `m`/`lock`); the task function lock_multiply blocks on it.
  135. m = multiprocessing.Manager()
  136. lock = m.Lock()
  # Two input data nodes (default data 1 and 2) and one output data node.
  137. dnm = _DataManagerFactory._build_manager()
  138. dn_1 = InMemoryDataNode("dn_config_1", Scope.SCENARIO, properties={"default_data": 1})
  139. dnm._set(dn_1)
  140. dn_2 = InMemoryDataNode("dn_config_2", Scope.SCENARIO, properties={"default_data": 2})
  141. dnm._set(dn_2)
  142. dn_3 = InMemoryDataNode("dn_config_3", Scope.SCENARIO)
  143. dnm._set(dn_3)
  144. task = Task(
  145. "task_config_1", {}, partial(lock_multiply, lock), [dn_1, dn_2], [dn_3], id="force_deleting_unfinished_job"
  146. )
  147. _OrchestratorFactory._build_dispatcher()
  # Holding the lock keeps the submitted job blocked inside lock_multiply, i.e. running.
  148. with lock:
  149. job = _OrchestratorFactory._orchestrator.submit_task(task)._jobs[0]
  150. assert_true_after_time(job.is_running)
  151. with pytest.raises(JobNotDeletedException):
  152. _JobManager._delete(job, force=False)
  # force=True deletes the job even though it is still running.
  153. _JobManager._delete(job, force=True)
  # NOTE(review): indentation was lost in this copy; it is unclear whether this check runs while `lock` is still
  # held — confirm against the upstream file.
  154. assert _JobManager._get(job.id) is None
  def test_cancel_single_job():
      """Cancelling a job that is still pending (dispatcher stopped) moves it to the canceled status."""
      Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=1)
      task = _create_task(multiply, name="cancel_single_job")
      _OrchestratorFactory._build_dispatcher()
      assert_true_after_time(_OrchestratorFactory._dispatcher.is_running)
      # Stop the dispatcher so the submitted job is never dispatched and stays pending.
      _OrchestratorFactory._dispatcher.stop()
      assert_true_after_time(lambda: not _OrchestratorFactory._dispatcher.is_running())
      job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
      assert_true_after_time(job.is_pending)
      assert_true_after_time(lambda: len(_JobDispatcher._dispatched_processes) == 0)
      # `_cancel` receives the job id string here, not the Job instance.
      _JobManager._cancel(job.id)
      assert_true_after_time(job.is_canceled)
  @mock.patch(
      "taipy.core._orchestrator._orchestrator._Orchestrator._orchestrate_job_to_run_or_block",
      return_value="orchestrated_job",
  )
  @mock.patch("taipy.core._orchestrator._orchestrator._Orchestrator._cancel_jobs")
  def test_cancel_canceled_abandoned_failed_jobs(cancel_jobs, orchestrated_job):
      """Cancelling a job that is already canceled, failed or abandoned leaves it as it is.

      ``cancel_jobs`` is the mock replacing ``_Orchestrator._cancel_jobs`` (innermost patch, hence first argument)
      and must never be called; ``orchestrated_job`` replaces ``_orchestrate_job_to_run_or_block``.
      """
      Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=1)
      task = _create_task(multiply, name="test_cancel_canceled_abandoned_failed_jobs")
      _OrchestratorFactory._build_dispatcher()
      assert_true_after_time(_OrchestratorFactory._dispatcher.is_running)
      _OrchestratorFactory._dispatcher.stop()
      assert_true_after_time(lambda: not _OrchestratorFactory._dispatcher.is_running())

      # (Job method forcing the status, Job predicate checking it) for each final status exercised here.
      final_states = (
          ("canceled", "is_canceled"),
          ("failed", "is_failed"),
          ("abandoned", "is_abandoned"),
      )
      for force_status, has_status in final_states:
          job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
          getattr(job, force_status)()
          assert getattr(job, has_status)()
          _JobManager._cancel(job)
          cancel_jobs.assert_not_called()
          assert getattr(job, has_status)()
  @mock.patch(
      "taipy.core._orchestrator._orchestrator._Orchestrator._orchestrate_job_to_run_or_block",
      return_value="orchestrated_job",
  )
  @mock.patch("taipy.core.job.job.Job.canceled")
  def test_cancel_completed_skipped_jobs(cancel_jobs, orchestrated_job):
      """Cancelling a completed, failed or skipped job does not cancel it.

      ``cancel_jobs`` is the mock replacing ``Job.canceled`` (innermost patch, hence first argument);
      ``orchestrated_job`` replaces ``_Orchestrator._orchestrate_job_to_run_or_block``.
      """
      Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=1)
      task = _create_task(multiply, name="cancel_single_job")
      _OrchestratorFactory._build_dispatcher()
      assert_true_after_time(_OrchestratorFactory._dispatcher.is_running)
      _OrchestratorFactory._dispatcher.stop()
      assert_true_after_time(lambda: not _OrchestratorFactory._dispatcher.is_running())

      # The "not called" checks run *after* each cancel request: checking beforehand could never detect a
      # spurious call to Job.canceled made by _JobManager._cancel.
      job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
      job.completed()
      assert job.is_completed()
      _JobManager._cancel(job)
      cancel_jobs.assert_not_called()
      assert job.is_completed()

      job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
      job.failed()
      assert job.is_failed()
      _JobManager._cancel(job)
      cancel_jobs.assert_not_called()
      assert job.is_failed()

      job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
      job.skipped()
      assert job.is_skipped()
      _JobManager._cancel(job)
      cancel_jobs.assert_not_called()
      assert job.is_skipped()
  228. def test_cancel_single_running_job():
  # Cancelling a job that is already running has no effect: it keeps running, completes once `lock` is released,
  # and both dispatcher workers become available again.
  229. Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2)
  # Local manager lock (shadows the module-level `m`/`lock`); the task function lock_multiply blocks on it.
  230. m = multiprocessing.Manager()
  231. lock = m.Lock()
  # Two input data nodes (default data 1 and 2) and one output data node.
  232. dnm = _DataManagerFactory._build_manager()
  233. dn_1 = InMemoryDataNode("dn_config_1", Scope.SCENARIO, properties={"default_data": 1})
  234. dnm._set(dn_1)
  235. dn_2 = InMemoryDataNode("dn_config_2", Scope.SCENARIO, properties={"default_data": 2})
  236. dnm._set(dn_2)
  237. dn_3 = InMemoryDataNode("dn_config_3", Scope.SCENARIO)
  238. dnm._set(dn_3)
  239. task = Task("task_config_1", {}, partial(lock_multiply, lock), [dn_1, dn_2], [dn_3], id="cancel_single_job")
  # Start the dispatcher and wait until both workers are idle.
  240. _OrchestratorFactory._build_dispatcher()
  241. assert_true_after_time(_OrchestratorFactory._dispatcher.is_running)
  242. assert_true_after_time(lambda: _OrchestratorFactory._dispatcher._nb_available_workers == 2)
  # Holding the lock keeps the submitted job blocked inside lock_multiply, occupying one worker.
  243. with lock:
  244. job = _OrchestratorFactory._orchestrator.submit_task(task)._jobs[0]
  245. assert_true_after_time(lambda: len(_JobDispatcher._dispatched_processes) == 1)
  246. assert_true_after_time(lambda: _OrchestratorFactory._dispatcher._nb_available_workers == 1)
  247. assert_true_after_time(job.is_running)
  # Cancel request on a running job: the job is expected to keep running.
  248. _JobManager._cancel(job)
  249. assert_true_after_time(job.is_running)
  # NOTE(review): indentation was lost in this copy. The process can only finish after `lock` is released, so the
  # remaining checks presumably run outside the `with lock:` block — confirm.
  250. assert_true_after_time(lambda: len(_JobDispatcher._dispatched_processes) == 0)
  251. assert_true_after_time(lambda: _OrchestratorFactory._dispatcher._nb_available_workers == 2)
  252. assert_true_after_time(job.is_completed)
  253. def test_cancel_subsequent_jobs():
  # Cancelling a job abandons the later jobs of the same submission that consume its outputs. A job that is already
  # running is not canceled: it keeps running and completes once released. Chain per submission:
  # task_1 -> dn_3 -> task_2 -> dn_4 -> task_3.
  # Single worker: presumably why job_4 stays pending while job_1 runs — confirm.
  254. Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=1)
  255. _OrchestratorFactory._build_dispatcher()
  256. orchestrator = _OrchestratorFactory._orchestrator
  257. submission_manager = _SubmissionManagerFactory._build_manager()
  # Lock created from the module-level manager `m`; task_1's function (lock_multiply) blocks on it.
  258. lock_0 = m.Lock()
  259. dn_1 = InMemoryDataNode("dn_config_1", Scope.SCENARIO, properties={"default_data": 1})
  260. dn_2 = InMemoryDataNode("dn_config_2", Scope.SCENARIO, properties={"default_data": 2})
  261. dn_3 = InMemoryDataNode("dn_config_3", Scope.SCENARIO, properties={"default_data": 3})
  262. dn_4 = InMemoryDataNode("dn_config_4", Scope.SCENARIO, properties={"default_data": 4})
  263. task_1 = Task("task_config_1", {}, partial(lock_multiply, lock_0), [dn_1, dn_2], [dn_3], id="task_1")
  264. task_2 = Task("task_config_2", {}, multiply, [dn_1, dn_3], [dn_4], id="task_2")
  265. task_3 = Task("task_config_3", {}, print, [dn_4], id="task_3")
  266. # The tasks can't be grouped under one scenario because the partial task function is not serializable, so the two submissions are created directly.
  267. submission_1 = submission_manager._create("scenario_id", Scenario._ID_PREFIX, "scenario_config_id")
  268. submission_2 = submission_manager._create("scenario_id", Scenario._ID_PREFIX, "scenario_config_id")
  269. _DataManager._set(dn_1)
  270. _DataManager._set(dn_2)
  271. _DataManager._set(dn_3)
  272. _DataManager._set(dn_4)
  # While lock_0 is held, job_1 cannot finish, so its dependents stay blocked.
  273. with lock_0:
  274. job_1 = orchestrator._lock_dn_output_and_create_job(
  275. task_1, submit_id=submission_1.id, submit_entity_id=submission_1.entity_id
  276. )
  277. orchestrator._orchestrate_job_to_run_or_block([job_1])
  278. job_2 = orchestrator._lock_dn_output_and_create_job(
  279. task_2, submit_id=submission_1.id, submit_entity_id=submission_1.entity_id
  280. )
  281. orchestrator._orchestrate_job_to_run_or_block([job_2])
  282. job_3 = orchestrator._lock_dn_output_and_create_job(
  283. task_3, submit_id=submission_1.id, submit_entity_id=submission_1.entity_id
  284. )
  285. orchestrator._orchestrate_job_to_run_or_block([job_3])
  286. submission_1.jobs = [job_1, job_2, job_3]
  287. assert_true_after_time(lambda: _OrchestratorFactory._orchestrator.jobs_to_run.qsize() == 0)
  288. assert_true_after_time(lambda: len(_OrchestratorFactory._orchestrator.blocked_jobs) == 2)
  289. assert_true_after_time(job_1.is_running)
  290. assert_true_after_time(job_2.is_blocked)
  291. assert_true_after_time(job_3.is_blocked)
  # Second submission of the same three tasks.
  292. job_4 = _OrchestratorFactory._orchestrator._lock_dn_output_and_create_job(
  293. task_1, submit_id=submission_2.id, submit_entity_id=submission_2.entity_id
  294. )
  295. orchestrator._orchestrate_job_to_run_or_block([job_4])
  296. job_5 = _OrchestratorFactory._orchestrator._lock_dn_output_and_create_job(
  297. task_2, submit_id=submission_2.id, submit_entity_id=submission_2.entity_id
  298. )
  299. orchestrator._orchestrate_job_to_run_or_block([job_5])
  300. job_6 = _OrchestratorFactory._orchestrator._lock_dn_output_and_create_job(
  301. task_3, submit_id=submission_2.id, submit_entity_id=submission_2.entity_id
  302. )
  303. orchestrator._orchestrate_job_to_run_or_block([job_6])
  304. submission_2.jobs = [job_4, job_5, job_6]
  305. assert_true_after_time(job_4.is_pending)
  306. assert_true_after_time(job_5.is_blocked)
  307. assert_true_after_time(job_6.is_blocked)
  308. assert _OrchestratorFactory._orchestrator.jobs_to_run.qsize() == 1
  309. assert len(_OrchestratorFactory._orchestrator.blocked_jobs) == 4
  # Cancelling the pending job_4 cancels it and abandons its dependents job_5 and job_6.
  310. _JobManager._cancel(job_4)
  311. assert_true_after_time(job_4.is_canceled)
  312. assert_true_after_time(job_5.is_abandoned)
  313. assert_true_after_time(job_6.is_abandoned)
  314. assert _OrchestratorFactory._orchestrator.jobs_to_run.qsize() == 0
  315. assert len(_OrchestratorFactory._orchestrator.blocked_jobs) == 2
  # Cancelling the already-running job_1 leaves it running but abandons its dependents job_2 and job_3.
  316. _JobManager._cancel(job_1)
  317. assert_true_after_time(job_1.is_running)
  318. assert_true_after_time(job_2.is_abandoned)
  319. assert_true_after_time(job_3.is_abandoned)
  # NOTE(review): indentation was lost in this copy. job_1 can only complete after lock_0 is released, so the
  # `with lock_0:` block presumably ends just before this line — confirm.
  320. assert_true_after_time(job_1.is_completed)
  321. assert_true_after_time(job_2.is_abandoned)
  322. assert_true_after_time(job_3.is_abandoned)
  323. assert_true_after_time(job_4.is_canceled)
  324. assert_true_after_time(job_5.is_abandoned)
  325. assert_true_after_time(job_6.is_abandoned)
  # Finally, no job is left blocked and nothing remains queued to run.
  326. assert_true_after_time(lambda: all(
  327. not _OrchestratorFactory._orchestrator._is_blocked(job)
  328. for job in [job_1, job_2, job_3, job_4, job_5, job_6]
  329. ))
  330. assert_true_after_time(lambda: _OrchestratorFactory._orchestrator.jobs_to_run.qsize() == 0)
  def test_is_deletable():
      """Jobs are deletable in completed, abandoned, canceled, failed or skipped status, and not otherwise."""
      assert not _JobManager._get_all()
      task = _create_task(print, nb_outputs=0, name="task")
      job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]

      def check_deletable(expected):
        # `_is_deletable` is exercised with both the Job instance and its id.
          for reference in (job, job.id):
              assert bool(_JobManager._is_deletable(reference)) is expected

      assert job.is_completed()
      check_deletable(True)

      # (status setter, status predicate, expected deletability), applied in sequence to the same job.
      transitions = (
          (job.abandoned, job.is_abandoned, True),
          (job.canceled, job.is_canceled, True),
          (job.failed, job.is_failed, True),
          (job.skipped, job.is_skipped, True),
          (job.blocked, job.is_blocked, False),
          (job.running, job.is_running, False),
          (job.pending, job.is_pending, False),
      )
      for set_status, has_status, deletable in transitions:
          set_status()
          assert has_status()
          check_deletable(deletable)

      job.status = Status.SUBMITTED
      assert job.is_submitted()
      check_deletable(False)
  def _create_task(function, nb_outputs=1, name=None):
      """Configure and create a task wrapping ``function`` for these tests.

      The task reads the scenario-scoped pickle data nodes ``input1`` (default data 21) and ``input2`` (default
      data 2). It writes to ``nb_outputs`` scenario-scoped pickle data nodes named ``output0``, ``output1``, ...
      (default data 0).

      Args:
          function: Callable executed by the task.
          nb_outputs: Number of output data nodes to configure; 0 yields a task without outputs.
          name: Task config id; when falsy, a random 10-letter lowercase id is generated.

      Returns:
          The first (and only) task returned by ``_TaskManager._bulk_get_or_create`` for the configured task.
      """
      input1_dn_config = Config.configure_data_node("input1", "pickle", Scope.SCENARIO, default_data=21)
      input2_dn_config = Config.configure_data_node("input2", "pickle", Scope.SCENARIO, default_data=2)
      output_dn_configs = [
          Config.configure_data_node(f"output{i}", "pickle", Scope.SCENARIO, default_data=0) for i in range(nb_outputs)
      ]
      # Output data nodes are created up front; a plain set() replaces the redundant identity set comprehension.
      _DataManager._bulk_get_or_create(set(output_dn_configs))
      name = name or "".join(random.choice(string.ascii_lowercase) for _ in range(10))
      task_config = Config.configure_task(
          id=name,
          function=function,
          input=[input1_dn_config, input2_dn_config],
          output=output_dn_configs,
      )
      return _TaskManager._bulk_get_or_create([task_config])[0]