# Copyright Swiss Data Science Center (SDSC). A partnership between
# École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Template models."""
import copy
import json
import os
import tempfile
from abc import abstractmethod
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Set, Tuple, Union, cast
import jinja2
import yaml
from renku.core import errors
from renku.core.constant import RENKU_HOME
from renku.core.util.os import get_safe_relative_path, hash_file, hash_string
from renku.core.util.util import to_string
if TYPE_CHECKING:
from renku.domain_model.project import Project
# NOTE: File name of the manifest inside a templates source directory; it is joined onto the source path before the
# manifest is loaded with ``TemplatesManifest.from_path``.
TEMPLATE_MANIFEST = "manifest.yaml"
class TemplatesSource:
    """Base class for Renku template sources.

    A source couples a local directory (``path``) with the manifest file found in it. Sub-classes provide the logic
    for fetching a source and for resolving template references and versions.
    """

    def __init__(self, path, source, reference, version, skip_validation: bool = False):
        """Load the manifest located in ``path``.

        Args:
            path: Directory that contains the manifest file; either a ``str`` or a ``Path``.
            source: Value stored as the ``source`` of this instance and of its templates.
            reference: Value stored as the ``reference`` of this instance and of its templates.
            version: Value stored as the ``version`` of this instance and of its templates.
            skip_validation(bool): Passed to ``TemplatesManifest.from_path``; when ``True`` the manifest is not
                validated on load.
        """
        self.path: Path = Path(path)
        self.source: str = source
        self.reference: Optional[str] = reference
        self.version: str = version
        # NOTE: Join on the normalized ``self.path``; joining on the raw argument fails when a ``str`` is passed
        self.manifest: TemplatesManifest = TemplatesManifest.from_path(self.path / TEMPLATE_MANIFEST, skip_validation)

    @classmethod
    @abstractmethod
    def fetch(cls, source: Optional[str], reference: Optional[str]) -> "TemplatesSource":
        """Fetch an embedded or remote template."""
        raise NotImplementedError

    @property
    def templates(self) -> List["Template"]:
        """Return list of templates.

        Each template of the manifest gets this instance assigned as its source and is then validated including its
        files, so accessing this property raises if any template is invalid.
        """
        for template in self.manifest.templates:
            template.templates_source = self
            template.validate(skip_files=False)

        return self.manifest.templates

    def is_update_available(
        self, id: str, reference: Optional[str], version: Optional[str]
    ) -> Tuple[bool, Optional[str]]:
        """Return True if an update is available along with the latest reference of a template.

        Args:
            id(str): Id of the template.
            reference(Optional[str]): Reference that is currently in use.
            version(Optional[str]): Version that is currently in use.

        Returns:
            Tuple[bool, Optional[str]]: ``(False, reference)`` if no latest reference/version is found; otherwise a
                flag that is ``True`` when the latest reference or version differs from the passed one, plus the
                latest reference.
        """
        latest = self.get_latest_reference_and_version(id=id, reference=reference, version=version)
        if not latest:
            return False, reference

        latest_reference, latest_version = latest
        update_available = latest_reference != reference or latest_version != version

        return update_available, latest_reference

    @abstractmethod
    def get_all_references(self, id) -> List[str]:
        """Return all available versions for a template id."""
        raise NotImplementedError

    @abstractmethod
    def get_latest_reference_and_version(
        self, id: str, reference: Optional[str], version: Optional[str]
    ) -> Optional[Tuple[Optional[str], str]]:
        """Return latest reference and version number of a template."""
        raise NotImplementedError

    @abstractmethod
    def get_template(self, id, reference: Optional[str]) -> "Template":
        """Return a template at a specific reference."""
        raise NotImplementedError
