# Source code for renku.domain_model.workflow.parameter

# -*- coding: utf-8 -*-
#
# Copyright 2018-2022 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Classes to represent inputs/outputs/parameters in a Plan."""

import copy
import urllib
from abc import abstractmethod
from pathlib import Path, PurePosixPath
from typing import Any, Iterator, List, Optional
from uuid import uuid4

from renku.core.errors import ParameterError
from renku.core.util.urls import get_slug

# Number of hex characters taken from a random UUID when a default parameter name needs a random suffix.
RANDOM_ID_LENGTH = 4
# MIME-type value for directories; ``_validate_mime_type`` rejects it when it is listed with other MIME-types.
DIRECTORY_MIME_TYPE = "inode/directory"


def _validate_mime_type(encoding_format: List[str]):
    """Validate a list of MIME-types.

    The directory MIME-type (``DIRECTORY_MIME_TYPE``) is only accepted when it is the sole entry of the list.

    Args:
        encoding_format(List[str]): The MIME-types to check; an empty (or otherwise falsy) value is accepted as is.

    Raises:
        ParameterError: If the directory MIME-type is listed together with other MIME-types.
    """
    if not encoding_format:
        return

    if DIRECTORY_MIME_TYPE in encoding_format and len(encoding_format) > 1:
        raise ParameterError(
            f"Directory MIME-type '{DIRECTORY_MIME_TYPE}' cannot be combined with other MIME-types."
        )


class MappedIOStream:
    """A standard IO stream (stdin, stdout or stderr) that a parameter can be mapped to."""

    # The only stream types accepted by the constructor.
    STREAMS = ["stdin", "stdout", "stderr"]

    def __init__(self, *, id: str = None, stream_type: str):
        """Create a stream of the given type.

        Args:
            id(str): Identifier of the stream; when empty or ``None`` one is generated from ``stream_type``.
            stream_type(str): One of ``MappedIOStream.STREAMS``.
        """
        assert stream_type in MappedIOStream.STREAMS

        if not id:
            id = MappedIOStream.generate_id(stream_type)

        self.id: str = id
        self.stream_type = stream_type

    @staticmethod
    def generate_id(stream_type: str) -> str:
        """Return the id used for a stream of type ``stream_type``."""
        stream_id = f"/iostreams/{stream_type}"
        return stream_id
