# test_complex_application.py
# Copyright 2023 Avaiga Private Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
  11. import os
  12. import pathlib
  13. from time import sleep
  14. from unittest.mock import patch
  15. import pandas as pd
  16. import taipy.core.taipy as tp
  17. from taipy.config import Config
  18. from taipy.core import Core, Status
  19. from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
  20. from taipy.core.config.job_config import JobConfig
# ################################ USER FUNCTIONS ##################################
  22. def sum(a, b):
  23. a = a["number"]
  24. b = b["number"]
  25. return a + b
  26. def subtract(a, b):
  27. a = a["number"]
  28. b = b["number"]
  29. return a - b
  30. def mult(a, b):
  31. return a * b
  32. def mult_by_2(a):
  33. return a
  34. def divide(a, b):
  35. return a / b
  36. def average(a):
  37. return [a.sum() / len(a)]
  38. def div_constant_with_sleep(a):
  39. sleep(1)
  40. return a["number"] / 10
  41. def return_a_number():
  42. return 10
  43. def return_a_number_with_sleep():
  44. sleep(1)
  45. return 10
# ################################ TEST METHODS ##################################
  47. def test_skipped_jobs():
  48. Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
  49. _OrchestratorFactory._build_orchestrator()
  50. input_config = Config.configure_data_node("input_dn")
  51. intermediate_config = Config.configure_data_node("intermediate")
  52. output_config = Config.configure_data_node("output_dn")
  53. task_config_1 = Config.configure_task("first", mult_by_2, input_config, intermediate_config, skippable=True)
  54. task_config_2 = Config.configure_task("second", mult_by_2, intermediate_config, output_config, skippable=True)
  55. scenario_config = Config.configure_scenario("scenario", [task_config_1, task_config_2])
  56. with patch("sys.argv", ["prog"]):
  57. core = Core()
  58. core.run()
  59. scenario = tp.create_scenario(scenario_config)
  60. scenario.input_dn.write(2)
  61. scenario.submit()
  62. assert len(tp.get_jobs()) == 2
  63. for job in tp.get_jobs():
  64. assert job.status == Status.COMPLETED
  65. scenario.submit()
  66. assert len(tp.get_jobs()) == 4
  67. skipped = []
  68. for job in tp.get_jobs():
  69. if job.status != Status.COMPLETED:
  70. assert job.status == Status.SKIPPED
  71. skipped.append(job)
  72. assert len(skipped) == 2
  73. core.stop()