class TemplatesManifest:
    """Manifest file for Renku templates."""

    def __init__(self, content: List[Dict], skip_validation: bool = False):
        """Wrap parsed manifest content.

        Args:
            content(List[Dict]): Parsed manifest: a list with one dictionary per template.
            skip_validation(bool): If ``True``, ``validate`` is not called on creation.
        """
        self._content: List[Dict] = content
        self._templates: Optional[List[Template]] = None

        if not skip_validation:
            self.validate()

    @classmethod
    def from_path(cls, path: Union[Path, str], skip_validation: bool = False) -> "TemplatesManifest":
        """Extract template metadata from the manifest file.

        Raises:
            errors.InvalidTemplateError: If the file doesn't exist or cannot be decoded as text.
        """
        try:
            return cls.from_string(Path(path).read_text(), skip_validation)
        except FileNotFoundError as e:
            raise errors.InvalidTemplateError(f"There is no manifest file '{path}'") from e
        except UnicodeDecodeError as e:
            raise errors.InvalidTemplateError(f"Cannot read manifest file '{path}'") from e

    @classmethod
    def from_string(cls, content: str, skip_validation: bool = False) -> "TemplatesManifest":
        """Extract template metadata from the manifest file content.

        Raises:
            errors.InvalidTemplateError: If the content is not valid YAML.
        """
        try:
            manifest = yaml.safe_load(content)
        except yaml.YAMLError as e:
            raise errors.InvalidTemplateError("Cannot parse manifest file") from e

        # NOTE: Instantiate ``cls`` so that sub-classes get an instance of their own type
        return cls(manifest, skip_validation)

    @property
    def templates(self) -> List["Template"]:
        """Return list of available templates info in the manifest.

        The list is created on first access and cached. Templates are not bound to a source yet, so their source,
        reference, version and path are ``None``.
        """
        if self._templates is None:
            self._templates = [
                Template(
                    id=cast(str, t.get("id") or t.get("folder")),
                    # NOTE: ``aliases`` can be present with a null value in the manifest
                    aliases=t.get("aliases") or [],
                    name=cast(str, t.get("name")),
                    description=cast(str, t.get("description")),
                    parameters=cast(Dict[str, Dict[str, Any]], t.get("variables") or t.get("parameters")),
                    icon=cast(str, t.get("icon")),
                    ssh_supported=t.get("ssh_supported", False),
                    immutable_files=t.get("immutable_template_files", []),
                    allow_update=t.get("allow_template_update", True),
                    source=None,
                    reference=None,
                    version=None,
                    path=None,
                    templates_source=None,
                )
                for t in self._content
            ]

        return self._templates

    def get_raw_content(self) -> List[Dict]:
        """Return a deep copy of the raw manifest file content."""
        return copy.deepcopy(self._content)

    def validate(self, manifest_only: bool = False) -> List[str]:
        """Validate manifest content.

        Old-style string variables are converted in place to the nested dictionary format.

        Args:
            manifest_only(bool): If ``True``, only the manifest structure is checked and the individual templates
                are not validated.

        Returns:
            List[str]: Warnings about deprecated manifest constructs.

        Raises:
            errors.InvalidTemplateError: If the manifest is empty, has a wrong structure, has a template without an
                id, or contains duplicate ids/aliases.
        """
        warnings = []

        if not self._content:
            raise errors.InvalidTemplateError("Cannot find any valid template in manifest file")
        elif not isinstance(self._content, list):
            raise errors.InvalidTemplateError(f"Invalid manifest content type: '{type(self._content).__name__}'")

        existing_ids: Set[str] = set()

        # NOTE: First check if required fields exists for creating Template instances
        for template_entry in self._content:
            if not isinstance(template_entry, dict):
                raise errors.InvalidTemplateError(f"Invalid template type: '{type(template_entry).__name__}'")

            template_id = template_entry.get("id") or template_entry.get("folder")
            if not template_id:
                raise errors.InvalidTemplateError(f"Template doesn't have an id: '{template_entry}'")
            if not template_entry.get("id"):
                warnings.append(f"Template '{template_id}' should use 'id' attribute instead of 'folder'.")

            # NOTE: Check for duplicate IDs and aliases
            aliases = {template_id}
            # NOTE: A null ``aliases`` entry is treated like a missing one instead of failing with a ``TypeError``
            aliases.update(template_entry.get("aliases") or [])
            duplicates = existing_ids.intersection(aliases)
            if duplicates:
                duplicates_str = ", ".join(sorted(f"'{d}'" for d in duplicates))
                raise errors.InvalidTemplateError(f"Found duplicate IDs or aliases: {duplicates_str}")
            existing_ids.update(aliases)

            parameters = template_entry.get("variables")
            if parameters:
                if not isinstance(parameters, dict):
                    raise errors.InvalidTemplateError(
                        f"Invalid template variable type on template '{template_id}': "
                        f"'{type(parameters).__name__}', should be 'dict'."
                    )

                for key, parameter in parameters.items():
                    if isinstance(parameter, str):  # NOTE: Backwards compatibility
                        template_entry["variables"][key] = {"description": parameter}
                        warnings.append(
                            f"Template '{template_id}' variable '{key}' uses old string format in manifest and "
                            "should be replaced with the nested dictionary format."
                        )

        if not manifest_only:
            for template in self.templates:
                template.validate(skip_files=True)

        return warnings
