# tests/core/job/test_job_manager.py
  1. # Copyright 2021-2025 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import multiprocessing
  12. import random
  13. import string
  14. from functools import partial
  15. from time import sleep
  16. from typing import cast
  17. from unittest import mock
  18. import pytest
  19. from taipy.common.config import Config
  20. from taipy.common.config.common.scope import Scope
  21. from taipy.core._orchestrator._dispatcher import _StandaloneJobDispatcher
  22. from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
  23. from taipy.core.config.job_config import JobConfig
  24. from taipy.core.data._data_manager import _DataManager
  25. from taipy.core.data._data_manager_factory import _DataManagerFactory
  26. from taipy.core.data.in_memory import InMemoryDataNode
  27. from taipy.core.exceptions.exceptions import JobNotDeletedException
  28. from taipy.core.job._job_manager import _JobManager
  29. from taipy.core.job.job_id import JobId
  30. from taipy.core.job.status import Status
  31. from taipy.core.scenario.scenario import Scenario
  32. from taipy.core.submission._submission_manager_factory import _SubmissionManagerFactory
  33. from taipy.core.task._task_manager import _TaskManager
  34. from taipy.core.task.task import Task
  35. from tests.core.utils import assert_true_after_time
  36. def multiply(nb1: float, nb2: float):
  37. return nb1 * nb2
  38. def lock_multiply(lock, nb1: float, nb2: float):
  39. with lock:
  40. return multiply(1 or nb1, 2 or nb2)
  41. def test_create_jobs():
  42. Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
  43. task = _create_task(multiply, name="get_job")
  44. job_1 = _JobManager._create(task, [print], "submit_id", "secnario_id", True)
  45. assert _JobManager._get(job_1.id) == job_1
  46. assert job_1.is_submitted()
  47. assert task.config_id in job_1.id
  48. assert job_1.task.id == task.id
  49. assert job_1.submit_id == "submit_id"
  50. assert job_1.submit_entity_id == "secnario_id"
  51. assert job_1.force
  52. assert _JobManager._is_editable(job_1)
  53. job_2 = _JobManager._create(task, [print], "submit_id_1", "secnario_id", False)
  54. assert _JobManager._get(job_2.id) == job_2
  55. assert job_2.is_submitted()
  56. assert task.config_id in job_2.id
  57. assert job_2.task.id == task.id
  58. assert job_2.submit_id == "submit_id_1"
  59. assert job_2.submit_entity_id == "secnario_id"
  60. assert not job_2.force
  61. assert _JobManager._is_editable(job_2)
  62. def test_get_job():
  63. Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
  64. task = _create_task(multiply, name="get_job")
  65. job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
  66. assert _JobManager._get(job_1.id) == job_1
  67. assert _JobManager._get(job_1.id).submit_entity_id == task.id
  68. job_2 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
  69. assert job_1 != job_2
  70. assert _JobManager._get(job_1.id).id == job_1.id
  71. assert _JobManager._get(job_2.id).id == job_2.id
  72. assert _JobManager._get(job_2.id).submit_entity_id == task.id
  73. def test_get_latest_job():
  74. Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
  75. task = _create_task(multiply, name="get_latest_job")
  76. task_2 = _create_task(multiply, name="get_latest_job_2")
  77. job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
  78. assert _JobManager._get_latest(task) == job_1
  79. assert _JobManager._get_latest(task_2) is None
  80. sleep(0.01) # Comparison is based on time, precision on Windows is not enough important
  81. job_2 = _OrchestratorFactory._orchestrator.submit_task(task_2).jobs[0]
  82. assert _JobManager._get_latest(task).id == job_1.id
  83. assert _JobManager._get_latest(task_2).id == job_2.id
  84. sleep(0.01) # Comparison is based on time, precision on Windows is not enough important
  85. job_1_bis = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
  86. assert _JobManager._get_latest(task).id == job_1_bis.id
  87. assert _JobManager._get_latest(task_2).id == job_2.id
  88. def test_get_job_unknown():
  89. assert _JobManager._get(JobId("Unknown")) is None
  90. def test_get_jobs():
  91. Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
  92. task = _create_task(multiply, name="get_all_jobs")
  93. job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
  94. job_2 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
  95. assert {job.id for job in _JobManager._get_all()} == {job_1.id, job_2.id}
  96. def test_delete_job():
  97. Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
  98. task = _create_task(multiply, name="delete_job")
  99. job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
  100. job_2 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
  101. _JobManager._delete(job_1)
  102. assert [job.id for job in _JobManager._get_all()] == [job_2.id]
  103. assert _JobManager._get(job_1.id) is None