def test_complex():
    """End-to-end run of a ten-task scenario mixing CSV, Excel and pickle data nodes.

    Builds the task graph sketched below, submits it once in development mode,
    then verifies the intermediate sums and final averages written to disk, and
    removes the generated artifacts.
    """
    # d1 --- t1
    # |
    # | --- t2 --- d5 --- | t10 --- d12
    # |                   |                                              |
    # |                   |                                              |
    # d2                  | --- t5 --- d7 --- t7 --- d9 --- t8 --- d10 --- t9 --- d11
    # |                   |                   |                    |
    # d3 --- |            |                   |                    |
    # |      |            |      t6 --- d8 -------------------
    # |      t3 --- d6 ---|
    # |      |
    # |      |
    # t4     d4
    # Development mode: every job runs synchronously inside submit().
    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
    _OrchestratorFactory._build_orchestrator()
    # Fixture inputs and generated-output paths, all under ./data_sample.
    csv_path_inp = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.csv")
    excel_path_inp = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.xlsx")
    csv_path_sum = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/sum.csv")
    excel_path_sum = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/sum.xlsx")
    excel_path_out = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/res.xlsx")
    csv_path_out = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/res.csv")
    # Two data nodes per input file so the sum tasks have two distinct inputs.
    inp_csv_dn_1 = Config.configure_csv_data_node("dn_csv_in_1", default_path=csv_path_inp)
    inp_csv_dn_2 = Config.configure_csv_data_node("dn_csv_in_2", default_path=csv_path_inp)
    inp_excel_dn_1 = Config.configure_excel_data_node("dn_excel_in_1", default_path=excel_path_inp, sheet_name="Sheet1")
    inp_excel_dn_2 = Config.configure_excel_data_node("dn_excel_in_2", default_path=excel_path_inp, sheet_name="Sheet1")
    # Scalar node; return_a_number overwrites the default_data with 10 as well.
    placeholder = Config.configure_data_node("dn_placeholder", default_data=10)
    dn_csv_sum = Config.configure_csv_data_node("dn_sum_csv", default_path=csv_path_sum)
    dn_excel_sum = Config.configure_excel_data_node("dn_sum_excel", default_path=excel_path_sum, sheet_name="Sheet1")
    dn_subtract_csv_excel = Config.configure_pickle_data_node("dn_subtract_csv_excel")
    dn_mult = Config.configure_pickle_data_node("dn_mult")
    dn_div = Config.configure_pickle_data_node("dn_div")
    output_csv_dn = Config.configure_csv_data_node("csv_out", csv_path_out)
    output_excel_dn = Config.configure_excel_data_node("excel_out", excel_path_out)
    # Side-effect-only tasks (t1/t2): print the inputs, produce no data node.
    task_print_csv = Config.configure_task("task_print_csv", print, input=inp_csv_dn_1)
    task_print_excel = Config.configure_task("task_print_excel", print, input=inp_excel_dn_1)
    task_sum_csv = Config.configure_task("task_sum_csv", sum, input=[inp_csv_dn_2, inp_csv_dn_1], output=dn_csv_sum)
    task_sum_excel = Config.configure_task(
        "task_sum_excel", sum, input=[inp_excel_dn_2, inp_excel_dn_1], output=dn_excel_sum
    )
    task_subtract_csv_excel = Config.configure_task(
        "task_subtract_csv_excel", subtract, input=[dn_csv_sum, dn_excel_sum], output=dn_subtract_csv_excel
    )
    task_insert_placeholder = Config.configure_task("task_insert_placeholder", return_a_number, output=[placeholder])
    task_mult = Config.configure_task(
        "task_mult_by_placeholder", mult, input=[dn_subtract_csv_excel, placeholder], output=dn_mult
    )
    task_div = Config.configure_task("task_div_by_placeholder", divide, input=[dn_mult, placeholder], output=dn_div)
    task_avg_div = Config.configure_task("task_avg_div", average, input=dn_div, output=output_csv_dn)
    task_avg_mult = Config.configure_task("task_avg_mult", average, input=dn_mult, output=output_excel_dn)
    scenario_config = Config.configure_scenario(
        "scenario",
        [
            task_print_csv,
            task_print_excel,
            task_sum_csv,
            task_sum_excel,
            task_subtract_csv_excel,
            task_insert_placeholder,
            task_mult,
            task_div,
            task_avg_div,
            task_avg_mult,
        ],
    )
    # patch sys.argv so Core's CLI parsing sees no pytest arguments.
    with patch("sys.argv", ["prog"]):
        core = Core()
        core.run()
    scenario = tp.create_scenario(scenario_config)
    tp.submit(scenario)
    core.stop()
    # Re-read the artifacts the scenario wrote to disk.
    csv_sum_res = pd.read_csv(csv_path_sum)
    excel_sum_res = pd.read_excel(excel_path_sum)
    csv_out = pd.read_csv(csv_path_out)
    excel_out = pd.read_excel(excel_path_out)
    # Expected values depend on the data_sample fixtures; the asserts imply the
    # CSV "number" column holds 10x the Excel values — TODO confirm fixtures.
    assert csv_sum_res.to_numpy().flatten().tolist() == [i * 20 for i in range(1, 11)]
    assert excel_sum_res.to_numpy().flatten().tolist() == [i * 2 for i in range(1, 11)]
    # csv_out = average((csv_sum - excel_sum) * 10 / 10); excel_out = average((csv_sum - excel_sum) * 10).
    assert average(csv_sum_res["number"] - excel_sum_res["number"]) == csv_out.to_numpy()[0]
    assert average((csv_sum_res["number"] - excel_sum_res["number"]) * 10) == excel_out.to_numpy()[0]
    # Clean up every file this test generated.
    for path in [csv_path_sum, excel_path_sum, csv_path_out, excel_path_out]:
        os.remove(path)