test_complex_application.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import os
  12. import pathlib
  13. from time import sleep
  14. from unittest.mock import patch
  15. import pandas as pd
  16. import taipy.core.taipy as tp
  17. from taipy.config import Config
  18. from taipy.core import Core, Status
  19. # ################################ USER FUNCTIONS ##################################
  20. def sum(a, b):
  21. a = a["number"]
  22. b = b["number"]
  23. return a + b
  24. def subtract(a, b):
  25. a = a["number"]
  26. b = b["number"]
  27. return a - b
  28. def mult(a, b):
  29. return a * b
  30. def mult_by_2(a):
  31. return a
  32. def divide(a, b):
  33. return a / b
  34. def average(a):
  35. return [a.sum() / len(a)]
  36. def div_constant_with_sleep(a):
  37. sleep(1)
  38. return a["number"] / 10
  39. def return_a_number():
  40. return 10
  41. def return_a_number_with_sleep():
  42. sleep(1)
  43. return 10
  44. # ################################ TEST METHODS ##################################
  45. def test_skipped_jobs():
  46. input_config = Config.configure_data_node("input_dn")
  47. intermediate_config = Config.configure_data_node("intermediate")
  48. output_config = Config.configure_data_node("output_dn")
  49. task_config_1 = Config.configure_task("first", mult_by_2, input_config, intermediate_config, skippable=True)
  50. task_config_2 = Config.configure_task("second", mult_by_2, intermediate_config, output_config, skippable=True)
  51. scenario_config = Config.configure_scenario("scenario", [task_config_1, task_config_2])
  52. with patch("sys.argv", ["prog"]):
  53. core = Core()
  54. core.run()
  55. scenario = tp.create_scenario(scenario_config)
  56. scenario.input_dn.write(2)
  57. scenario.submit()
  58. assert len(tp.get_jobs()) == 2
  59. for job in tp.get_jobs():
  60. assert job.status == Status.COMPLETED
  61. scenario.submit()
  62. assert len(tp.get_jobs()) == 4
  63. skipped = []
  64. for job in tp.get_jobs():
  65. if job.status != Status.COMPLETED:
  66. assert job.status == Status.SKIPPED
  67. skipped.append(job)
  68. assert len(skipped) == 2
  69. core.stop()
  70. def test_complex():
  71. # d1 --- t1
  72. # |
  73. # | --- t2 --- d5 --- | t10 --- d12
  74. # | | |
  75. # | | |
  76. # d2 | --- t5 --- d7 --- t7 --- d9 --- t8 --- d10 --- t9 --- d11
  77. # | | |
  78. # d3 --- | | | |
  79. # | | | t6 --- d8 -------------------
  80. # | t3 --- d6 ---|
  81. # | |
  82. # | |
  83. # t4 d4
  84. csv_path_inp = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.csv")
  85. excel_path_inp = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.xlsx")
  86. csv_path_sum = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/sum.csv")
  87. excel_path_sum = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/sum.xlsx")
  88. excel_path_out = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/res.xlsx")
  89. csv_path_out = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/res.csv")
  90. inp_csv_dn_1 = Config.configure_csv_data_node("dn_csv_in_1", default_path=csv_path_inp)
  91. inp_csv_dn_2 = Config.configure_csv_data_node("dn_csv_in_2", default_path=csv_path_inp)
  92. inp_excel_dn_1 = Config.configure_excel_data_node("dn_excel_in_1", default_path=excel_path_inp, sheet_name="Sheet1")
  93. inp_excel_dn_2 = Config.configure_excel_data_node("dn_excel_in_2", default_path=excel_path_inp, sheet_name="Sheet1")
  94. placeholder = Config.configure_data_node("dn_placeholder", default_data=10)
  95. dn_csv_sum = Config.configure_csv_data_node("dn_sum_csv", default_path=csv_path_sum)
  96. dn_excel_sum = Config.configure_excel_data_node("dn_sum_excel", default_path=excel_path_sum, sheet_name="Sheet1")
  97. dn_subtract_csv_excel = Config.configure_pickle_data_node("dn_subtract_csv_excel")
  98. dn_mult = Config.configure_pickle_data_node("dn_mult")
  99. dn_div = Config.configure_pickle_data_node("dn_div")
  100. output_csv_dn = Config.configure_csv_data_node("csv_out", csv_path_out)
  101. output_excel_dn = Config.configure_excel_data_node("excel_out", excel_path_out)
  102. task_print_csv = Config.configure_task("task_print_csv", print, input=inp_csv_dn_1)
  103. task_print_excel = Config.configure_task("task_print_excel", print, input=inp_excel_dn_1)
  104. task_sum_csv = Config.configure_task("task_sum_csv", sum, input=[inp_csv_dn_2, inp_csv_dn_1], output=dn_csv_sum)
  105. task_sum_excel = Config.configure_task(
  106. "task_sum_excel", sum, input=[inp_excel_dn_2, inp_excel_dn_1], output=dn_excel_sum
  107. )
  108. task_subtract_csv_excel = Config.configure_task(
  109. "task_subtract_csv_excel", subtract, input=[dn_csv_sum, dn_excel_sum], output=dn_subtract_csv_excel
  110. )
  111. task_insert_placeholder = Config.configure_task("task_insert_placeholder", return_a_number, output=[placeholder])
  112. task_mult = Config.configure_task(
  113. "task_mult_by_placeholder", mult, input=[dn_subtract_csv_excel, placeholder], output=dn_mult
  114. )
  115. task_div = Config.configure_task("task_div_by_placeholder", divide, input=[dn_mult, placeholder], output=dn_div)
  116. task_avg_div = Config.configure_task("task_avg_div", average, input=dn_div, output=output_csv_dn)
  117. task_avg_mult = Config.configure_task("task_avg_mult", average, input=dn_mult, output=output_excel_dn)
  118. scenario_config = Config.configure_scenario(
  119. "scenario",
  120. [
  121. task_print_csv,
  122. task_print_excel,
  123. task_sum_csv,
  124. task_sum_excel,
  125. task_subtract_csv_excel,
  126. task_insert_placeholder,
  127. task_mult,
  128. task_div,
  129. task_avg_div,
  130. task_avg_mult,
  131. ],
  132. )
  133. with patch("sys.argv", ["prog"]):
  134. core = Core()
  135. core.run()
  136. scenario = tp.create_scenario(scenario_config)
  137. tp.submit(scenario)
  138. core.stop()
  139. csv_sum_res = pd.read_csv(csv_path_sum)
  140. excel_sum_res = pd.read_excel(excel_path_sum)
  141. csv_out = pd.read_csv(csv_path_out)
  142. excel_out = pd.read_excel(excel_path_out)
  143. assert csv_sum_res.to_numpy().flatten().tolist() == [i * 20 for i in range(1, 11)]
  144. assert excel_sum_res.to_numpy().flatten().tolist() == [i * 2 for i in range(1, 11)]
  145. assert average(csv_sum_res["number"] - excel_sum_res["number"]) == csv_out.to_numpy()[0]
  146. assert average((csv_sum_res["number"] - excel_sum_res["number"]) * 10) == excel_out.to_numpy()[0]
  147. for path in [csv_path_sum, excel_path_sum, csv_path_out, excel_path_out]:
  148. os.remove(path)