test_job.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. from datetime import timedelta
  12. from time import sleep
  13. from typing import Union
  14. from unittest import mock
  15. from unittest.mock import MagicMock
  16. import pytest
  17. from taipy.config.common.scope import Scope
  18. from taipy.config.config import Config
  19. from taipy.core import JobId, TaskId
  20. from taipy.core._orchestrator._dispatcher._development_job_dispatcher import _DevelopmentJobDispatcher
  21. from taipy.core._orchestrator._dispatcher._standalone_job_dispatcher import _StandaloneJobDispatcher
  22. from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
  23. from taipy.core.config.job_config import JobConfig
  24. from taipy.core.data.in_memory import InMemoryDataNode
  25. from taipy.core.job._job_manager import _JobManager
  26. from taipy.core.job.job import Job
  27. from taipy.core.job.status import Status
  28. from taipy.core.scenario.scenario import Scenario
  29. from taipy.core.submission._submission_manager_factory import _SubmissionManagerFactory
  30. from taipy.core.task._task_manager import _TaskManager
  31. from taipy.core.task.task import Task
  32. @pytest.fixture
  33. def task_id():
  34. return TaskId("task_id1")
  35. @pytest.fixture
  36. def task(task_id):
  37. return Task(config_id="name", properties={}, function=print, input=[], output=[], id=task_id)
  38. @pytest.fixture
  39. def job_id():
  40. return JobId("id1")
  41. @pytest.fixture(scope="class")
  42. def scenario():
  43. return Scenario(
  44. "scenario_config",
  45. [],
  46. {},
  47. [],
  48. "SCENARIO_scenario_config",
  49. version="random_version_number",
  50. )
  51. @pytest.fixture
  52. def job(task, job_id):
  53. return Job(job_id, task, "submit_id", "SCENARIO_scenario_config")
  54. @pytest.fixture
  55. def replace_in_memory_write_fct():
  56. default_write = InMemoryDataNode.write
  57. InMemoryDataNode.write = _error
  58. yield
  59. InMemoryDataNode.write = default_write
  60. def _foo():
  61. return 42
  62. def _error():
  63. raise RuntimeError("Something bad has happened")
  64. def test_create_job(scenario, task, job):
  65. from taipy.core.scenario._scenario_manager_factory import _ScenarioManagerFactory
  66. _ScenarioManagerFactory._build_manager()._set(scenario)
  67. assert job.id == "id1"
  68. assert task in job
  69. assert job.is_submitted()
  70. assert job.submit_id is not None
  71. assert job.submit_entity_id == "SCENARIO_scenario_config"
  72. assert job.submit_entity == scenario
  73. with mock.patch("taipy.core.get") as get_mck:
  74. get_mck.return_value = task
  75. assert job.get_label() == "name > " + job.id
  76. assert job.get_simple_label() == job.id
  77. def test_comparison(task):
  78. job_id_1 = JobId("id1")
  79. job_id_2 = JobId("id2")
  80. job_1 = Job(job_id_1, task, "submit_id", "scenario_entity_id")
  81. sleep(0.01) # Comparison is based on time, precision on Windows is not enough important
  82. job_2 = Job(job_id_2, task, "submit_id", "scenario_entity_id")
  83. assert job_1 < job_2
  84. assert job_2 > job_1
  85. assert job_1 <= job_2
  86. assert job_1 <= job_1
  87. assert job_2 >= job_1
  88. assert job_1 >= job_1
  89. assert job_1 == job_1
  90. assert job_1 != job_2
  91. def test_status_job(task):
  92. submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX, task.config_id)
  93. job = Job("job_id", task, submission.id, "SCENARIO_scenario_config")
  94. submission.jobs = [job]
  95. assert job.is_submitted()
  96. assert job.is_skipped() is False
  97. assert job.is_pending() is False
  98. assert job.is_blocked() is False
  99. assert job.is_canceled() is False
  100. assert job.is_failed() is False
  101. assert job.is_completed() is False
  102. assert job.is_running() is False
  103. job.canceled()
  104. assert job.is_canceled()
  105. job.failed()
  106. assert job.is_failed()
  107. job.running()
  108. assert job.is_running()
  109. job.completed()
  110. assert job.is_completed()
  111. job.pending()
  112. assert job.is_pending()
  113. job.blocked()
  114. assert job.is_blocked()
  115. job.skipped()
  116. assert job.is_skipped()
  117. def test_stacktrace_job(task):
  118. submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX, task.config_id)
  119. job = Job("job_id", task, submission.id, "SCENARIO_scenario_config")
  120. fake_stacktraces = [
  121. """Traceback (most recent call last):
  122. File "<stdin>", line 1, in <module>
  123. ZeroDivisionError: division by zero""",
  124. "Another error",
  125. "yet\nAnother\nError",
  126. ]
  127. job.stacktrace = fake_stacktraces
  128. assert job.stacktrace == fake_stacktraces
  129. def test_notification_job(task):
  130. subscribe = MagicMock()
  131. submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX, task.config_id)
  132. job = Job("job_id", task, submission.id, "SCENARIO_scenario_config")
  133. submission.jobs = [job]
  134. job._on_status_change(subscribe)
  135. job.running()
  136. subscribe.assert_called_once_with(job)
  137. subscribe.reset_mock()
  138. job.completed()
  139. subscribe.assert_called_once_with(job)
  140. subscribe.reset_mock()
  141. job.skipped()
  142. subscribe.assert_called_once_with(job)
  143. def test_handle_exception_in_user_function(task_id, job_id):
  144. task = Task(config_id="name", properties={}, input=[], function=_error, output=[], id=task_id)
  145. submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX, task.config_id)
  146. job = Job(job_id, task, submission.id, "scenario_entity_id")
  147. submission.jobs = [job]
  148. _dispatch(task, job)
  149. job = _JobManager._get(job_id)
  150. assert job.is_failed()
  151. assert 'raise RuntimeError("Something bad has happened")' in str(job.stacktrace[0])
  152. def test_handle_exception_in_input_data_node(task_id, job_id):
  153. data_node = InMemoryDataNode("data_node", scope=Scope.SCENARIO)
  154. task = Task(config_id="name", properties={}, input=[data_node], function=print, output=[], id=task_id)
  155. submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX, task.config_id)
  156. job = Job(job_id, task, submission.id, "scenario_entity_id")
  157. submission.jobs = [job]
  158. _dispatch(task, job)
  159. job = _JobManager._get(job_id)
  160. assert job.is_failed()
  161. assert "taipy.core.exceptions.exceptions.NoData" in str(job.stacktrace[0])
  162. def test_handle_exception_in_ouptut_data_node(replace_in_memory_write_fct, task_id, job_id):
  163. data_node = InMemoryDataNode("data_node", scope=Scope.SCENARIO)
  164. task = Task(config_id="name", properties={}, input=[], function=_foo, output=[data_node], id=task_id)
  165. submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX, task.config_id)
  166. job = Job(job_id, task, submission.id, "scenario_entity_id")
  167. submission.jobs = [job]
  168. _dispatch(task, job)
  169. job = _JobManager._get(job_id)
  170. assert job.is_failed()
  171. assert "taipy.core.exceptions.exceptions.DataNodeWritingError" in str(job.stacktrace[0])
  172. def test_auto_set_and_reload(current_datetime, job_id):
  173. task_1 = Task(config_id="name_1", properties={}, function=_foo, id=TaskId("task_1"))
  174. task_2 = Task(config_id="name_2", properties={}, function=_foo, id=TaskId("task_2"))
  175. submission = _SubmissionManagerFactory._build_manager()._create(task_1.id, task_1._ID_PREFIX, task_1.config_id)
  176. job_1 = Job(job_id, task_1, submission.id, "scenario_entity_id")
  177. submission.jobs = [job_1]
  178. _TaskManager._set(task_1)
  179. _TaskManager._set(task_2)
  180. _JobManager._set(job_1)
  181. job_2 = _JobManager._get(job_1, "submit_id_2")
  182. # auto set & reload on task attribute
  183. assert job_1.task.id == task_1.id
  184. assert job_2.task.id == task_1.id
  185. job_1.task = task_2
  186. assert job_1.task.id == task_2.id
  187. assert job_2.task.id == task_2.id
  188. job_2.task = task_1
  189. assert job_1.task.id == task_1.id
  190. assert job_2.task.id == task_1.id
  191. # auto set & reload on force attribute
  192. assert not job_1.force
  193. assert not job_2.force
  194. job_1.force = True
  195. assert job_1.force
  196. assert job_2.force
  197. job_2.force = False
  198. assert not job_1.force
  199. assert not job_2.force
  200. # auto set & reload on status attribute
  201. assert job_1.status == Status.SUBMITTED
  202. assert job_2.status == Status.SUBMITTED
  203. job_1.status = Status.CANCELED
  204. assert job_1.status == Status.CANCELED
  205. assert job_2.status == Status.CANCELED
  206. job_2.status = Status.BLOCKED
  207. assert job_1.status == Status.BLOCKED
  208. assert job_2.status == Status.BLOCKED
  209. # auto set & reload on creation_date attribute
  210. new_datetime = current_datetime + timedelta(1)
  211. new_datetime_1 = current_datetime + timedelta(1)
  212. job_1.creation_date = new_datetime_1
  213. assert job_1.creation_date == new_datetime_1
  214. assert job_2.creation_date == new_datetime_1
  215. job_2.creation_date = new_datetime
  216. assert job_1.creation_date == new_datetime
  217. assert job_2.creation_date == new_datetime
  218. with job_1 as job:
  219. assert job.task.id == task_1.id
  220. assert not job.force
  221. assert job.status == Status.BLOCKED
  222. assert job.creation_date == new_datetime
  223. assert job._is_in_context
  224. new_datetime_2 = new_datetime + timedelta(1)
  225. job.task = task_2
  226. job.force = True
  227. job.status = Status.COMPLETED
  228. job.creation_date = new_datetime_2
  229. assert job.task.id == task_1.id
  230. assert not job.force
  231. assert job.status == Status.BLOCKED
  232. assert job.creation_date == new_datetime
  233. assert job._is_in_context
  234. assert job_1.task.id == task_2.id
  235. assert job_1.force
  236. assert job_1.status == Status.COMPLETED
  237. assert job_1.creation_date == new_datetime_2
  238. assert not job_1._is_in_context
  239. def _dispatch(task: Task, job: Job, mode=JobConfig._DEVELOPMENT_MODE):
  240. Config.configure_job_executions(mode=mode)
  241. _OrchestratorFactory._build_dispatcher()
  242. _TaskManager._set(task)
  243. _JobManager._set(job)
  244. dispatcher: Union[_StandaloneJobDispatcher, _DevelopmentJobDispatcher] = _StandaloneJobDispatcher(
  245. _OrchestratorFactory._orchestrator
  246. )
  247. if mode == JobConfig._DEVELOPMENT_MODE:
  248. dispatcher = _DevelopmentJobDispatcher(_OrchestratorFactory._orchestrator)
  249. dispatcher._dispatch(job)
  250. def test_is_deletable():
  251. with mock.patch("taipy.core.job._job_manager._JobManager._is_deletable") as mock_submit:
  252. task = Task(config_id="name_1", properties={}, function=_foo, id=TaskId("task_1"))
  253. job = Job(job_id, task, "submit_id_1", "scenario_entity_id")
  254. job.is_deletable()
  255. mock_submit.assert_called_once_with(job)