class CommandParameterBase:
    """Represents a parameter for a Plan.

    Holds the state shared by all parameter kinds: a default value, an optional actual value that overrides it for
    execution, the command-line prefix/position and the identifier. Subclasses provide ``_get_default_name`` and
    ``derive``.
    """

    def __init__(
        self,
        *,
        default_value: Any,
        description: Optional[str],
        id: str,
        name: Optional[str],
        position: Optional[int] = None,
        prefix: Optional[str] = None,
        derived_from: Optional[str] = None,
        postfix: Optional[str] = None,
    ):
        """Initialize the parameter.

        Args:
            default_value(Any): Value used when no actual value has been set.
            description(Optional[str]): Free-text description.
            id(str): Identifier of the parameter.
            name(Optional[str]): Name of the parameter; when ``None`` a default name is generated.
            position(Optional[int]): Position of the parameter (Default value = None).
            prefix(Optional[str]): Prefix put in front of the value by ``to_argv`` (Default value = None).
            derived_from(Optional[str]): Stored as is on the instance (Default value = None).
            postfix(Optional[str]): Suffix used instead of the position when ids are generated (Default value = None).
        """
        self.default_value: Any = default_value
        self.description: Optional[str] = description
        self.id: str = id
        self.position: Optional[int] = position
        self.prefix: Optional[str] = prefix
        self._v_actual_value = None
        self._v_actual_value_set: bool = False
        self.derived_from: Optional[str] = derived_from
        self.postfix: Optional[str] = postfix

        # The default name depends on ``prefix`` and ``position``, so it is computed after they are assigned.
        if name is not None:
            self.name: str = name
        else:
            self.name = self._get_default_name()

    @staticmethod
    def _generate_id(plan_id: str, parameter_type: str, position: Optional[int], postfix: Optional[str] = None) -> str:
        """Generate an id for parameters.

        The last component of the id is the URL-quoted ``postfix`` if one is given, otherwise the position, otherwise
        a random UUID hex string.
        """
        # /plans/723fd784-9347-4081-84de-a6dbb067545b/inputs/1
        # /plans/723fd784-9347-4081-84de-a6dbb067545b/inputs/stdin
        # /plans/723fd784-9347-4081-84de-a6dbb067545b/inputs/dda5fcbf00984917be46dc12f5f7b675
        if postfix:
            # Import the submodule explicitly: a bare ``import urllib`` does not guarantee that ``urllib.parse``
            # has been loaded.
            from urllib.parse import quote

            suffix = quote(postfix)
        elif position is not None:
            suffix = str(position)
        else:
            suffix = uuid4().hex

        return f"{plan_id}/{parameter_type}/{suffix}"

    def __repr__(self):
        return f"<{self.__class__.__name__} '{self.actual_value}'>"

    @property
    def role(self) -> str:
        """Return a unique role for this parameter within its Plan.

        The role is the last path component of the parameter's id.
        """
        assert self.id, "Id is not set"
        return PurePosixPath(self.id).name

    def to_argv(self) -> List[Any]:
        """String representation (same as cmd argument).

        Returns:
            List[Any]: ``[prefix, value]`` when the prefix ends with a space (the trailing space is dropped),
                ``[prefix + value]`` for any other prefix and ``[value]`` when there is no prefix.
        """
        value = self.actual_value

        # String values that contain a space are wrapped in double quotes.
        if isinstance(value, str) and " " in value:
            value = f'"{value}"'

        if self.prefix:
            if self.prefix.endswith(" "):
                return [self.prefix[:-1], str(value)]
            return [f"{self.prefix}{value}"]

        return [str(value)]

    @property
    def actual_value(self):
        """Get the actual value to be used for execution.

        Falls back to ``default_value`` as long as no actual value has been set.
        """
        # ``getattr`` with a default tolerates instances on which the flag attribute was never assigned.
        if getattr(self, "_v_actual_value_set", False):
            return self._v_actual_value

        return self.default_value

    @actual_value.setter
    def actual_value(self, value):
        """Set the actual value to be used for execution."""
        self._v_actual_value = value
        self._v_actual_value_set = True

    @property
    def actual_value_set(self):
        """Whether the actual_value on this parameter has been set at least once."""
        return getattr(self, "_v_actual_value_set", False)

    def _generate_name(self, base) -> str:
        """Build a default name of the form ``<slugified prefix or base>-<position or random suffix>``."""
        name = get_slug(self.prefix.strip(" -="), invalid_chars=["."]) if self.prefix else base
        # Compare against ``None`` (as ``_generate_id`` does) so that position 0 is kept instead of being replaced
        # by a random suffix.
        position = self.position if self.position is not None else uuid4().hex[:RANDOM_ID_LENGTH]
        return f"{name}-{position}"

    def _get_default_name(self) -> str:
        """Return the name to use when none is passed to the constructor; must be provided by subclasses."""
        raise NotImplementedError

    # NOTE: The class does not use ``ABCMeta``, so ``abstractmethod`` is not enforced at instantiation time; calling
    # the base implementation raises ``NotImplementedError``.
    @abstractmethod
    def derive(self, plan_id: str) -> "CommandParameterBase":
        """Create a new command parameter from self."""
        raise NotImplementedError
