test_context_is_readable.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. from unittest.mock import Mock, patch
  2. import pytest
  3. from src.taipy.gui_core._context import _GuiCoreContext
  4. from taipy.config.common.scope import Scope
  5. from taipy.core import Job, Scenario, Task
  6. from taipy.core.data.pickle import PickleDataNode
  7. from taipy.gui import Gui
  8. a_scenario = Scenario("scenario_config_id", [], {}, sequences={"sequence": {}})
  9. a_task = Task("task_config_id", {}, print)
  10. a_job = Job("JOB_job_id", a_task, "submit_id", a_scenario.id)
  11. a_job.isfinished = lambda s: True
  12. a_datanode = PickleDataNode("data_node_config_id", Scope.SCENARIO)
  13. def mock_is_readable_false(entity_id):
  14. return False
  15. def mock_is_true(entity_id):
  16. return True
  17. def mock_core_get(entity_id):
  18. if entity_id == a_scenario.id:
  19. return a_scenario
  20. if entity_id == a_job.id:
  21. return a_job
  22. if entity_id == a_datanode.id:
  23. return a_datanode
  24. return a_task
  25. class MockState:
  26. def __init__(self, **kwargs) -> None:
  27. self.assign = kwargs.get("assign")
  28. class TestGuiCoreContext_is_readable:
  29. def test_scenario_adapter(self):
  30. with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get):
  31. gui_core_context = _GuiCoreContext(Mock())
  32. outcome = gui_core_context.scenario_adapter(a_scenario)
  33. assert isinstance(outcome, tuple)
  34. assert outcome[0] == a_scenario.id
  35. with patch("src.taipy.gui_core._context.is_readable", side_effect=mock_is_readable_false):
  36. outcome = gui_core_context.scenario_adapter(a_scenario)
  37. assert outcome is None
  38. def test_get_scenario_by_id(self):
  39. with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get):
  40. gui_core_context = _GuiCoreContext(Mock())
  41. outcome = gui_core_context.get_scenario_by_id(a_scenario.id)
  42. assert outcome is not None
  43. with patch("src.taipy.gui_core._context.is_readable", side_effect=mock_is_readable_false):
  44. outcome = gui_core_context.get_scenario_by_id(a_scenario.id)
  45. assert outcome is None
  46. def test_crud_scenario(self):
  47. with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get):
  48. gui_core_context = _GuiCoreContext(Mock())
  49. assign = Mock()
  50. gui_core_context.crud_scenario(
  51. MockState(assign=assign),
  52. "",
  53. {
  54. "args": [
  55. True,
  56. False,
  57. {"name": "name", "id": a_scenario.id},
  58. ]
  59. },
  60. )
  61. assign.assert_not_called()
  62. with patch("src.taipy.gui_core._context.is_readable", side_effect=mock_is_readable_false):
  63. assign.reset_mock()
  64. gui_core_context.crud_scenario(
  65. MockState(assign=assign),
  66. "",
  67. {
  68. "args": [
  69. True,
  70. False,
  71. {"name": "name", "id": a_scenario.id},
  72. ]
  73. },
  74. )
  75. assign.assert_called_once()
  76. assert assign.call_args.args[0] == "gui_core_sc_error"
  77. assert str(assign.call_args.args[1]).endswith("is not readable.")
  78. def test_edit_entity(self):
  79. with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get):
  80. gui_core_context = _GuiCoreContext(Mock())
  81. assign = Mock()
  82. gui_core_context.edit_entity(
  83. MockState(assign=assign),
  84. "",
  85. {
  86. "args": [
  87. {"name": "name", "id": a_scenario.id},
  88. ]
  89. },
  90. )
  91. assign.assert_called_once()
  92. assert assign.call_args.args[0] == "gui_core_sv_error"
  93. assert assign.call_args.args[1] == ""
  94. with patch("src.taipy.gui_core._context.is_readable", side_effect=mock_is_readable_false):
  95. assign.reset_mock()
  96. gui_core_context.edit_entity(
  97. MockState(assign=assign),
  98. "",
  99. {
  100. "args": [
  101. {"name": "name", "id": a_scenario.id},
  102. ]
  103. },
  104. )
  105. assign.assert_called_once()
  106. assert assign.call_args.args[0] == "gui_core_sv_error"
  107. assert str(assign.call_args.args[1]).endswith("is not readable.")
  108. def test_scenario_status_callback(self):
  109. with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get) as mockget:
  110. mockget.reset_mock()
  111. gui_core_context = _GuiCoreContext(Mock())
  112. gui_core_context.scenario_status_callback(a_job.id)
  113. mockget.assert_called()
  114. found = False
  115. for call in mockget.call_args_list:
  116. if call.args[0] == a_job.id:
  117. found = True
  118. break
  119. assert found is True
  120. mockget.reset_mock()
  121. with patch("src.taipy.gui_core._context.is_readable", side_effect=mock_is_readable_false):
  122. gui_core_context.scenario_status_callback(a_job.id)
  123. mockget.assert_not_called()
  124. def test_data_node_adapter(self):
  125. with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get):
  126. gui_core_context = _GuiCoreContext(Mock())
  127. outcome = gui_core_context.data_node_adapter(a_datanode)
  128. assert isinstance(outcome, tuple)
  129. assert outcome[0] == a_datanode.id
  130. with patch("src.taipy.gui_core._context.is_readable", side_effect=mock_is_readable_false):
  131. outcome = gui_core_context.data_node_adapter(a_datanode)
  132. assert outcome is None
  133. def test_job_adapter(self):
  134. with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get):
  135. gui_core_context = _GuiCoreContext(Mock())
  136. outcome = gui_core_context.job_adapter(a_job)
  137. assert isinstance(outcome, tuple)
  138. assert outcome[0] == a_job.id
  139. with patch("src.taipy.gui_core._context.is_readable", side_effect=mock_is_readable_false):
  140. outcome = gui_core_context.job_adapter(a_job)
  141. assert outcome is None
  142. def test_act_on_jobs(self):
  143. with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get), patch(
  144. "src.taipy.gui_core._context.is_deletable", side_effect=mock_is_true
  145. ):
  146. gui_core_context = _GuiCoreContext(Mock())
  147. assign = Mock()
  148. gui_core_context.act_on_jobs(
  149. MockState(assign=assign),
  150. "",
  151. {
  152. "args": [
  153. {"id": [a_job.id], "action": "delete"},
  154. ]
  155. },
  156. )
  157. assign.assert_called_once()
  158. assert assign.call_args.args[0] == "gui_core_js_error"
  159. assert str(assign.call_args.args[1]).find("is not readable.") == -1
  160. assign.reset_mock()
  161. gui_core_context.act_on_jobs(
  162. MockState(assign=assign),
  163. "",
  164. {
  165. "args": [
  166. {"id": [a_job.id], "action": "cancel"},
  167. ]
  168. },
  169. )
  170. assign.assert_called_once()
  171. assert assign.call_args.args[0] == "gui_core_js_error"
  172. assert str(assign.call_args.args[1]).find("is not readable.") == -1
  173. assign.reset_mock()
  174. with patch("src.taipy.gui_core._context.is_readable", side_effect=mock_is_readable_false):
  175. gui_core_context.act_on_jobs(
  176. MockState(assign=assign),
  177. "",
  178. {
  179. "args": [
  180. {"id": [a_job.id], "action": "delete"},
  181. ]
  182. },
  183. )
  184. assign.assert_called_once()
  185. assert assign.call_args.args[0] == "gui_core_js_error"
  186. assert str(assign.call_args.args[1]).endswith("is not readable.")
  187. assign.reset_mock()
  188. gui_core_context.act_on_jobs(
  189. MockState(assign=assign),
  190. "",
  191. {
  192. "args": [
  193. {"id": [a_job.id], "action": "cancel"},
  194. ]
  195. },
  196. )
  197. assign.assert_called_once()
  198. assert assign.call_args.args[0] == "gui_core_js_error"
  199. assert str(assign.call_args.args[1]).endswith("is not readable.")
  200. def test_edit_data_node(self):
  201. with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get):
  202. gui_core_context = _GuiCoreContext(Mock())
  203. assign = Mock()
  204. gui_core_context.edit_data_node(
  205. MockState(assign=assign),
  206. "",
  207. {
  208. "args": [
  209. {"id": a_datanode.id},
  210. ]
  211. },
  212. )
  213. assign.assert_called_once()
  214. assert assign.call_args.args[0] == "gui_core_dv_error"
  215. assert assign.call_args.args[1] == ""
  216. with patch("src.taipy.gui_core._context.is_readable", side_effect=mock_is_readable_false):
  217. assign.reset_mock()
  218. gui_core_context.edit_data_node(
  219. MockState(assign=assign),
  220. "",
  221. {
  222. "args": [
  223. {"id": a_datanode.id},
  224. ]
  225. },
  226. )
  227. assign.assert_called_once()
  228. assert assign.call_args.args[0] == "gui_core_dv_error"
  229. assert str(assign.call_args.args[1]).endswith("is not readable.")
  230. def test_lock_datanode_for_edit(self):
  231. with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get):
  232. mockGui = Mock(Gui)
  233. mockGui._get_client_id = lambda: "a_client_id"
  234. gui_core_context = _GuiCoreContext(mockGui)
  235. assign = Mock()
  236. gui_core_context.lock_datanode_for_edit(
  237. MockState(assign=assign),
  238. "",
  239. {
  240. "args": [
  241. {"id": a_datanode.id},
  242. ]
  243. },
  244. )
  245. assign.assert_called_once()
  246. assert assign.call_args.args[0] == "gui_core_dv_error"
  247. assert assign.call_args.args[1] == ""
  248. with patch("src.taipy.gui_core._context.is_readable", side_effect=mock_is_readable_false):
  249. assign.reset_mock()
  250. gui_core_context.lock_datanode_for_edit(
  251. MockState(assign=assign),
  252. "",
  253. {
  254. "args": [
  255. {"id": a_datanode.id},
  256. ]
  257. },
  258. )
  259. assign.assert_called_once()
  260. assert assign.call_args.args[0] == "gui_core_dv_error"
  261. assert str(assign.call_args.args[1]).endswith("is not readable.")
  262. def test_get_scenarios_for_owner(self):
  263. with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get) as mockget:
  264. gui_core_context = _GuiCoreContext(Mock())
  265. gui_core_context.get_scenarios_for_owner(a_scenario.id)
  266. mockget.assert_called_once()
  267. mockget.reset_mock()
  268. with patch("src.taipy.gui_core._context.is_readable", side_effect=mock_is_readable_false):
  269. gui_core_context.scenario_status_callback(a_scenario.id)
  270. mockget.assert_not_called()
  271. def test_update_data(self):
  272. with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get):
  273. mockGui = Mock(Gui)
  274. mockGui._get_client_id = lambda: "a_client_id"
  275. gui_core_context = _GuiCoreContext(mockGui)
  276. assign = Mock()
  277. gui_core_context.update_data(
  278. MockState(assign=assign),
  279. "",
  280. {
  281. "args": [
  282. {"id": a_datanode.id},
  283. ]
  284. },
  285. )
  286. assign.assert_called()
  287. assert assign.call_args_list[0].args[0] == "gui_core_dv_error"
  288. assert assign.call_args_list[0].args[1] == ""
  289. assign.reset_mock()
  290. with patch("src.taipy.gui_core._context.is_readable", side_effect=mock_is_readable_false):
  291. gui_core_context.update_data(
  292. MockState(assign=assign),
  293. "",
  294. {
  295. "args": [
  296. {"id": a_datanode.id},
  297. ]
  298. },
  299. )
  300. assign.assert_called_once()
  301. assert assign.call_args.args[0] == "gui_core_dv_error"
  302. assert str(assign.call_args.args[1]).endswith("is not readable.")
  303. def test_tabular_data_edit(self):
  304. with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get):
  305. mockGui = Mock(Gui)
  306. mockGui._get_client_id = lambda: "a_client_id"
  307. gui_core_context = _GuiCoreContext(mockGui)
  308. assign = Mock()
  309. gui_core_context.tabular_data_edit(
  310. MockState(assign=assign),
  311. "",
  312. {
  313. "user_data": {"dn_id": a_datanode.id},
  314. },
  315. )
  316. assign.assert_called_once()
  317. assert assign.call_args_list[0].args[0] == "gui_core_dv_error"
  318. assert (
  319. assign.call_args_list[0].args[1]
  320. == "Error updating Datanode tabular value: type does not support at[] indexer."
  321. )
  322. assign.reset_mock()
  323. with patch("src.taipy.gui_core._context.is_readable", side_effect=mock_is_readable_false):
  324. gui_core_context.tabular_data_edit(
  325. MockState(assign=assign),
  326. "",
  327. {
  328. "user_data": {"dn_id": a_datanode.id},
  329. },
  330. )
  331. assign.assert_called_once()
  332. assert assign.call_args.args[0] == "gui_core_dv_error"
  333. assert str(assign.call_args.args[1]).endswith("is not readable.")
  334. def test_get_data_node_tabular_data(self):
  335. with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get) as mockget:
  336. gui_core_context = _GuiCoreContext(Mock())
  337. gui_core_context.get_data_node_tabular_data(a_datanode, a_datanode.id)
  338. mockget.assert_called_once()
  339. mockget.reset_mock()
  340. with patch("src.taipy.gui_core._context.is_readable", side_effect=mock_is_readable_false):
  341. gui_core_context.get_data_node_tabular_data(a_datanode, a_datanode.id)
  342. mockget.assert_not_called()
  343. def test_get_data_node_tabular_columns(self):
  344. with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get) as mockget:
  345. gui_core_context = _GuiCoreContext(Mock())
  346. gui_core_context.get_data_node_tabular_columns(a_datanode, a_datanode.id)
  347. mockget.assert_called_once()
  348. mockget.reset_mock()
  349. with patch("src.taipy.gui_core._context.is_readable", side_effect=mock_is_readable_false):
  350. gui_core_context.get_data_node_tabular_columns(a_datanode, a_datanode.id)
  351. mockget.assert_not_called()
  352. def test_get_data_node_chart_config(self):
  353. with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get) as mockget:
  354. gui_core_context = _GuiCoreContext(Mock())
  355. gui_core_context.get_data_node_chart_config(a_datanode, a_datanode.id)
  356. mockget.assert_called_once()
  357. mockget.reset_mock()
  358. with patch("src.taipy.gui_core._context.is_readable", side_effect=mock_is_readable_false):
  359. gui_core_context.get_data_node_chart_config(a_datanode, a_datanode.id)
  360. mockget.assert_not_called()