test_dag.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. # Copyright 2021-2025 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. from typing import List
  12. from taipy.common.config.common.scope import Scope
  13. from taipy.core import DataNode, Sequence, SequenceId, Task, TaskId
  14. from taipy.core._entity._dag import _DAG
  15. def assert_x(x: int, *nodes):
  16. for node in nodes:
  17. assert node.x == x
  18. def assert_y(y: List[int], *nodes):
  19. for node in nodes:
  20. assert node.y in y
  21. y.remove(node.y)
  22. def assert_x_y(x: int, y: List[int], *nodes):
  23. assert_x(x, *nodes)
  24. for node in nodes:
  25. assert node.y in y
  26. y.remove(node.y)
  27. def assert_edge_exists(src, dest, dag: _DAG):
  28. list_of_tuples = [(edge.src.entity.id, edge.dest.entity.id) for edge in dag.edges]
  29. assert (src, dest) in list_of_tuples
  30. class TestDAG:
  31. def test_get_dag_1(self):
  32. data_node_1 = DataNode("foo", Scope.SCENARIO, "s1")
  33. data_node_2 = DataNode("bar", Scope.SCENARIO, "s2")
  34. data_node_3 = DataNode("baz", Scope.SCENARIO, "s3")
  35. data_node_4 = DataNode("qux", Scope.SCENARIO, "s4")
  36. data_node_5 = DataNode("quux", Scope.SCENARIO, "s5")
  37. data_node_6 = DataNode("quuz", Scope.SCENARIO, "s6")
  38. data_node_7 = DataNode("corge", Scope.SCENARIO, "s7")
  39. task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_3, data_node_4], TaskId("t1"))
  40. task_2 = Task("garply", {}, print, [data_node_3], [data_node_5], TaskId("t2"))
  41. task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], [data_node_6], TaskId("t3"))
  42. task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4"))
  43. sequence = Sequence({}, [task_4, task_2, task_1, task_3], SequenceId("p1"))
  44. dag = sequence._get_dag()
  45. # s1 --- ---> s3 ---> t2 ---> s5 ----
  46. # | | |
  47. # |---> t1 ---| -------------------------> t3 ---> s6
  48. # | | |
  49. # s2 --- ---> s4 ---> t4 ---> s7
  50. assert dag.length == 7
  51. assert dag.width == 2
  52. assert dag._grid_length == 7
  53. assert dag._grid_width == 3
  54. assert len(dag.nodes) == 11
  55. assert_x_y(0, [0, 2], dag.nodes["s1"], dag.nodes["s2"])
  56. assert_x_y(1, [1], dag.nodes["t1"])
  57. assert_x_y(2, [0, 2], dag.nodes["s3"], dag.nodes["s4"])
  58. assert_x_y(3, [0, 2], dag.nodes["t2"], dag.nodes["t4"])
  59. assert_x_y(4, [0, 2], dag.nodes["s5"], dag.nodes["s7"])
  60. assert_x_y(5, [1], dag.nodes["t3"])
  61. assert_x_y(6, [1], dag.nodes["s6"])
  62. assert len(dag.edges) == 11
  63. assert_edge_exists("s1", "t1", dag)
  64. assert_edge_exists("s2", "t1", dag)
  65. assert_edge_exists("t1", "s3", dag)
  66. assert_edge_exists("t1", "s4", dag)
  67. assert_edge_exists("s3", "t2", dag)
  68. assert_edge_exists("t2", "s5", dag)
  69. assert_edge_exists("s5", "t3", dag)
  70. assert_edge_exists("s4", "t3", dag)
  71. assert_edge_exists("t3", "s6", dag)
  72. assert_edge_exists("s4", "t4", dag)
  73. assert_edge_exists("t4", "s7", dag)
  74. def test_get_dag_2(self):
  75. data_node_1 = DataNode("foo", Scope.SCENARIO, "s1")
  76. data_node_2 = DataNode("bar", Scope.SCENARIO, "s2")
  77. data_node_4 = DataNode("qux", Scope.SCENARIO, "s4")
  78. data_node_5 = DataNode("quux", Scope.SCENARIO, "s5")
  79. data_node_6 = DataNode("quuz", Scope.SCENARIO, "s6")
  80. data_node_7 = DataNode("corge", Scope.SCENARIO, "s7")
  81. task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_4], TaskId("t1"))
  82. task_2 = Task("garply", {}, print, None, [data_node_5], TaskId("t2"))
  83. task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], [data_node_6], TaskId("t3"))
  84. task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4"))
  85. sequence = Sequence({}, [task_4, task_2, task_1, task_3], SequenceId("p1"))
  86. # 6 | t2 _____
  87. # 5 | \
  88. # 4 | s5 _________________ t3 _______ s6
  89. # 3 | s1 __ _ s4 _____/
  90. # 2 | \ _ t1 ____/ \_ t4 _______ s7
  91. # 1 | /
  92. # 0 | s2 --
  93. # |________________________________________________
  94. # 0 1 2 3 4
  95. dag = sequence._get_dag()
  96. assert dag.length == 5
  97. assert dag.width == 3
  98. assert dag._grid_length == 5
  99. assert dag._grid_width == 7
  100. assert len(dag.nodes) == 10
  101. assert_x_y(0, [0, 3, 6], dag.nodes["s1"], dag.nodes["s2"], dag.nodes["t2"])
  102. assert_x_y(1, [2, 4], dag.nodes["t1"], dag.nodes["s5"])
  103. assert_x_y(2, [3], dag.nodes["s4"])
  104. assert_x_y(3, [2, 4], dag.nodes["t3"], dag.nodes["t4"])
  105. assert_x_y(4, [2, 4], dag.nodes["s6"], dag.nodes["s7"])
  106. assert len(dag.edges) == 9
  107. assert_edge_exists("s1", "t1", dag)
  108. assert_edge_exists("s2", "t1", dag)
  109. assert_edge_exists("t1", "s4", dag)
  110. assert_edge_exists("t2", "s5", dag)
  111. assert_edge_exists("s5", "t3", dag)
  112. assert_edge_exists("s4", "t3", dag)
  113. assert_edge_exists("t3", "s6", dag)
  114. assert_edge_exists("s4", "t4", dag)
  115. assert_edge_exists("t4", "s7", dag)
  116. def test_get_dag_3(self):
  117. data_node_1 = DataNode("foo", Scope.SCENARIO, "s1")
  118. data_node_2 = DataNode("bar", Scope.SCENARIO, "s2")
  119. data_node_3 = DataNode("quuz", Scope.SCENARIO, "s3")
  120. data_node_4 = DataNode("qux", Scope.SCENARIO, "s4")
  121. data_node_5 = DataNode("quux", Scope.SCENARIO, "s5")
  122. data_node_6 = DataNode("corge", Scope.SCENARIO, "s6")
  123. data_node_7 = DataNode("hugh", Scope.SCENARIO, "s7")
  124. task_1 = Task("grault", {}, print, [data_node_1, data_node_2, data_node_3], [data_node_4], TaskId("t1"))
  125. task_2 = Task("waldo", {}, print, [data_node_4], None, id=TaskId("t2"))
  126. task_3 = Task("fred", {}, print, [data_node_4], [data_node_5], TaskId("t3"))
  127. task_4 = Task("garply", {}, print, output=[data_node_6], id=TaskId("t4"))
  128. task_5 = Task("bob", {}, print, [data_node_7], None, TaskId("t5"))
  129. sequence = Sequence({}, [task_5, task_3, task_4, task_1, task_2], SequenceId("p1"))
  130. # 12 | s7 __
  131. # 11 | \
  132. # 10 | \
  133. # 9 | t4 _ \_ t5
  134. # 8 | \ ____ t3 ___
  135. # 7 | \ / \
  136. # 6 | s3 _ \__ s6 _ s4 _/ \___ s5
  137. # 5 | \ / \
  138. # 4 | \ / \____ t2
  139. # 3 | s2 ___\__ t1 __/
  140. # 2 | /
  141. # 1 | /
  142. # 0 | s1 _/
  143. # |________________________________________________
  144. # 0 1 2 3 4
  145. dag = sequence._get_dag()
  146. assert dag.length == 5
  147. assert dag.width == 5
  148. assert dag._grid_length == 5
  149. assert dag._grid_width == 13
  150. assert len(dag.nodes) == 12
  151. assert_x_y(
  152. 0, [0, 3, 6, 9, 12], dag.nodes["s1"], dag.nodes["s2"], dag.nodes["s3"], dag.nodes["s7"], dag.nodes["t4"]
  153. )
  154. assert_x_y(1, [3, 6, 9], dag.nodes["t1"], dag.nodes["t5"], dag.nodes["s6"])
  155. assert_x_y(2, [6], dag.nodes["s4"])
  156. assert_x_y(3, [4, 8], dag.nodes["t2"], dag.nodes["t3"])
  157. assert_x_y(4, [6], dag.nodes["s5"])
  158. assert len(dag.edges) == 9
  159. assert_edge_exists("s1", "t1", dag)
  160. assert_edge_exists("s2", "t1", dag)
  161. assert_edge_exists("s3", "t1", dag)
  162. assert_edge_exists("t1", "s4", dag)
  163. assert_edge_exists("s4", "t2", dag)
  164. assert_edge_exists("s4", "t3", dag)
  165. assert_edge_exists("t3", "s5", dag)
  166. assert_edge_exists("t4", "s6", dag)
  167. assert_edge_exists("s7", "t5", dag)
  168. def test_get_dag_4(self):
  169. data_node_1 = DataNode("foo", Scope.SCENARIO, "s1")
  170. data_node_2 = DataNode("bar", Scope.SCENARIO, "s2")
  171. data_node_3 = DataNode("quuz", Scope.SCENARIO, "s3")
  172. data_node_4 = DataNode("qux", Scope.SCENARIO, "s4")
  173. data_node_5 = DataNode("quux", Scope.SCENARIO, "s5")
  174. data_node_6 = DataNode("corge", Scope.SCENARIO, "s6")
  175. task_1 = Task("grault", {}, print, [data_node_1, data_node_2, data_node_3], [data_node_4], TaskId("t1"))
  176. task_2 = Task("waldo", {}, print, [data_node_4], None, id=TaskId("t2"))
  177. task_3 = Task("fred", {}, print, [data_node_4], [data_node_5], TaskId("t3"))
  178. task_4 = Task("garply", {}, print, output=[data_node_6], id=TaskId("t4"))
  179. sequence = Sequence({}, [task_3, task_4, task_1, task_2], SequenceId("p1"))
  180. # 6 | t4 __
  181. # 5 | \
  182. # 4 | s3 _ \__ s6 ______ t3 ___
  183. # 3 | \ ___ s4 _/ \___ s5
  184. # 2 | s2 __\__ t1 __/ \______ t2
  185. # 1 | /
  186. # 0 | s1 _/
  187. # |________________________________________________
  188. # 0 1 2 3 4
  189. dag = sequence._get_dag()
  190. assert dag.length == 5
  191. assert dag.width == 4
  192. assert dag._grid_length == 5
  193. assert dag._grid_width == 7
  194. assert len(dag.nodes) == 10
  195. assert_x_y(0, [0, 2, 4, 6], dag.nodes["s1"], dag.nodes["s2"], dag.nodes["s3"], dag.nodes["t4"])
  196. assert_x_y(1, [2, 4], dag.nodes["t1"], dag.nodes["s6"])
  197. assert_x_y(2, [3], dag.nodes["s4"])
  198. assert_x_y(3, [2, 4], dag.nodes["t2"], dag.nodes["t3"])
  199. assert_x_y(4, [3], dag.nodes["s5"])
  200. assert len(dag.edges) == 8
  201. assert_edge_exists("s1", "t1", dag)
  202. assert_edge_exists("s2", "t1", dag)
  203. assert_edge_exists("s3", "t1", dag)
  204. assert_edge_exists("t1", "s4", dag)
  205. assert_edge_exists("s4", "t2", dag)
  206. assert_edge_exists("s4", "t3", dag)
  207. assert_edge_exists("t3", "s5", dag)
  208. assert_edge_exists("t4", "s6", dag)
  209. def test_get_dag_5(self):
  210. data_node_1 = DataNode("foo", Scope.SCENARIO, "s1")
  211. data_node_2 = DataNode("bar", Scope.SCENARIO, "s2")
  212. task_1 = Task("baz", {}, print, [data_node_1], [data_node_2], TaskId("t1"))
  213. sequence = Sequence({}, [task_1], SequenceId("p1"))
  214. # 1 |
  215. # 0 | s1 __ t1 __ s2
  216. # |_________________
  217. # 0 1 2
  218. dag = sequence._get_dag()
  219. assert dag.length == 3
  220. assert dag.width == 1
  221. assert dag._grid_length == 3
  222. assert dag._grid_width == 1
  223. assert len(dag.nodes) == 3
  224. assert_x_y(0, [0], dag.nodes["s1"])
  225. assert_x_y(1, [0], dag.nodes["t1"])
  226. assert_x_y(2, [0], dag.nodes["s2"])
  227. assert len(dag.edges) == 2
  228. assert_edge_exists("s1", "t1", dag)
  229. assert_edge_exists("t1", "s2", dag)
  230. def test_get_dag_6(self):
  231. data_node_1 = DataNode("foo", Scope.SCENARIO, "s1")
  232. data_node_2 = DataNode("bar", Scope.SCENARIO, "s2")
  233. data_node_3 = DataNode("baz", Scope.SCENARIO, "s3")
  234. data_node_4 = DataNode("qux", Scope.SCENARIO, "s4")
  235. task_1 = Task("quux", {}, print, [data_node_1, data_node_2], [data_node_3], TaskId("t1"))
  236. task_2 = Task("quuz", {}, print, [data_node_2], [data_node_4], TaskId("t2"))
  237. sequence = Sequence({}, [task_1, task_2], SequenceId("p1"))
  238. # 2 |
  239. # |
  240. # 1 | s1 ___ t1 __ s3
  241. # | /
  242. # 0 | s2 /__ t2 __ s4
  243. # |_________________
  244. # 0 1 2
  245. dag = sequence._get_dag()
  246. assert dag.length == 3
  247. assert dag.width == 2
  248. assert dag._grid_length == 3
  249. assert dag._grid_width == 2
  250. assert len(dag.nodes) == 6
  251. assert_x_y(0, [0, 1], dag.nodes["s1"], dag.nodes["s2"])
  252. assert_x_y(1, [0, 1], dag.nodes["t1"], dag.nodes["t2"])
  253. assert_x_y(2, [0, 1], dag.nodes["s3"], dag.nodes["s4"])
  254. assert len(dag.edges) == 5
  255. assert_edge_exists("s1", "t1", dag)
  256. assert_edge_exists("s2", "t1", dag)
  257. assert_edge_exists("t1", "s3", dag)
  258. assert_edge_exists("s2", "t2", dag)
  259. assert_edge_exists("t2", "s4", dag)