class CommandParameter(CommandParameterBase):
    """A command argument that is neither an input nor an output."""

    def __init__(
        self,
        *,
        default_value: Any = None,
        description: str = None,
        id: str,
        name: str = None,
        position: Optional[int] = None,
        prefix: str = None,
        derived_from: str = None,
        postfix: str = None,
    ):
        """Forward all arguments unchanged to ``CommandParameterBase``."""
        super().__init__(
            id=id,
            name=name,
            description=description,
            default_value=default_value,
            prefix=prefix,
            position=position,
            postfix=postfix,
            derived_from=derived_from,
        )

    @staticmethod
    def generate_id(plan_id: str, position: Optional[int] = None, postfix: str = None) -> str:
        """Build the id of a ``CommandParameter`` that belongs to the plan ``plan_id``."""
        return CommandParameterBase._generate_id(plan_id, "parameters", position, postfix)

    def __repr__(self):
        head = f"<Parameter '{self.name}': {self.actual_value} "
        tail = f"(default: {self.default_value}, prefix: {self.prefix}, position: {self.position})>"
        return head + tail

    def _get_default_name(self) -> str:
        """Default names of plain parameters are based on the word ``parameter``."""
        return self._generate_name(base="parameter")

    def derive(self, plan_id: str) -> "CommandParameter":
        """Return a shallow copy of this parameter whose id is regenerated for ``plan_id``."""
        derived = copy.copy(self)
        derived.id = CommandParameter.generate_id(plan_id=plan_id, position=self.position, postfix=self.postfix)
        return derived
class CommandInput(CommandParameterBase):
    """An input to a command."""

    def __init__(
        self,
        *,
        default_value: Optional[Any] = None,
        description: Optional[str] = None,
        id: str,
        mapped_to: Optional[MappedIOStream] = None,
        name: Optional[str] = None,
        position: Optional[int] = None,
        prefix: Optional[str] = None,
        encoding_format: Optional[List[str]] = None,
        derived_from: Optional[str] = None,
        postfix: Optional[str] = None,
    ):
        """Initialize the input.

        Args:
            default_value(Optional[Any]): A ``Path`` or ``str``; it is stored as a string. Although the signature
                defaults to ``None``, the type assertion below rejects it, so a value must be passed.
            description(Optional[str]): Free-text description (Default value = None).
            id(str): Identifier of the input.
            mapped_to(Optional[MappedIOStream]): IO stream this input is mapped to, if any (Default value = None).
            name(Optional[str]): Name of the input; a default one is generated when ``None`` (Default value = None).
            position(Optional[int]): Position of the input (Default value = None).
            prefix(Optional[str]): Prefix put in front of the value (Default value = None).
            encoding_format(Optional[List[str]]): MIME-types; validated when not ``None`` (Default value = None).
            derived_from(Optional[str]): Stored as is on the instance (Default value = None).
            postfix(Optional[str]): Suffix used instead of the position in generated ids (Default value = None).
        """
        assert isinstance(default_value, (Path, str)), f"Invalid value type for CommandInput: {type(default_value)}"

        super().__init__(
            default_value=str(default_value),
            description=description,
            id=id,
            name=name,
            position=position,
            prefix=prefix,
            derived_from=derived_from,
            postfix=postfix,
        )
        self.mapped_to: Optional[MappedIOStream] = mapped_to

        if encoding_format is not None:
            _validate_mime_type(encoding_format)
        self.encoding_format: Optional[List[str]] = encoding_format

    @staticmethod
    def generate_id(plan_id: str, position: Optional[int] = None, postfix: Optional[str] = None) -> str:
        """Generate an id for CommandInput."""
        return CommandParameterBase._generate_id(plan_id, parameter_type="inputs", position=position, postfix=postfix)

    def to_stream_representation(self) -> str:
        """Input stream representation.

        Returns:
            str: ``"< <default value>"`` when the input is mapped to a stream, an empty string otherwise.
        """
        return f"< {self.default_value}" if self.mapped_to else ""

    def _get_default_name(self) -> str:
        """Default names of inputs are based on the word ``input``."""
        return self._generate_name(base="input")

    def derive(self, plan_id: str) -> "CommandInput":
        """Create a new ``CommandInput`` that is derived from self.

        The result is a shallow copy of this input with an id regenerated for ``plan_id``.
        """
        parameter = copy.copy(self)
        parameter.id = CommandInput.generate_id(plan_id=plan_id, position=self.position, postfix=self.postfix)
        return parameter