def test_raise_when_trying_to_delete_unfinished_job():
    """A RUNNING job cannot be deleted (with or without force=False); it can once completed."""
    Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2)
    m = multiprocessing.Manager()
    # Cross-process lock: lock_multiply blocks on it, pinning the job in RUNNING.
    lock = m.Lock()
    dnm = _DataManagerFactory._build_manager()
    dn_1 = InMemoryDataNode("dn_config_1", Scope.SCENARIO, properties={"default_data": 1})
    dnm._set(dn_1)
    dn_2 = InMemoryDataNode("dn_config_2", Scope.SCENARIO, properties={"default_data": 2})
    dnm._set(dn_2)
    dn_3 = InMemoryDataNode("dn_config_3", Scope.SCENARIO)
    dnm._set(dn_3)
    task = Task(
        "task_config_1", {}, partial(lock_multiply, lock), [dn_1, dn_2], [dn_3], id="raise_when_delete_unfinished"
    )
    dispatcher = cast(_StandaloneJobDispatcher, _OrchestratorFactory._build_dispatcher())
    with lock:
        # While this test holds the lock, the dispatched job stays inside lock_multiply.
        job = _OrchestratorFactory._orchestrator.submit_task(task)._jobs[0]
        # One of the two workers is busy with the running job.
        assert_true_after_time(lambda: dispatcher._nb_available_workers == 1)
        assert_true_after_time(job.is_running)
        # Deleting a running job must be refused, both by default and explicitly.
        with pytest.raises(JobNotDeletedException):
            _JobManager._delete(job)
        with pytest.raises(JobNotDeletedException):
            _JobManager._delete(job, force=False)
    # Lock released: the job finishes, after which a plain delete succeeds.
    assert_true_after_time(job.is_completed)
    _JobManager._delete(job)
def test_force_deleting_unfinished_job():
    """force=True deletes a job even while it is still RUNNING."""
    Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2)
    m = multiprocessing.Manager()
    # Cross-process lock keeping the task blocked inside lock_multiply.
    lock = m.Lock()
    dnm = _DataManagerFactory._build_manager()
    dn_1 = InMemoryDataNode("dn_config_1", Scope.SCENARIO, properties={"default_data": 1})
    dnm._set(dn_1)
    dn_2 = InMemoryDataNode("dn_config_2", Scope.SCENARIO, properties={"default_data": 2})
    dnm._set(dn_2)
    dn_3 = InMemoryDataNode("dn_config_3", Scope.SCENARIO)
    dnm._set(dn_3)
    task = Task(
        "task_config_1", {}, partial(lock_multiply, lock), [dn_1, dn_2], [dn_3], id="force_deleting_unfinished_job"
    )
    _OrchestratorFactory._build_dispatcher()
    with lock:
        # Job stays RUNNING while the test holds the lock.
        job = _OrchestratorFactory._orchestrator.submit_task(task)._jobs[0]
        assert_true_after_time(job.is_running)
        # A non-forced delete of a running job is refused...
        with pytest.raises(JobNotDeletedException):
            _JobManager._delete(job, force=False)
        # ...but a forced delete goes through even though the job is running.
        _JobManager._delete(job, force=True)
    assert _JobManager._get(job.id) is None
  151. def test_cancel_single_job():
  152. Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=1)
  153. task = _create_task(multiply, name="cancel_single_job")
  154. dispatcher = cast(_StandaloneJobDispatcher, _OrchestratorFactory._build_dispatcher())
  155. assert_true_after_time(dispatcher.is_running)
  156. dispatcher.stop()
  157. assert_true_after_time(lambda: not dispatcher.is_running())
  158. job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
  159. assert_true_after_time(job.is_pending)
  160. assert_true_after_time(lambda: dispatcher._nb_available_workers == 1)
  161. _JobManager._cancel(job.id)
  162. assert_true_after_time(job.is_canceled)
