test_job_repositories.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import os
  12. import pytest
  13. from taipy.core.data._data_fs_repository import _DataFSRepository
  14. from taipy.core.exceptions import ModelNotFound
  15. from taipy.core.job._job_fs_repository import _JobFSRepository
  16. from taipy.core.job.job import Job, JobId
  17. from taipy.core.task._task_fs_repository import _TaskFSRepository
  18. from taipy.core.task.task import Task
  19. class TestJobRepository:
  20. def test_save_and_load(self, data_node, job):
  21. _DataFSRepository()._save(data_node)
  22. task = Task("task_config_id", {}, print, [data_node], [data_node])
  23. _TaskFSRepository()._save(task)
  24. job._task = task
  25. repository = _JobFSRepository()
  26. repository._save(job)
  27. obj = repository._load(job.id)
  28. assert isinstance(obj, Job)
  29. def test_exists(self, data_node, job):
  30. _DataFSRepository()._save(data_node)
  31. task = Task("task_config_id", {}, print, [data_node], [data_node])
  32. _TaskFSRepository()._save(task)
  33. job._task = task
  34. repository = _JobFSRepository()
  35. repository._save(job)
  36. assert repository._exists(job.id)
  37. assert not repository._exists("not-existed-job")
  38. def test_load_all(self, data_node, job):
  39. _DataFSRepository()._save(data_node)
  40. task = Task("task_config_id", {}, print, [data_node], [data_node])
  41. _TaskFSRepository()._save(task)
  42. job._task = task
  43. repository = _JobFSRepository()
  44. for i in range(10):
  45. job.id = JobId(f"job-{i}")
  46. repository._save(job)
  47. jobs = repository._load_all()
  48. assert len(jobs) == 10
  49. def test_load_all_with_filters(self, data_node, job):
  50. repository = _JobFSRepository()
  51. _DataFSRepository()._save(data_node)
  52. task = Task("task_config_id", {}, print, [data_node], [data_node])
  53. _TaskFSRepository()._save(task)
  54. job._task = task
  55. for i in range(10):
  56. job.id = JobId(f"job-{i}")
  57. repository._save(job)
  58. objs = repository._load_all(filters=[{"id": "job-2"}])
  59. assert len(objs) == 1
  60. def test_delete(self, data_node, job):
  61. repository = _JobFSRepository()
  62. _DataFSRepository()._save(data_node)
  63. task = Task("task_config_id", {}, print, [data_node], [data_node])
  64. _TaskFSRepository()._save(task)
  65. job._task = task
  66. repository._save(job)
  67. repository._delete(job.id)
  68. with pytest.raises(ModelNotFound):
  69. repository._load(job.id)
  70. def test_delete_all(self, data_node, job):
  71. repository = _JobFSRepository()
  72. _DataFSRepository()._save(data_node)
  73. task = Task("task_config_id", {}, print, [data_node], [data_node])
  74. _TaskFSRepository()._save(task)
  75. job._task = task
  76. for i in range(10):
  77. job.id = JobId(f"job-{i}")
  78. repository._save(job)
  79. assert len(repository._load_all()) == 10
  80. repository._delete_all()
  81. assert len(repository._load_all()) == 0
  82. def test_delete_many(self, data_node, job):
  83. repository = _JobFSRepository()
  84. _DataFSRepository()._save(data_node)
  85. task = Task("task_config_id", {}, print, [data_node], [data_node])
  86. _TaskFSRepository()._save(task)
  87. job._task = task
  88. for i in range(10):
  89. job.id = JobId(f"job-{i}")
  90. repository._save(job)
  91. objs = repository._load_all()
  92. assert len(objs) == 10
  93. ids = [x.id for x in objs[:3]]
  94. repository._delete_many(ids)
  95. assert len(repository._load_all()) == 7
  96. def test_delete_by(self, data_node, job):
  97. repository = _JobFSRepository()
  98. _DataFSRepository()._save(data_node)
  99. task = Task("task_config_id", {}, print, [data_node], [data_node])
  100. _TaskFSRepository()._save(task)
  101. job._task = task
  102. # Create 5 entities with version 1.0 and 5 entities with version 2.0
  103. for i in range(10):
  104. job.id = JobId(f"job-{i}")
  105. job._version = f"{(i+1) // 5}.0"
  106. repository._save(job)
  107. objs = repository._load_all()
  108. assert len(objs) == 10
  109. repository._delete_by("version", "1.0")
  110. assert len(repository._load_all()) == 5
  111. def test_search(self, data_node, job):
  112. repository = _JobFSRepository()
  113. _DataFSRepository()._save(data_node)
  114. task = Task("task_config_id", {}, print, [data_node], [data_node])
  115. _TaskFSRepository()._save(task)
  116. job._task = task
  117. for i in range(10):
  118. job.id = JobId(f"job-{i}")
  119. repository._save(job)
  120. assert len(repository._load_all()) == 10
  121. objs = repository._search("id", "job-2")
  122. assert len(objs) == 1
  123. assert isinstance(objs[0], Job)
  124. objs = repository._search("id", "job-2", filters=[{"version": "random_version_number"}])
  125. assert len(objs) == 1
  126. assert isinstance(objs[0], Job)
  127. assert repository._search("id", "job-2", filters=[{"version": "non_existed_version"}]) == []
  128. def test_export(self, tmpdir, job):
  129. repository = _JobFSRepository()
  130. repository._save(job)
  131. repository._export(job.id, tmpdir.strpath)
  132. dir_path = repository.dir_path
  133. assert os.path.exists(os.path.join(dir_path, f"{job.id}.json"))