class CommandOutput(CommandParameterBase):
    """An output produced by a command."""

    def __init__(
        self,
        *,
        create_folder: bool = False,
        default_value: Any = None,
        description: Optional[str] = None,
        id: str,
        mapped_to: Optional[MappedIOStream] = None,
        name: Optional[str] = None,
        position: Optional[int] = None,
        prefix: Optional[str] = None,
        encoding_format: Optional[List[str]] = None,
        derived_from: Optional[str] = None,
        postfix: Optional[str] = None,
    ):
        """Store the output's settings; ``default_value`` must be a ``Path`` or ``str`` and is kept as a string."""
        assert isinstance(default_value, (Path, str)), f"Invalid value type for CommandOutput: {type(default_value)}"

        super().__init__(
            id=id,
            name=name,
            description=description,
            default_value=str(default_value),
            prefix=prefix,
            position=position,
            postfix=postfix,
            derived_from=derived_from,
        )

        self.create_folder: bool = create_folder
        self.mapped_to: Optional[MappedIOStream] = mapped_to

        # MIME-types are checked only when a list was actually supplied.
        if encoding_format is not None:
            _validate_mime_type(encoding_format)
        self.encoding_format: Optional[List[str]] = encoding_format

    @staticmethod
    def generate_id(plan_id: str, position: Optional[int] = None, postfix: str = None) -> str:
        """Build the id of a ``CommandOutput`` that belongs to the plan ``plan_id``."""
        return CommandParameterBase._generate_id(plan_id, "outputs", position, postfix)

    def to_stream_representation(self) -> str:
        """Output stream representation.

        Returns an empty string when the output is not mapped to a stream, ``"> <default value>"`` for a stream of
        type ``stdout`` and ``" 2> <default value>"`` for any other stream type.
        """
        stream = self.mapped_to
        if not stream:
            return ""

        if stream.stream_type == "stdout":
            return f"> {self.default_value}"
        return f" 2> {self.default_value}"

    def _get_default_name(self) -> str:
        """Default names of outputs are based on the word ``output``."""
        return self._generate_name(base="output")

    def derive(self, plan_id: str) -> "CommandOutput":
        """Return a shallow copy of this output whose id is regenerated for ``plan_id``."""
        derived = copy.copy(self)
        derived.id = CommandOutput.generate_id(plan_id=plan_id, position=self.position, postfix=self.postfix)
        return derived
class ParameterMapping(CommandParameterBase):
    """Maps one or more child parameters to a parent CompositePlan."""

    def __init__(
        self,
        *,
        default_value: Any,
        description: Optional[str] = None,
        id: str,
        name: Optional[str] = None,
        mapped_parameters: List[CommandParameterBase],
        **kwargs,
    ):
        """Initialize the base parameter (extra keyword arguments are passed through) and keep the mapped list."""
        super().__init__(id=id, name=name, description=description, default_value=default_value, **kwargs)

        self.mapped_parameters: List[CommandParameterBase] = mapped_parameters

    @staticmethod
    def generate_id(plan_id: str, position: Optional[int] = None, postfix: Optional[str] = None) -> str:
        """Build the id of a ``ParameterMapping`` that belongs to the plan ``plan_id``."""
        return CommandParameterBase._generate_id(plan_id, "mappings", position, postfix)

    def to_stream_representation(self) -> str:
        """Stream representation; always an empty string for a mapping."""
        return ""

    def _get_default_name(self) -> str:
        """Default names of mappings are based on the word ``mapping``."""
        return self._generate_name(base="mapping")

    @CommandParameterBase.actual_value.setter  # type: ignore
    def actual_value(self, value):
        """Set the actual value on this mapping and pass it on to mapped parameters that have none yet."""
        self._v_actual_value = value
        self._v_actual_value_set = True

        for child in self.mapped_parameters:
            # A value that was already set on a mapped parameter is left untouched.
            if child.actual_value_set:
                continue
            child.actual_value = value

    @property
    def leaf_parameters(self) -> Iterator[CommandParameterBase]:
        """Yield the non-mapping parameters reachable from this mapping, descending into nested mappings."""
        for child in self.mapped_parameters:
            if not isinstance(child, ParameterMapping):
                yield child
                continue
            yield from child.leaf_parameters

    def derive(self, plan_id: str) -> "ParameterMapping":
        """Return a shallow copy of this mapping whose id is regenerated for ``plan_id``."""
        derived = copy.copy(self)
        derived.id = ParameterMapping.generate_id(plan_id=plan_id, position=self.position, postfix=self.postfix)
        return derived