@mock.patch(
    "taipy.core._orchestrator._orchestrator._Orchestrator._orchestrate_job_to_run_or_block",
    return_value="orchestrated_job",
)
@mock.patch("taipy.core._orchestrator._orchestrator._Orchestrator._cancel_jobs")
def test_cancel_canceled_abandoned_failed_jobs(cancel_jobs, orchestrated_job):
    """Canceling a job already CANCELED, FAILED, or ABANDONED is a no-op.

    Stacked patches are injected bottom-up: `cancel_jobs` is the mock of
    `_Orchestrator._cancel_jobs` (innermost patch), asserted never to be
    invoked; `orchestrated_job` stubs job orchestration so submitted jobs
    are not actually dispatched.
    """
    Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=1)
    task = _create_task(multiply, name="test_cancel_canceled_abandoned_failed_jobs")
    dispatcher = _OrchestratorFactory._build_dispatcher()
    assert_true_after_time(dispatcher.is_running)
    # Stopped dispatcher: jobs keep whatever status we force on them.
    dispatcher.stop()
    assert_true_after_time(lambda: not dispatcher.is_running())

    # CANCELED job: _cancel must leave it untouched.
    job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
    job.canceled()
    assert job.is_canceled()
    _JobManager._cancel(job)
    cancel_jobs.assert_not_called()
    assert job.is_canceled()

    # FAILED job: same expectation.
    job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
    job.failed()
    assert job.is_failed()
    _JobManager._cancel(job)
    cancel_jobs.assert_not_called()
    assert job.is_failed()

    # ABANDONED job: same expectation.
    job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
    job.abandoned()
    assert job.is_abandoned()
    _JobManager._cancel(job)
    cancel_jobs.assert_not_called()
    assert job.is_abandoned()
  193. @mock.patch(
  194. "taipy.core._orchestrator._orchestrator._Orchestrator._orchestrate_job_to_run_or_block",
  195. return_value="orchestrated_job",
  196. )
  197. @mock.patch("taipy.core.job.job.Job.canceled")
  198. def test_cancel_completed_skipped_jobs(cancel_jobs, orchestrated_job):
  199. Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=1)
  200. task = _create_task(multiply, name="cancel_single_job")
  201. dispatcher = _OrchestratorFactory._build_dispatcher()
  202. assert_true_after_time(dispatcher.is_running)
  203. dispatcher.stop()
  204. assert_true_after_time(lambda: not dispatcher.is_running())
  205. job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
  206. job.completed()
  207. assert job.is_completed()
  208. cancel_jobs.assert_not_called()
  209. _JobManager._cancel(job)
  210. assert job.is_completed()
  211. job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
  212. job.failed()
  213. assert job.is_failed()
  214. cancel_jobs.assert_not_called()
  215. _JobManager._cancel(job)
  216. assert job.is_failed()
  217. job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
  218. job.skipped()
  219. assert job.is_skipped()
  220. cancel_jobs.assert_not_called()
  221. _JobManager._cancel(job)
  222. assert job.is_skipped()
def test_cancel_single_running_job():
    """Canceling a RUNNING job does not interrupt it: it runs to completion."""
    Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2)
    m = multiprocessing.Manager()
    # Cross-process lock holding the job inside lock_multiply while we cancel it.
    lock = m.Lock()
    dnm = _DataManagerFactory._build_manager()
    dn_1 = InMemoryDataNode("dn_config_1", Scope.SCENARIO, properties={"default_data": 1})
    dnm._set(dn_1)
    dn_2 = InMemoryDataNode("dn_config_2", Scope.SCENARIO, properties={"default_data": 2})
    dnm._set(dn_2)
    dn_3 = InMemoryDataNode("dn_config_3", Scope.SCENARIO)
    dnm._set(dn_3)
    task = Task("task_config_1", {}, partial(lock_multiply, lock), [dn_1, dn_2], [dn_3], id="cancel_single_job")
    dispatcher = cast(_StandaloneJobDispatcher, _OrchestratorFactory._build_dispatcher(force_restart=True))
    assert_true_after_time(dispatcher.is_running)
    assert_true_after_time(lambda: dispatcher._nb_available_workers == 2)
    with lock:
        job = _OrchestratorFactory._orchestrator.submit_task(task)._jobs[0]
        assert_true_after_time(job.is_running)
        # One of the two workers is occupied by the running job.
        assert dispatcher._nb_available_workers == 1
        _JobManager._cancel(job)
        # Cancel does not kill an in-flight job: it is still running.
        assert_true_after_time(job.is_running)
    # Lock released: the job finishes normally and its worker is freed.
    assert_true_after_time(job.is_completed)
    assert dispatcher._nb_available_workers == 2
