job_config.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. from copy import copy
  12. from typing import Any, Dict, Optional, Union
  13. from taipy.config import Config
  14. from taipy.config._config import _Config
  15. from taipy.config.common._template_handler import _TemplateHandler as _tpl
  16. from taipy.config.unique_section import UniqueSection
  17. from ..exceptions.exceptions import ModeNotAvailable
  18. class JobConfig(UniqueSection):
  19. """
  20. Configuration fields related to the jobs' executions.
  21. Parameters:
  22. mode (str): The Taipy operating mode. By default, the "development" mode is set for testing and debugging the
  23. executions of jobs. A "standalone" mode is also available.
  24. **properties (dict[str, any]): A dictionary of additional properties.
  25. """
  26. name = "JOB"
  27. _MODE_KEY = "mode"
  28. _STANDALONE_MODE = "standalone"
  29. _DEVELOPMENT_MODE = "development"
  30. _DEFAULT_MODE = _DEVELOPMENT_MODE
  31. _DEFAULT_MAX_NB_OF_WORKERS = 2
  32. _MODES = [_STANDALONE_MODE, _DEVELOPMENT_MODE]
  33. def __init__(self, mode: Optional[str] = None, **properties):
  34. self.mode = mode or self._DEFAULT_MODE
  35. self._config = self._create_config(self.mode, **properties)
  36. super().__init__(**properties)
  37. def __copy__(self):
  38. return JobConfig(self.mode, **copy(self._properties))
  39. def __getattr__(self, key: str) -> Optional[Any]:
  40. return self._config.get(key, None)
  41. @classmethod
  42. def default_config(cls):
  43. return JobConfig(cls._DEFAULT_MODE)
  44. def _clean(self):
  45. self.mode = self._DEFAULT_MODE
  46. self._config = self._create_config(self.mode)
  47. def _to_dict(self):
  48. as_dict = {}
  49. if self.mode is not None:
  50. as_dict[self._MODE_KEY] = self.mode
  51. as_dict.update(self._config)
  52. return as_dict
  53. @classmethod
  54. def _from_dict(cls, config_as_dict: Dict[str, Any], id=None, config: Optional[_Config] = None):
  55. mode = config_as_dict.pop(cls._MODE_KEY, None)
  56. return JobConfig(mode, **config_as_dict)
  57. def _update(self, as_dict: Dict[str, Any], default_section=None):
  58. mode = _tpl._replace_templates(as_dict.pop(self._MODE_KEY, self.mode))
  59. if self.mode != mode:
  60. self.mode = mode
  61. self._config = self._create_config(self.mode, **as_dict)
  62. if self._config is not None:
  63. self._update_config(as_dict)
  64. @staticmethod
  65. def _configure(
  66. mode: Optional[str] = None, max_nb_of_workers: Optional[Union[int, str]] = None, **properties
  67. ) -> "JobConfig":
  68. """Configure job execution.
  69. Parameters:
  70. mode (Optional[str]): The job execution mode.
  71. Possible values are: *"standalone"* (the default value) or *"development"*.
  72. max_nb_of_workers (Optional[int, str]): Parameter used only in default *"standalone"* mode.
  73. This indicates the maximum number of jobs able to run in parallel.<br/>
  74. The default value is 2.<br/>
  75. A string can be provided to dynamically set the value using an environment
  76. variable. The string must follow the pattern: `ENV[&lt;env_var&gt;]` where
  77. `&lt;env_var&gt;` is the name of an environment variable.
  78. **properties (dict[str, any]): A keyworded variable length list of additional arguments.
  79. Returns:
  80. The new job execution configuration.
  81. """
  82. section = JobConfig(mode, max_nb_of_workers=max_nb_of_workers, **properties)
  83. Config._register(section)
  84. return Config.unique_sections[JobConfig.name]
  85. def _update_config(self, config_as_dict: Dict[str, Any]):
  86. for k, v in config_as_dict.items():
  87. type_to_convert = type(self.get_default_config(self.mode).get(k, None)) or str
  88. value = _tpl._replace_templates(v, type_to_convert)
  89. if value is not None:
  90. self._config[k] = value
  91. @property
  92. def is_standalone(self) -> bool:
  93. """True if the config is set to standalone mode"""
  94. return self.mode == self._STANDALONE_MODE
  95. @property
  96. def is_development(self) -> bool:
  97. """True if the config is set to development mode"""
  98. return self.mode == self._DEVELOPMENT_MODE
  99. def get_default_config(self, mode: str) -> Dict[str, Any]:
  100. if self.is_standalone:
  101. return {"max_nb_of_workers": self._DEFAULT_MAX_NB_OF_WORKERS}
  102. if self.is_development:
  103. return {}
  104. raise ModeNotAvailable(mode)
  105. def _create_config(self, mode, **properties):
  106. return {**self.get_default_config(mode), **properties}