class Template:
    """Template files and metadata from a template source."""

    # NOTE: Attributes that must have a non-empty value for a template to be valid
    REQUIRED_ATTRIBUTES = ("name",)
    # NOTE: Paths, relative to the template directory, that must exist as files
    REQUIRED_FILES = (os.path.join(RENKU_HOME, "renku.ini"), "Dockerfile")
    # NOTE: Glob patterns, relative to the template directory, that must not match anything but required files
    PROHIBITED_PATHS = (f"{RENKU_HOME}/*",)

    def __init__(
        self,
        id: str,
        name: str,
        description: str,
        parameters: Dict[str, Dict[str, Any]],
        icon: str,
        ssh_supported: bool,
        aliases: List[str],
        immutable_files: List[str],
        allow_update: bool,
        source: Optional[str],
        reference: Optional[str],
        version: Optional[str],
        path: Optional[Path],
        templates_source: Optional[TemplatesSource],
    ):
        """Create a template.

        ``parameters`` maps a parameter name to its definition; each entry is converted to a ``TemplateParameter``.
        Empty values for ``parameters``, ``aliases`` and ``immutable_files`` are stored as empty collections.
        """
        self.path: Optional[Path] = path
        self.source = source
        self.reference = reference
        self.version = version
        self.id: str = id
        self.name: str = name
        self.description: str = description
        self.icon = icon
        self.ssh_supported = ssh_supported
        self.aliases: List[str] = aliases or []
        self.immutable_files: List[str] = immutable_files or []
        self.allow_update: bool = allow_update
        parameters = parameters or {}
        self.parameters: List[TemplateParameter] = [
            TemplateParameter.from_dict(name=k, value=v) for k, v in parameters.items()
        ]

        self._templates_source: Optional[TemplatesSource] = templates_source

    def __repr__(self) -> str:
        return f"<Template {self.id}@{self.version}>"

    @property
    def templates_source(self) -> Optional[TemplatesSource]:
        """Return template's source."""
        return self._templates_source

    @templates_source.setter
    def templates_source(self, templates_source: TemplatesSource):
        """Set templates source for this template.

        The template's source, reference and version are copied from the templates source and its path is set to the
        sub-directory of the source that is named after the template id.
        """
        self._templates_source = templates_source
        self.source = templates_source.source
        self.reference = templates_source.reference
        self.version = templates_source.version
        self.path = templates_source.path / self.id

    def get_all_references(self) -> List[str]:
        """Return all available references for the template; empty if the template has no source."""
        if self.templates_source is None:
            return []

        return self.templates_source.get_all_references(self.id)

    def validate(self, skip_files: bool, raise_errors: bool = True) -> List[str]:
        """Validate a template.

        Args:
            skip_files(bool): If ``True``, only attributes and parameters are checked and the template directory is
                not inspected.
            raise_errors(bool): If ``True``, raise on the first issue; otherwise collect issues and return them.

        Returns:
            List[str]: Found issues; can be non-empty only if ``raise_errors`` is ``False``.

        Raises:
            errors.InvalidTemplateError: On the first issue, if ``raise_errors`` is ``True``.
        """
        issues: List[str] = []

        def report(issue: str) -> None:
            """Raise or record an issue depending on ``raise_errors``."""
            if raise_errors:
                raise errors.InvalidTemplateError(issue)
            issues.append(issue)

        for attribute in self.REQUIRED_ATTRIBUTES:
            if not getattr(self, attribute, None):
                report(f"Template '{self.id}' does not have a '{attribute}' attribute")

        for parameter in self.parameters:
            # NOTE: Propagate ``raise_errors``; otherwise invalid parameters would never raise and would go unnoticed
            # by callers that ignore the returned issues
            issues.extend(parameter.validate(raise_errors=raise_errors))

        if skip_files:
            return issues

        if self.path is None or not self.path.exists():
            report(f"Template directory for '{self.id}' does not exists")
            return issues  # NOTE: no point checking individual files if directory doesn't exist.

        missing_required_files = {file for file in self.REQUIRED_FILES if not (self.path / file).is_file()}
        if missing_required_files:
            # NOTE: Sort to get a deterministic message since sets have no stable order
            required_files_str = "\n\t\t\t".join(sorted(missing_required_files))
            report(f"These paths are required but missing:\n\t\t\t{required_files_str}")

        existing_prohibited_paths: Set[str] = set()
        for pattern in self.PROHIBITED_PATHS:
            for match in self.path.glob(pattern):
                relative_match = str(match.relative_to(self.path))
                # NOTE: Required files are allowed even if they match a prohibited pattern
                if relative_match not in self.REQUIRED_FILES:
                    existing_prohibited_paths.add(relative_match)

        if existing_prohibited_paths:
            prohibited_paths_str = "\n\t\t\t".join(sorted(existing_prohibited_paths))
            report(f"These paths are not allowed in a template:\n\t\t\t{prohibited_paths_str}")

        # NOTE: Validate symlinks resolve to a path inside the template
        for relative_path in self.get_files():
            try:
                get_safe_relative_path(path=relative_path, base=self.path)
            except ValueError:
                report(f"File '{relative_path}' is not within the template.")

        return issues

    def get_files(self) -> Generator[str, None, None]:
        """Yield paths, relative to the template directory, of all files in the template.

        Nothing is yielded if the template has no path.
        """
        if self.path is None:
            return

        for subpath in self.path.rglob("*"):
            if subpath.is_file():
                yield str(subpath.relative_to(self.path))

    def render(self, metadata: "TemplateMetadata") -> "RenderedTemplate":
        """Render template files in a new directory.

        File paths and text file contents are rendered with jinja2 using ``metadata.metadata``; files that cannot be
        decoded as text are copied as they are.

        Args:
            metadata(TemplateMetadata): Object whose ``metadata`` attribute holds the values used for rendering.

        Returns:
            RenderedTemplate: The rendered template located in a newly-created temporary directory.

        Raises:
            ValueError: If the template path isn't set.
        """
        if self.path is None:
            raise ValueError("Template path not set")

        render_base = Path(tempfile.mkdtemp())

        for relative_path in self.get_files():
            # NOTE: The path could contain template variables, we need to template it
            rendered_relative_path = jinja2.Template(relative_path).render(metadata.metadata)

            destination = render_base / rendered_relative_path
            destination.parent.mkdir(parents=True, exist_ok=True)

            source = self.path / relative_path

            try:
                content = source.read_text()
            except UnicodeDecodeError:  # NOTE: Binary files
                content_bytes = source.read_bytes()
                destination.write_bytes(content_bytes)
            else:
                template = jinja2.Template(content, keep_trailing_newline=True)
                rendered_content = template.render(metadata.metadata)
                destination.write_text(rendered_content)

        return RenderedTemplate(path=render_base, template=self, metadata=metadata.metadata)