def test_cancel_subsequent_jobs():
    """Canceling a pending or running job abandons the jobs blocked behind it."""
    Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=1)
    _OrchestratorFactory._build_dispatcher()
    orchestrator = _OrchestratorFactory._orchestrator
    submission_manager = _SubmissionManagerFactory._build_manager()
    m = multiprocessing.Manager()
    # Keeps task_1's job running so everything downstream stays blocked.
    lock_0 = m.Lock()
    dn_1 = InMemoryDataNode("dn_config_1", Scope.SCENARIO, properties={"default_data": 1})
    dn_2 = InMemoryDataNode("dn_config_2", Scope.SCENARIO, properties={"default_data": 2})
    dn_3 = InMemoryDataNode("dn_config_3", Scope.SCENARIO, properties={"default_data": 3})
    dn_4 = InMemoryDataNode("dn_config_4", Scope.SCENARIO, properties={"default_data": 4})
    # Three-task chain: task_1 writes dn_3 (input of task_2), task_2 writes dn_4 (input of task_3).
    task_1 = Task("task_config_1", {}, partial(lock_multiply, lock_0), [dn_1, dn_2], [dn_3], id="task_1")
    task_2 = Task("task_config_2", {}, multiply, [dn_1, dn_3], [dn_4], id="task_2")
    task_3 = Task("task_config_3", {}, print, [dn_4], id="task_3")
    # Can't get tasks under 1 scenario due to partial not serializable
    submission_1 = submission_manager._create("scenario_id", Scenario._ID_PREFIX, "scenario_config_id")
    submission_2 = submission_manager._create("scenario_id", Scenario._ID_PREFIX, "scenario_config_id")
    _DataManager._set(dn_1)
    _DataManager._set(dn_2)
    _DataManager._set(dn_3)
    _DataManager._set(dn_4)
    with lock_0:
        # First submission: job_1 runs (holding lock_0); job_2 and job_3 block on its output.
        job_1 = orchestrator._lock_dn_output_and_create_job(
            task_1, submit_id=submission_1.id, submit_entity_id=submission_1.entity_id
        )
        orchestrator._orchestrate_job_to_run_or_block([job_1])
        job_2 = orchestrator._lock_dn_output_and_create_job(
            task_2, submit_id=submission_1.id, submit_entity_id=submission_1.entity_id
        )
        orchestrator._orchestrate_job_to_run_or_block([job_2])
        job_3 = orchestrator._lock_dn_output_and_create_job(
            task_3, submit_id=submission_1.id, submit_entity_id=submission_1.entity_id
        )
        orchestrator._orchestrate_job_to_run_or_block([job_3])
        submission_1.jobs = [job_1, job_2, job_3]
        assert_true_after_time(lambda: _OrchestratorFactory._orchestrator.jobs_to_run.qsize() == 0)
        assert_true_after_time(lambda: len(_OrchestratorFactory._orchestrator.blocked_jobs) == 2)
        assert_true_after_time(job_1.is_running)
        assert_true_after_time(job_2.is_blocked)
        assert_true_after_time(job_3.is_blocked)
        # Second submission: job_4 waits for the single worker; job_5 and job_6 block behind it.
        job_4 = _OrchestratorFactory._orchestrator._lock_dn_output_and_create_job(
            task_1, submit_id=submission_2.id, submit_entity_id=submission_2.entity_id
        )
        orchestrator._orchestrate_job_to_run_or_block([job_4])
        job_5 = _OrchestratorFactory._orchestrator._lock_dn_output_and_create_job(
            task_2, submit_id=submission_2.id, submit_entity_id=submission_2.entity_id
        )
        orchestrator._orchestrate_job_to_run_or_block([job_5])
        job_6 = _OrchestratorFactory._orchestrator._lock_dn_output_and_create_job(
            task_3, submit_id=submission_2.id, submit_entity_id=submission_2.entity_id
        )
        orchestrator._orchestrate_job_to_run_or_block([job_6])
        submission_2.jobs = [job_4, job_5, job_6]
        assert_true_after_time(job_4.is_pending)
        assert_true_after_time(job_5.is_blocked)
        assert_true_after_time(job_6.is_blocked)
        assert _OrchestratorFactory._orchestrator.jobs_to_run.qsize() == 1
        assert len(_OrchestratorFactory._orchestrator.blocked_jobs) == 4
        # Canceling PENDING job_4 abandons everything blocked behind it (job_5, job_6).
        _JobManager._cancel(job_4)
        assert_true_after_time(job_4.is_canceled)
        assert_true_after_time(job_5.is_abandoned)
        assert_true_after_time(job_6.is_abandoned)
        assert _OrchestratorFactory._orchestrator.jobs_to_run.qsize() == 0
        assert len(_OrchestratorFactory._orchestrator.blocked_jobs) == 2
        # Canceling RUNNING job_1 does not stop it, but abandons its dependents.
        _JobManager._cancel(job_1)
        assert_true_after_time(job_1.is_running)
        assert_true_after_time(job_2.is_abandoned)
        assert_true_after_time(job_3.is_abandoned)
    # Lock released: job_1 completes; canceled/abandoned statuses are final.
    assert_true_after_time(job_1.is_completed)
    assert_true_after_time(job_2.is_abandoned)
    assert_true_after_time(job_3.is_abandoned)
    assert_true_after_time(job_4.is_canceled)
    assert_true_after_time(job_5.is_abandoned)
    assert_true_after_time(job_6.is_abandoned)
    # No job should remain considered blocked, and the run queue must be drained.
    assert_true_after_time(
        lambda: all(
            not _OrchestratorFactory._orchestrator._is_blocked(job)
            for job in [job_1, job_2, job_3, job_4, job_5, job_6]
        )
    )
    assert_true_after_time(lambda: _OrchestratorFactory._orchestrator.jobs_to_run.qsize() == 0)
  327. def test_is_deletable():
  328. assert len(_JobManager._get_all()) == 0
  329. task = _create_task(print, 0, "task")
  330. job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
  331. rc = _JobManager._is_deletable("some_job")
  332. assert not rc
  333. assert "Entity some_job does not exist in the repository." in rc.reasons
  334. assert job.is_completed()
  335. assert _JobManager._is_deletable(job)
  336. assert _JobManager._is_deletable(job.id)
  337. job.abandoned()
  338. assert job.is_abandoned()
  339. assert _JobManager._is_deletable(job)
  340. assert _JobManager._is_deletable(job.id)
  341. job.canceled()
  342. assert job.is_canceled()
  343. assert _JobManager._is_deletable(job)
  344. assert _JobManager._is_deletable(job.id)
  345. job.failed()
  346. assert job.is_failed()
  347. assert _JobManager._is_deletable(job)
  348. assert _JobManager._is_deletable(job.id)
  349. job.skipped()
  350. assert job.is_skipped()
  351. assert _JobManager._is_deletable(job)
  352. assert _JobManager._is_deletable(job.id)
  353. job.blocked()
  354. assert job.is_blocked()
  355. assert not _JobManager._is_deletable(job)
  356. assert not _JobManager._is_deletable(job.id)
  357. job.running()
  358. assert job.is_running()
  359. assert not _JobManager._is_deletable(job)
  360. assert not _JobManager._is_deletable(job.id)
  361. job.pending()
  362. assert job.is_pending()
  363. assert not _JobManager._is_deletable(job)
  364. assert not _JobManager._is_deletable(job.id)
  365. job.status = Status.SUBMITTED
  366. assert job.is_submitted()
  367. assert not _JobManager._is_deletable(job)
  368. assert not _JobManager._is_deletable(job.id)
  369. def _create_task(function, nb_outputs=1, name=None):
  370. input1_dn_config = Config.configure_data_node("input1", "pickle", Scope.SCENARIO, default_data=21)
  371. input2_dn_config = Config.configure_data_node("input2", "pickle", Scope.SCENARIO, default_data=2)
  372. output_dn_configs = [
  373. Config.configure_data_node(f"output{i}", "pickle", Scope.SCENARIO, default_data=0) for i in range(nb_outputs)
  374. ]
  375. _DataManager._bulk_get_or_create(output_dn_configs)
  376. name = name or "".join(random.choice(string.ascii_lowercase) for _ in range(10))
  377. task_config = Config.configure_task(
  378. id=name,
  379. function=function,
  380. input=[input1_dn_config, input2_dn_config],
  381. output=output_dn_configs,
  382. )
  383. return _TaskManager._bulk_get_or_create([task_config])[0]