# Copyright 2021-2024 Avaiga Private Limited # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the # specific language governing permissions and limitations under the License. # -------------------------------------------------------------------------------------------------- # Common artifacts used by the other scripts located in this directory. # -------------------------------------------------------------------------------------------------- import os import re import subprocess import typing as t from dataclasses import asdict, dataclass import requests # These are the base name of the sub packages taipy-* # They also are the names of the directory where their code belongs, under the 'taipy' directory # in the root of the Taipy repository. PACKAGES = ["common", "core", "gui", "rest", "templates"] # -------------------------------------------------------------------------------------------------- @dataclass(order=True) class Version: """Helps manipulate version numbers.""" major: int minor: int patch: int = 0 ext: t.Optional[str] = None # Unknown version constant UNKNOWN: t.ClassVar["Version"] @property def name(self) -> str: """Returns a string representation of this Version without the extension part.""" return f"{self.major}.{self.minor}.{self.patch}" @property def full_name(self) -> str: """Returns a full string representation of this Version.""" return f"{self.name}.{self.ext}" if self.ext else self.name def __str__(self) -> str: """Returns a string representation of this version.""" return self.full_name def __repr__(self) -> str: """Returns a full string representation of this version.""" ext = f".{self.ext}" if self.ext else "" return f"Version({self.major}.{self.minor}.{self.patch}{ext})" @classmethod def from_string(cls, version: str): """Creates a Version from a string. Parameters: version: a version name as a string.
The format should be ".[.[.]] where - must be a number, indicating the major number of the version - must be a number, indicating the minor number of the version - must be a number, indicating the patch level of the version. Optional. - must be a string. It is common practice that ends with a number, but it is not required. Optional. Returns: A new Version object with the appropriate values that were parsed. """ match = re.fullmatch(r"(\d+)\.(\d+)(?:\.(\d+))?(?:\.([^\s]+))?", version) if match: major = int(match[1]) minor = int(match[2]) patch = int(match[3]) if match[3] else 0 ext = match[4] return cls(major=major, minor=minor, patch=patch, ext=ext) else: raise ValueError(f"String not in expected format: {version}") def to_dict(self) -> dict[str, str]: """Returns this Version as a dictionary.""" return {k: v for k, v in asdict(self).items() if v is not None} def bump_ext_version(self) -> "Version": """Returns a new Version object where the extension part version was incremented. If this Version has no extension part, this method returns *self*. """ if not self.ext or (m := re.search(r"([0-9]+)$", self.ext)) is None: return self ext_ver = int(m[1]) + 1 return Version(self.major, self.minor, self.patch, f"{self.ext[: m.start(1)]}{ext_ver}") def validate_extension(self, ext="dev"): """Returns True if the extension part of this Version is the one queried.""" return self._split_ext()[0] == ext def _split_ext(self) -> t.Tuple[str, int]: """Splits extension into the (identifier, index) tuple Returns: ("", -1) if there is no extension. (extension, -1) if there is no extension index. (extension, index) if there is an extension index (e.g. "dev3"). """ if not self.ext or (match := re.fullmatch(r"(.*?)(\d+)?", self.ext)) is None: return ("", -1) # No extension # Potentially no index return (match[1], int(match[2]) if match[2] else -1) def is_compatible(self, version: "Version") -> bool: """Checks if this version is compatible with another. Version v1 is defined as being compatible with version v2 if a package built with version v1 can safely depend on another package built with version v2.
Here are the conditions set when checking whether v1 is compatible with v2: - If v1 and v2 have different major or minor numbers, they are not compatible. - If v1 has no extension, it is compatible only with v2 that have no extension. - If v1 has an extension, it is compatible with any v2 that has the same extension, no matter the extension index. I.e.: package-1.[m].[t] is NOT compatible with any sub-package-[M].* where M != 1 package-1.2.[t] is NOT compatible with any sub-package-1.[m].* where m != 2 package-1.2.[t] is compatible with all sub-package-1.2.* package-1.2.[t].ext[X] is compatible with all sub-package-1.2.*.ext* package-1.2.3 is NOT compatible with any sub-package-1.2.*.* package-1.2.3.extA is NOT compatible with any sub-package-1.2.*.extB if extA != extB, independently of a potential extension index. Parameters: version: the version to check compatibility against. Returns: True is this Version is compatible with *version* and False if it is not. """ if self.major != version.major or self.minor != version.minor: return False if self.patch > version.patch: return True # No extensions on either → Compatible if not self.ext and not version.ext: return True # self has extension, version doesn't → Compatible if self.ext and not version.ext: return True # Version has extension, self doesn't → Not compatible if not self.ext and version.ext: return False # Both have extensions → check identifiers. Dissimilar identifiers → Not compatible self_prefix, _ = self._split_ext() other_prefix, _ = version._split_ext() if self_prefix != other_prefix: return False # Same identifiers → Compatible return True Version.UNKNOWN = Version(0, 0) # -------------------------------------------------------------------------------------------------- class Package: """Information on any Taipy package and sub-package.""" def __init__(self, package: str) -> None: self._name = package if package == "taipy": self._short = package else: if package.startswith("taipy-"): self._short = package[6:] else: self._name = f"taipy-{package}" self._short = package if self._short not in PACKAGES: raise ValueError(f"Invalid package name {package}.") @property def name(self) -> str: """The full package name.""" return self._name @property def short_name(self) -> str: """The short package name.""" return self._short @property def package_dir(self) -> str: return "taipy" if self._name == "taipy" else os.path.join("taipy", self._short) def __str__(self) -> str: """Returns a string representation of this package.""" return self.name def __repr__(self) -> str: """Returns a full string representation of this package.""" return f"Package({self.name})" def __eq__(self, other): return isinstance(other, Package) and (self._short, self._short) == (other._short, other._short) def __hash__(self): return hash(self._short) # -------------------------------------------------------------------------------------------------- def retrieve_github_path() -> t.Optional[str]: # Retrieve current Git branch remote URL def run(*args) -> str: return subprocess.run(args, stdout=subprocess.PIPE, text=True, check=True).stdout.strip() branch_name = run("git", "branch", "--show-current") remote_name = run("git", "config", f"branch.{branch_name}.remote") url = run("git", "remote", "get-url", remote_name) if match := re.fullmatch(r"git@github.com:(.*)\.git", url): return match[1] if match := re.fullmatch(r"https://github.com/(.*)$", url): return match[1] print("ERROR - Could not retrieve GibHub branch path") # noqa: T201 return None # -------------------------------------------------------------------------------------------------- def fetch_github_releases(gh_path: t.Optional[str] = None) -> dict[Package, list[Version]]: # Retrieve all available releases (potentially paginating results) for all packages. # Returns a dictionary of package_short_name-Value pairs. # Note for reviewers: using a Package as the dictionary is is cumbersome in the rest of the # code. all_releases: dict[str, list[Version]] = {} if gh_path is None: gh_path = retrieve_github_path() if gh_path is None: raise ValueError("Couldn't figure out GitHub branch path.") url = f"https://api.github.com/repos/{gh_path}/releases" page = 1 # Read all release versions and store them in a package_name - list[Version] dictionary while url: response = requests.get(url, params={"per_page": 50, "page": page}) response.raise_for_status() # Raise error for bad responses for release in response.json(): tag_name = release["tag_name"] pkg_ver, pkg = tag_name.split("-") if "-" in tag_name else (tag_name, "taipy") # Drop legacy packages (config...) if pkg != "taipy" and pkg not in PACKAGES: continue # Exception for legacy version: v1.0.0 -> 1.0.0 if pkg_ver == "v1.0.0": pkg_ver = pkg_ver[1:] version = Version.from_string(pkg_ver) if versions := all_releases.get(pkg): versions.append(version) else: all_releases[pkg] = [version] # Check for pagination in the `Link` header link_header = response.headers.get("Link", "") if 'rel="next"' in link_header: url = link_header.split(";")[0].strip("<>") # Extract next page URL page += 1 else: url = None # No more pages # Build and return the dictionary using Package instances return {Package(p): v for p, v in all_releases.items()} # -------------------------------------------------------------------------------------------------- def fetch_latest_github_taipy_releases( all_releases: t.Optional[dict[Package, list[Version]]] = None, gh_path: t.Optional[str] = None ) -> Version: # Retrieve all available releases if necessary if all_releases is None: all_releases = fetch_github_releases(gh_path) # Find the latest 'taipy' version that has no extension latest_taipy_version = Version.UNKNOWN if versions := all_releases.get(Package("taipy")): latest_taipy_version = max(filter(lambda v: v.ext is None, versions)) return latest_taipy_version