class RenderedTemplate:
    """Rendered files of a ``Template`` along with their checksums."""

    def __init__(self, path: Path, template: Template, metadata: Dict[str, Any]):
        """Store rendering results and compute a checksum for each rendered file.

        Args:
            path(Path): Directory that contains the rendered files.
            template(Template): The template that was rendered.
            metadata(Dict[str, Any]): Values that were used for rendering.
        """
        self.path: Path = path
        self.template: Template = template
        self.metadata: Dict[str, Any] = metadata

        # NOTE: Keys are file paths relative to ``path``
        checksums: Dict[str, Optional[str]] = {}
        for rendered_file in self.get_files():
            checksums[rendered_file] = hash_template_file(
                relative_path=rendered_file, absolute_path=self.path / rendered_file
            )
        self.checksums: Dict[str, Optional[str]] = checksums

    def get_files(self) -> Generator[str, None, None]:
        """Yield paths, relative to the rendered directory, of all rendered files."""
        for candidate in self.path.rglob("*"):
            if candidate.is_file():
                yield str(candidate.relative_to(self.path))
class TemplateParameter:
    """Represent template variables."""

    VALID_TYPES = ("string", "number", "boolean", "enum")

    def __init__(
        self,
        name: str,
        description: Optional[str],
        type: Optional[str],
        possible_values: Optional[List[Union[int, float, str, bool]]],
        default: Optional[Union[int, float, str, bool]],
    ):
        """Create a parameter; a missing description or list of possible values is stored as an empty value."""
        self.name: str = name
        self.description: str = description or ""
        self.type: Optional[str] = type
        self.possible_values: List[Union[int, float, str, bool]] = possible_values or []
        self.default = default

    @classmethod
    def from_dict(cls, name: str, value: Dict[str, Any]):
        """Create an instance from a dict.

        Args:
            name(str): Name of the parameter.
            value(Dict[str, Any]): Parameter definition; the keys ``type``, ``description``, ``possible_values`` (or
                ``enum``) and ``default_value`` are read.

        Raises:
            errors.InvalidTemplateError: If ``name`` is empty or ``value`` isn't a dictionary.
        """
        if not name:
            raise errors.InvalidTemplateError(f"No name specified for template parameter '{value}'")
        if not isinstance(value, dict):
            raise errors.InvalidTemplateError(f"Invalid parameter type '{type(value).__name__}' for '{name}'")

        return cls(
            name=name,
            type=value.get("type"),
            description=value.get("description"),
            possible_values=value.get("possible_values") or value.get("enum"),
            default=value.get("default_value"),
        )

    @property
    def has_default(self) -> bool:
        """Return True if a default value is set."""
        # NOTE: ``None`` cannot be used as the default value but it's ok since no variable type accepts it and it's not
        # a valid value anyways
        return self.default is not None

    def validate(self, raise_errors: bool = True) -> List[str]:
        """Validate the parameter definition and convert its default value to the parameter's type.

        Args:
            raise_errors(bool): If ``True``, raise on the first issue; otherwise collect issues and return them.

        Returns:
            List[str]: Found issues; can be non-empty only if ``raise_errors`` is ``False``.

        Raises:
            errors.InvalidTemplateError: On the first issue, if ``raise_errors`` is ``True``.
        """
        issues: List[str] = []

        def report(issue: str) -> None:
            """Raise or record an issue depending on ``raise_errors``."""
            if raise_errors:
                raise errors.InvalidTemplateError(issue)
            issues.append(issue)

        if not self.name:
            report("Template parameter does not have a name.")

        if self.type and self.type not in self.VALID_TYPES:
            report(f"Template contains variable '{self.name}' of type '{self.type}' which is not supported")

        if self.possible_values and not isinstance(self.possible_values, list):
            report(  # type: ignore[unreachable]
                "Invalid type for possible values of template variable " f"'{self.name}': '{self.possible_values}'"
            )

        if self.type and self.type == "enum" and not self.possible_values:
            report(f"Template variable '{self.name}' of type enum does not provide a corresponding enum list")

        if self.has_default:
            try:
                self.default = self.convert(self.default)  # type: ignore
            except ValueError as e:
                report(f"Invalid default value for '{self.name}': {e}")

        return issues

    def convert(self, value: Union[int, float, str, bool]) -> Union[int, float, str, bool]:
        """Convert a given value to the proper type and raise if value is not valid.

        Args:
            value(Union[int, float, str, bool]): The value to convert.

        Returns:
            Union[int, float, str, bool]: The value itself for untyped, ``string`` and ``enum`` parameters, an
                ``int`` or a ``float`` for ``number`` parameters and a ``bool`` for ``boolean`` parameters.

        Raises:
            ValueError: If the value is not valid for the parameter's type.
        """
        valid = True

        if not self.type:
            return value
        elif self.type == "string":
            if not isinstance(value, str):
                valid = False
        elif self.type == "number":
            try:
                value = int(str(value))  # NOTE: Convert to str first to avoid converting float to int if value is float
            except ValueError:
                try:
                    value = float(value)
                except (TypeError, ValueError):
                    # NOTE: ``float`` raises ``TypeError`` for values like lists or dicts; treat them as invalid
                    # values so that callers only have to handle ``ValueError``
                    valid = False
        elif self.type == "boolean":
            true = (True, 1, "1", "true", "True")
            false = (False, 0, "0", "false", "False")
            if value not in true and value not in false:
                valid = False
            else:
                value = True if value in true else False
        elif self.type == "enum":
            if value not in self.possible_values:
                valid = False

        if not valid:
            value = f"{value} (type: {type(value).__name__})"
            kind = f"type: {self.type}" if self.type else ""
            possible_values = f"possible values: {self.possible_values}" if self.possible_values else ""
            separator = ", " if kind and possible_values else ""
            info = f" ({kind}{separator}{possible_values})" if kind or possible_values else ""
            raise ValueError(f"Invalid value '{value}' for template variable '{self.name}{info}'")

        return value
