# Copyright 2021-2025 Avaiga Private Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
import os
import pathlib
from time import sleep

import pandas as pd

import taipy.core.taipy as tp
from taipy.common.config import Config
from taipy.core import Orchestrator, Status


# ################################ USER FUNCTIONS ##################################
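# Note on the helpers below: `sum` shadows the Python built-in of the same name; it and
# `subtract` expect pandas objects exposing a "number" column, as read from the CSV and
# Excel data nodes used in these tests. The *_with_sleep variants only add a short delay.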
def sum(a, b):
    a = a["number"]
    b = b["number"]
    return a + b


def subtract(a, b):
    a = a["number"]
    b = b["number"]
    return a - b


def mult(a, b):
    return a * b


def mult_by_2(a):
    return a * 2


def divide(a, b):
    return a / b


def average(a):
    return [a.sum() / len(a)]


def div_constant_with_sleep(a):
    sleep(1)
    return a["number"] / 10


def return_a_number():
    return 10


def return_a_number_with_sleep():
    sleep(1)
    return 10


# ################################ TEST METHODS ##################################
def test_skipped_jobs():
    input_config = Config.configure_data_node("input_dn")
    intermediate_config = Config.configure_data_node("intermediate")
    output_config = Config.configure_data_node("output_dn")
    task_config_1 = Config.configure_task("first", mult_by_2, input_config, intermediate_config, skippable=True)
    task_config_2 = Config.configure_task("second", mult_by_2, intermediate_config, output_config, skippable=True)
    scenario_config = Config.configure_scenario("scenario", [task_config_1, task_config_2])
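    # Both tasks are skippable: the first submission runs them normally, and since their
    # inputs do not change afterwards, the second submission is expected to skip both
    # jobs (Status.SKIPPED), which is what the assertions below check.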
    orchestrator = Orchestrator()
    orchestrator.run()

    scenario = tp.create_scenario(scenario_config)
    scenario.input_dn.write(2)
    scenario.submit()

    assert len(tp.get_jobs()) == 2
    for job in tp.get_jobs():
        assert job.status == Status.COMPLETED

    scenario.submit()
    assert len(tp.get_jobs()) == 4
    skipped = []
    for job in tp.get_jobs():
        if job.status != Status.COMPLETED:
            assert job.status == Status.SKIPPED
            skipped.append(job)
    assert len(skipped) == 2

    orchestrator.stop()


def test_complex():
    # Scenario graph, one line per task (input data node(s) --> task --> output data node(s)):
    #
    #   dn_csv_in_1 ----------------------------> task_print_csv
    #   dn_excel_in_1 --------------------------> task_print_excel
    #   dn_csv_in_2, dn_csv_in_1 ---------------> task_sum_csv --------------> dn_sum_csv
    #   dn_excel_in_2, dn_excel_in_1 -----------> task_sum_excel ------------> dn_sum_excel
    #   dn_sum_csv, dn_sum_excel ---------------> task_subtract_csv_excel ---> dn_subtract_csv_excel
    #   (no input) -----------------------------> task_insert_placeholder ---> dn_placeholder
    #   dn_subtract_csv_excel, dn_placeholder --> task_mult_by_placeholder --> dn_mult
    #   dn_mult, dn_placeholder ----------------> task_div_by_placeholder ---> dn_div
    #   dn_div ---------------------------------> task_avg_div --------------> csv_out
    #   dn_mult --------------------------------> task_avg_mult -------------> excel_out
    csv_path_inp = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.csv")
    excel_path_inp = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.xlsx")
    csv_path_sum = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/sum.csv")
    excel_path_sum = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/sum.xlsx")
    excel_path_out = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/res.xlsx")
    csv_path_out = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/res.csv")
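    # example.csv and example.xlsx are pre-existing sample inputs with a "number" column
    # (sum/subtract index it); sum.csv, sum.xlsx, res.csv and res.xlsx are written by the
    # run and removed again at the end of the test.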
    inp_csv_dn_1 = Config.configure_csv_data_node("dn_csv_in_1", default_path=csv_path_inp)
    inp_csv_dn_2 = Config.configure_csv_data_node("dn_csv_in_2", default_path=csv_path_inp)
    inp_excel_dn_1 = Config.configure_excel_data_node("dn_excel_in_1", default_path=excel_path_inp, sheet_name="Sheet1")
    inp_excel_dn_2 = Config.configure_excel_data_node("dn_excel_in_2", default_path=excel_path_inp, sheet_name="Sheet1")
    placeholder = Config.configure_data_node("dn_placeholder", default_data=10)
    dn_csv_sum = Config.configure_csv_data_node("dn_sum_csv", default_path=csv_path_sum)
    dn_excel_sum = Config.configure_excel_data_node("dn_sum_excel", default_path=excel_path_sum, sheet_name="Sheet1")
    dn_subtract_csv_excel = Config.configure_pickle_data_node("dn_subtract_csv_excel")
    dn_mult = Config.configure_pickle_data_node("dn_mult")
    dn_div = Config.configure_pickle_data_node("dn_div")
    output_csv_dn = Config.configure_csv_data_node("csv_out", csv_path_out)
    output_excel_dn = Config.configure_excel_data_node("excel_out", excel_path_out)
    task_print_csv = Config.configure_task("task_print_csv", print, input=inp_csv_dn_1)
    task_print_excel = Config.configure_task("task_print_excel", print, input=inp_excel_dn_1)
    task_sum_csv = Config.configure_task("task_sum_csv", sum, input=[inp_csv_dn_2, inp_csv_dn_1], output=dn_csv_sum)
    task_sum_excel = Config.configure_task(
        "task_sum_excel", sum, input=[inp_excel_dn_2, inp_excel_dn_1], output=dn_excel_sum
    )
    task_subtract_csv_excel = Config.configure_task(
        "task_subtract_csv_excel", subtract, input=[dn_csv_sum, dn_excel_sum], output=dn_subtract_csv_excel
    )
    task_insert_placeholder = Config.configure_task("task_insert_placeholder", return_a_number, output=[placeholder])
    task_mult = Config.configure_task(
        "task_mult_by_placeholder", mult, input=[dn_subtract_csv_excel, placeholder], output=dn_mult
    )
    task_div = Config.configure_task("task_div_by_placeholder", divide, input=[dn_mult, placeholder], output=dn_div)
    task_avg_div = Config.configure_task("task_avg_div", average, input=dn_div, output=output_csv_dn)
    task_avg_mult = Config.configure_task("task_avg_mult", average, input=dn_mult, output=output_excel_dn)
    scenario_config = Config.configure_scenario(
        "scenario",
        [
            task_print_csv,
            task_print_excel,
            task_sum_csv,
            task_sum_excel,
            task_subtract_csv_excel,
            task_insert_placeholder,
            task_mult,
            task_div,
            task_avg_div,
            task_avg_mult,
        ],
    )
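    # End to end: the subtract result is multiplied by the placeholder (10) into dn_mult,
    # then divided by the same placeholder into dn_div, so dn_div equals the subtract
    # result again. csv_out therefore holds the plain average of (csv sum - excel sum),
    # while excel_out holds 10 times that average, matching the assertions at the end.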
    orchestrator = Orchestrator()
    orchestrator.run()
    scenario = tp.create_scenario(scenario_config)
    tp.submit(scenario)
    orchestrator.stop()
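    # With Taipy's default (development) job execution mode, jobs run synchronously
    # during the submission, so the files written by the scenario can be read back here.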
    csv_sum_res = pd.read_csv(csv_path_sum)
    excel_sum_res = pd.read_excel(excel_path_sum)
    csv_out = pd.read_csv(csv_path_out)
    excel_out = pd.read_excel(excel_path_out)
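    # Judging from the expected values below, example.csv holds 10, 20, ..., 100 and
    # example.xlsx holds 1, 2, ..., 10 in their "number" columns; each sum task adds the
    # same file to itself, hence the factors of 20 and 2.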
    assert csv_sum_res.to_numpy().flatten().tolist() == [i * 20 for i in range(1, 11)]
    assert excel_sum_res.to_numpy().flatten().tolist() == [i * 2 for i in range(1, 11)]
    assert average(csv_sum_res["number"] - excel_sum_res["number"]) == csv_out.to_numpy()[0]
    assert average((csv_sum_res["number"] - excel_sum_res["number"]) * 10) == excel_out.to_numpy()[0]

    # Clean up the files produced by the run.
    for path in [csv_path_sum, excel_path_sum, csv_path_out, excel_path_out]:
        os.remove(path)