test_dag.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. # Copyright 2021-2025 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. from typing import List
  12. from taipy.core import DataNode, Sequence, SequenceId, Task, TaskId
  13. from taipy.core._entity._dag import _DAG
  14. from taipy.core.common.scope import Scope
  15. def assert_x(x: int, *nodes):
  16. for node in nodes:
  17. assert node.x == x
  18. def assert_y(y: List[int], *nodes):
  19. for node in nodes:
  20. assert node.y in y
  21. y.remove(node.y)
  22. def assert_x_y(x: int, y: List[int], *nodes):
  23. assert_x(x, *nodes)
  24. for node in nodes:
  25. assert node.y in y
  26. y.remove(node.y)
  27. def assert_edge_exists(src, dest, dag: _DAG):
  28. list_of_tuples = [(edge.src.entity.id, edge.dest.entity.id) for edge in dag.edges]
  29. assert (src, dest) in list_of_tuples
  30. class TestDAG:
  31. def test_get_dag_1(self):
  32. data_node_1 = DataNode("foo", Scope.SCENARIO, "s1")
  33. data_node_2 = DataNode("bar", Scope.SCENARIO, "s2")
  34. data_node_3 = DataNode("baz", Scope.SCENARIO, "s3")
  35. data_node_4 = DataNode("qux", Scope.SCENARIO, "s4")
  36. data_node_5 = DataNode("quux", Scope.SCENARIO, "s5")
  37. data_node_6 = DataNode("quuz", Scope.SCENARIO, "s6")
  38. data_node_7 = DataNode("corge", Scope.SCENARIO, "s7")
  39. task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_3, data_node_4], TaskId("t1"))
  40. task_2 = Task("garply", {}, print, [data_node_3], [data_node_5], TaskId("t2"))
  41. task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], [data_node_6], TaskId("t3"))
  42. task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4"))
  43. sequence = Sequence({}, [task_4, task_2, task_1, task_3], SequenceId("p1"))
  44. dag = sequence._get_dag()
  45. # s1 --- ---> s3 ---> t2 ---> s5 ----
  46. # | | |
  47. # |---> t1 ---| -------------------------> t3 ---> s6
  48. # | | |
  49. # s2 --- ---> s4 ---> t4 ---> s7
  50. assert dag.length == 7
  51. assert dag.width == 2
  52. assert dag._grid_length == 7
  53. assert dag._grid_width == 3
  54. assert len(dag.nodes) == 11
  55. assert_x_y(0, [0, 2], dag.nodes["s1"], dag.nodes["s2"])
  56. assert_x_y(1, [1], dag.nodes["t1"])
  57. assert_x_y(2, [0, 2], dag.nodes["s3"], dag.nodes["s4"])
  58. assert_x_y(3, [0, 2], dag.nodes["t2"], dag.nodes["t4"])
  59. assert_x_y(4, [0, 2], dag.nodes["s5"], dag.nodes["s7"])
  60. assert_x_y(5, [1], dag.nodes["t3"])
  61. assert_x_y(6, [1], dag.nodes["s6"])
  62. assert len(dag.edges) == 11
  63. assert_edge_exists("s1", "t1", dag)
  64. assert_edge_exists("s2", "t1", dag)
  65. assert_edge_exists("t1", "s3", dag)
  66. assert_edge_exists("t1", "s4", dag)
  67. assert_edge_exists("s3", "t2", dag)
  68. assert_edge_exists("t2", "s5", dag)
  69. assert_edge_exists("s5", "t3", dag)
  70. assert_edge_exists("s4", "t3", dag)
  71. assert_edge_exists("t3", "s6", dag)
  72. assert_edge_exists("s4", "t4", dag)
  73. assert_edge_exists("t4", "s7", dag)
  74. def test_get_dag_2(self):
  75. data_node_1 = DataNode("foo", Scope.SCENARIO, "s1")
  76. data_node_2 = DataNode("bar", Scope.SCENARIO, "s2")
  77. data_node_4 = DataNode("qux", Scope.SCENARIO, "s4")
  78. data_node_5 = DataNode("quux", Scope.SCENARIO, "s5")
  79. data_node_6 = DataNode("quuz", Scope.SCENARIO, "s6")
  80. data_node_7 = DataNode("corge", Scope.SCENARIO, "s7")
  81. task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_4], TaskId("t1"))
  82. task_2 = Task("garply", {}, print, None, [data_node_5], TaskId("t2"))
  83. task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], [data_node_6], TaskId("t3"))
  84. task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4"))
  85. sequence = Sequence({}, [task_4, task_2, task_1, task_3], SequenceId("p1"))
  86. # 6 | t2 _____
  87. # 5 | \
  88. # 4 | s5 _________________ t3 _______ s6
  89. # 3 | s1 __ _ s4 _____/
  90. # 2 | \ _ t1 ____/ \_ t4 _______ s7
  91. # 1 | /
  92. # 0 | s2 --
  93. # |________________________________________________
  94. # 0 1 2 3 4
  95. dag = sequence._get_dag()
  96. assert dag.length == 5
  97. assert dag.width == 3
  98. assert dag._grid_length == 5
  99. assert dag._grid_width == 7
  100. assert len(dag.nodes) == 10
  101. assert_x_y(0, [0, 3, 6], dag.nodes["s1"], dag.nodes["s2"], dag.nodes["t2"])
  102. assert_x_y(1, [2, 4], dag.nodes["t1"], dag.nodes["s5"])
  103. assert_x_y(2, [3], dag.nodes["s4"])
  104. assert_x_y(3, [2, 4], dag.nodes["t3"], dag.nodes["t4"])
  105. assert_x_y(4, [2, 4], dag.nodes["s6"], dag.nodes["s7"])
  106. assert len(dag.edges) == 9
  107. assert_edge_exists("s1", "t1", dag)
  108. assert_edge_exists("s2", "t1", dag)
  109. assert_edge_exists("t1", "s4", dag)
  110. assert_edge_exists("t2", "s5", dag)
  111. assert_edge_exists("s5", "t3", dag)
  112. assert_edge_exists("s4", "t3", dag)
  113. assert_edge_exists("t3", "s6", dag)
  114. assert_edge_exists("s4", "t4", dag)
  115. assert_edge_exists("t4", "s7", dag)
  116. def test_get_dag_3(self):
  117. data_node_1 = DataNode("foo", Scope.SCENARIO, "s1")
  118. data_node_2 = DataNode("bar", Scope.SCENARIO, "s2")
  119. data_node_3 = DataNode("quuz", Scope.SCENARIO, "s3")
  120. data_node_4 = DataNode("qux", Scope.SCENARIO, "s4")
  121. data_node_5 = DataNode("quux", Scope.SCENARIO, "s5")
  122. data_node_6 = DataNode("corge", Scope.SCENARIO, "s6")
  123. data_node_7 = DataNode("hugh", Scope.SCENARIO, "s7")
  124. task_1 = Task("grault", {}, print, [data_node_1, data_node_2, data_node_3], [data_node_4], TaskId("t1"))
  125. task_2 = Task("waldo", {}, print, [data_node_4], None, id=TaskId("t2"))
  126. task_3 = Task("fred", {}, print, [data_node_4], [data_node_5], TaskId("t3"))
  127. task_4 = Task("garply", {}, print, output=[data_node_6], id=TaskId("t4"))
  128. task_5 = Task("bob", {}, print, [data_node_7], None, TaskId("t5"))
  129. sequence = Sequence({}, [task_5, task_3, task_4, task_1, task_2], SequenceId("p1"))
  130. # 12 | s7 __
  131. # 11 | \
  132. # 10 | \
  133. # 9 | t4 _ \_ t5
  134. # 8 | \ ____ t3 ___
  135. # 7 | \ / \
  136. # 6 | s3 _ \__ s6 _ s4 _/ \___ s5
  137. # 5 | \ / \
  138. # 4 | \ / \____ t2
  139. # 3 | s2 ___\__ t1 __/
  140. # 2 | /
  141. # 1 | /
  142. # 0 | s1 _/
  143. # |________________________________________________
  144. # 0 1 2 3 4
  145. dag = sequence._get_dag()
  146. assert dag.length == 5
  147. assert dag.width == 5
  148. assert dag._grid_length == 5
  149. assert dag._grid_width == 13
  150. assert len(dag.nodes) == 12
  151. assert_x_y(
  152. 0, [0, 3, 6, 9, 12], dag.nodes["s1"], dag.nodes["s2"], dag.nodes["s3"], dag.nodes["s7"], dag.nodes["t4"]
  153. )
  154. assert_x_y(1, [3, 6, 9], dag.nodes["t1"], dag.nodes["t5"], dag.nodes["s6"])
  155. assert_x_y(2, [6], dag.nodes["s4"])
  156. assert_x_y(3, [4, 8], dag.nodes["t2"], dag.nodes["t3"])
  157. assert_x_y(4, [6], dag.nodes["s5"])
  158. assert len(dag.edges) == 9
  159. assert_edge_exists("s1", "t1", dag)
  160. assert_edge_exists("s2", "t1", dag)
  161. assert_edge_exists("s3", "t1", dag)
  162. assert_edge_exists("t1", "s4", dag)
  163. assert_edge_exists("s4", "t2", dag)
  164. assert_edge_exists("s4", "t3", dag)
  165. assert_edge_exists("t3", "s5", dag)
  166. assert_edge_exists("t4", "s6", dag)
  167. assert_edge_exists("s7", "t5", dag)
  168. def test_get_dag_4(self):
  169. data_node_1 = DataNode("foo", Scope.SCENARIO, "s1")
  170. data_node_2 = DataNode("bar", Scope.SCENARIO, "s2")
  171. data_node_3 = DataNode("quuz", Scope.SCENARIO, "s3")
  172. data_node_4 = DataNode("qux", Scope.SCENARIO, "s4")
  173. data_node_5 = DataNode("quux", Scope.SCENARIO, "s5")
  174. data_node_6 = DataNode("corge", Scope.SCENARIO, "s6")
  175. task_1 = Task("grault", {}, print, [data_node_1, data_node_2, data_node_3], [data_node_4], TaskId("t1"))
  176. task_2 = Task("waldo", {}, print, [data_node_4], None, id=TaskId("t2"))
  177. task_3 = Task("fred", {}, print, [data_node_4], [data_node_5], TaskId("t3"))
  178. task_4 = Task("garply", {}, print, output=[data_node_6], id=TaskId("t4"))
  179. sequence = Sequence({}, [task_3, task_4, task_1, task_2], SequenceId("p1"))
  180. # 6 | t4 __
  181. # 5 | \
  182. # 4 | s3 _ \__ s6 ______ t3 ___
  183. # 3 | \ ___ s4 _/ \___ s5
  184. # 2 | s2 __\__ t1 __/ \______ t2
  185. # 1 | /
  186. # 0 | s1 _/
  187. # |________________________________________________
  188. # 0 1 2 3 4
  189. dag = sequence._get_dag()
  190. assert dag.length == 5
  191. assert dag.width == 4
  192. assert dag._grid_length == 5
  193. assert dag._grid_width == 7
  194. assert len(dag.nodes) == 10
  195. assert_x_y(0, [0, 2, 4, 6], dag.nodes["s1"], dag.nodes["s2"], dag.nodes["s3"], dag.nodes["t4"])
  196. assert_x_y(1, [2, 4], dag.nodes["t1"], dag.nodes["s6"])
  197. assert_x_y(2, [3], dag.nodes["s4"])
  198. assert_x_y(3, [2, 4], dag.nodes["t2"], dag.nodes["t3"])
  199. assert_x_y(4, [3], dag.nodes["s5"])
  200. assert len(dag.edges) == 8
  201. assert_edge_exists("s1", "t1", dag)
  202. assert_edge_exists("s2", "t1", dag)
  203. assert_edge_exists("s3", "t1", dag)
  204. assert_edge_exists("t1", "s4", dag)
  205. assert_edge_exists("s4", "t2", dag)
  206. assert_edge_exists("s4", "t3", dag)
  207. assert_edge_exists("t3", "s5", dag)
  208. assert_edge_exists("t4", "s6", dag)
  209. def test_get_dag_5(self):
  210. data_node_1 = DataNode("foo", Scope.SCENARIO, "s1")
  211. data_node_2 = DataNode("bar", Scope.SCENARIO, "s2")
  212. task_1 = Task("baz", {}, print, [data_node_1], [data_node_2], TaskId("t1"))
  213. sequence = Sequence({}, [task_1], SequenceId("p1"))
  214. # 1 |
  215. # 0 | s1 __ t1 __ s2
  216. # |_________________
  217. # 0 1 2
  218. dag = sequence._get_dag()
  219. assert dag.length == 3
  220. assert dag.width == 1
  221. assert dag._grid_length == 3
  222. assert dag._grid_width == 1
  223. assert len(dag.nodes) == 3
  224. assert_x_y(0, [0], dag.nodes["s1"])
  225. assert_x_y(1, [0], dag.nodes["t1"])
  226. assert_x_y(2, [0], dag.nodes["s2"])
  227. assert len(dag.edges) == 2
  228. assert_edge_exists("s1", "t1", dag)
  229. assert_edge_exists("t1", "s2", dag)
  230. def test_get_dag_6(self):
  231. data_node_1 = DataNode("foo", Scope.SCENARIO, "s1")
  232. data_node_2 = DataNode("bar", Scope.SCENARIO, "s2")
  233. data_node_3 = DataNode("baz", Scope.SCENARIO, "s3")
  234. data_node_4 = DataNode("qux", Scope.SCENARIO, "s4")
  235. task_1 = Task("quux", {}, print, [data_node_1, data_node_2], [data_node_3], TaskId("t1"))
  236. task_2 = Task("quuz", {}, print, [data_node_2], [data_node_4], TaskId("t2"))
  237. sequence = Sequence({}, [task_1, task_2], SequenceId("p1"))
  238. # 2 |
  239. # |
  240. # 1 | s1 ___ t1 __ s3
  241. # | /
  242. # 0 | s2 /__ t2 __ s4
  243. # |_________________
  244. # 0 1 2
  245. dag = sequence._get_dag()
  246. assert dag.length == 3
  247. assert dag.width == 2
  248. assert dag._grid_length == 3
  249. assert dag._grid_width == 2
  250. assert len(dag.nodes) == 6
  251. assert_x_y(0, [0, 1], dag.nodes["s1"], dag.nodes["s2"])
  252. assert_x_y(1, [0, 1], dag.nodes["t1"], dag.nodes["t2"])
  253. assert_x_y(2, [0, 1], dag.nodes["s3"], dag.nodes["s4"])
  254. assert len(dag.edges) == 5
  255. assert_edge_exists("s1", "t1", dag)
  256. assert_edge_exists("s2", "t1", dag)
  257. assert_edge_exists("t1", "s3", dag)
  258. assert_edge_exists("s2", "t2", dag)
  259. assert_edge_exists("t2", "s4", dag)