def find_renku_section(lines: List[str]) -> Tuple[int, int]:
    """Locate the Renku-specific section in the lines of a Dockerfile.

    Args:
        lines(List[str]): Lines to search.

    Returns:
        Tuple[int, int]: Index of the last start marker seen before the first end marker and index of that end
            marker; ``-1`` is used for a marker that is not found.
    """
    section_start = -1

    for position, text in enumerate(lines):
        if text.startswith("# Renku-specific section - DO NOT MODIFY #"):
            section_start = position
            continue
        if text.endswith("# End Renku-specific section #"):
            # NOTE: Scanning stops at the first end marker
            return section_start, position

    return section_start, -1
def get_renku_section_from_dockerfile(content: str) -> Optional[str]:
    """Return the Renku-specific section of a Dockerfile.

    Args:
        content(str): Content of the Dockerfile.

    Returns:
        Optional[str]: Lines from the start marker up to, but excluding, the end marker, with trailing whitespace and
            empty lines removed; ``None`` if the content has no properly-delimited section.
    """
    stripped_lines = [raw_line.rstrip() for raw_line in content.splitlines()]
    first, last = find_renku_section(stripped_lines)

    if not 0 <= first < last:
        return None

    # NOTE: ``filter(None, ...)`` drops the empty lines
    return "\n".join(filter(None, stripped_lines[first:last]))
def calculate_dockerfile_checksum(
    *, dockerfile: Optional[Path] = None, dockerfile_content: Optional[str] = None
) -> str:
    """Calculate checksum for the given file or content.

    NOTE: We ignore empty lines and whitespace characters at the end of the lines when calculating Dockerfile checksum
    if it has Renku-specific section markers.

    Args:
        dockerfile(Optional[Path]): Path to a Dockerfile.
        dockerfile_content(Optional[str]): Content of a Dockerfile; an empty string is a valid content.

    Returns:
        str: Hash of the Renku-specific section if there is a non-empty one, otherwise hash of the whole content.

    Raises:
        errors.ParameterError: If neither or both of the parameters are passed.
    """
    has_file = bool(dockerfile)
    # NOTE: Compare with ``None`` so that an empty content counts as passed; this matches how the content is selected
    # below and avoids silently ignoring ``dockerfile`` when it's passed along with an empty content
    has_content = dockerfile_content is not None

    if not has_file and not has_content:
        raise errors.ParameterError("Either Dockerfile or its content must be passed")
    elif has_file and has_content:
        raise errors.ParameterError("Cannot pass both Dockerfile and its content")

    content = dockerfile_content if dockerfile_content is not None else dockerfile.read_text()  # type: ignore

    renku_section = get_renku_section_from_dockerfile(content) or content

    return hash_string(renku_section)
def update_dockerfile_content(source: Path, destination: Path) -> None:
    """Replace the Renku-specific section of the destination Dockerfile with the one from the source Dockerfile.

    The destination is overwritten with the full source content if either of the files doesn't have a
    properly-delimited Renku-specific section.

    Args:
        source(Path): Dockerfile to take the Renku-specific section from.
        destination(Path): Dockerfile to update in place.
    """

    def read_stripped_lines(dockerfile: Path) -> List[str]:
        """Return lines of a file without their trailing whitespace."""
        return [line.rstrip() for line in dockerfile.read_text().splitlines()]

    new_lines = read_stripped_lines(source)
    new_start, new_end = find_renku_section(new_lines)

    current_lines = read_stripped_lines(destination)
    current_start, current_end = find_renku_section(current_lines)

    both_have_section = 0 <= new_start < new_end and 0 <= current_start < current_end

    # NOTE: If source or destination Dockerfiles doesn't have Renku-specific section, we overwrite the whole file
    if not both_have_section:
        destination.write_text(source.read_text())
        return

    # NOTE: The end marker line isn't part of the replaced range, so the destination keeps its own end marker
    current_lines[current_start:current_end] = new_lines[new_start:new_end]
    destination.write_text("\n".join(current_lines))
def hash_template_file(*, relative_path: Union[Path, str], absolute_path: Union[Path, str]) -> Optional[str]:
    """Hash a template file with the method that fits the file.

    Args:
        relative_path(Union[Path, str]): Path of the file relative to the template; used to detect the Dockerfile.
        absolute_path(Union[Path, str]): Location of the file to hash.

    Returns:
        Optional[str]: The Dockerfile checksum if the relative path is exactly ``Dockerfile``, otherwise the result
            of ``hash_file``.
    """
    if str(relative_path) == "Dockerfile":
        return calculate_dockerfile_checksum(dockerfile=Path(absolute_path))

    return hash_file(absolute_path)