Source code for renku.core.dataset.providers.api

# Copyright 2017-2022 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""API for providers."""

import abc
from collections import UserDict
from enum import IntEnum
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type, Union

from renku.core import errors
from renku.core.plugin import hookimpl
from renku.core.util.metadata import get_canonical_key, read_credentials, store_credentials
from renku.core.util.util import NO_VALUE, NoValueType
from renku.domain_model.dataset_provider import IDatasetProviderPlugin

if TYPE_CHECKING:
    from renku.core.dataset.providers.models import (
        DatasetAddMetadata,
        ProviderDataset,
        ProviderDatasetFile,
        ProviderParameter,
    )
    from renku.domain_model.dataset import Dataset, DatasetTag


[docs]class ProviderPriority(IntEnum): """Defines the order in which a provider is checked to see if it supports a URI. Providers that support more specific URIs should have a higher priority so that they are checked first. """ HIGHEST = 1 HIGHER = 2 HIGH = 3 NORMAL = 4 LOW = 5 LOWER = 6 LOWEST = 7
[docs]class ProviderApi(IDatasetProviderPlugin): """Interface defining provider methods.""" priority: Optional[ProviderPriority] = None name: Optional[str] = None def __init__(self, uri: Optional[str], **kwargs): self._uri: str = uri or "" def __init_subclass__(cls, **kwargs): for required_property in ("priority", "name"): if getattr(cls, required_property, None) is None: raise NotImplementedError(f"{required_property} must be set for {cls}") def __repr__(self): return f"<DatasetProvider {self.name}>"
[docs] @classmethod @hookimpl def dataset_provider(cls) -> "Type[ProviderApi]": """The definition of the provider.""" return cls
[docs] @staticmethod @abc.abstractmethod def supports(uri: str) -> bool: """Whether or not this provider supports a given URI.""" raise NotImplementedError
[docs] @staticmethod def supports_add() -> bool: """Whether this provider supports adding data to datasets.""" return False
[docs] @staticmethod def supports_create() -> bool: """Whether this provider supports creating a dataset.""" return False
[docs] @staticmethod def supports_export() -> bool: """Whether this provider supports dataset export.""" return False
[docs] @staticmethod def supports_import() -> bool: """Whether this provider supports dataset import.""" return False
[docs] @staticmethod def add(uri: str, destination: Path, **kwargs) -> List["DatasetAddMetadata"]: """Add files from a URI to a dataset.""" raise NotImplementedError
[docs] @staticmethod def get_add_parameters() -> List["ProviderParameter"]: """Returns parameters that can be set for add.""" return []
[docs] @staticmethod def get_export_parameters() -> List["ProviderParameter"]: """Returns parameters that can be set for export.""" return []
[docs] @staticmethod def get_import_parameters() -> List["ProviderParameter"]: """Returns parameters that can be set for import.""" return []
@property def uri(self) -> str: """Return provider's URI.""" return self._uri
[docs] def get_exporter(self, dataset: "Dataset", *, tag: Optional["DatasetTag"], **kwargs) -> "ExporterApi": """Get export manager.""" raise NotImplementedError
[docs] def get_importer(self, **kwargs) -> "ImporterApi": """Get import manager.""" raise NotImplementedError
[docs] def on_create(self, dataset: "Dataset") -> None: """Hook to perform provider-specific actions on a newly-created dataset.""" raise NotImplementedError
[docs]class ImporterApi(abc.ABC): """Interface defining importer methods.""" def __init__(self, uri: str, original_uri: str): self._uri: str = uri self._original_uri: str = original_uri self._provider_dataset_files: Optional[List["ProviderDatasetFile"]] = None self._provider_dataset: Optional["ProviderDataset"] = None @property def provider_dataset(self) -> "ProviderDataset": """Return the remote dataset. This is only valid after a call to ``fetch_provider_dataset``.""" if self._provider_dataset is None: raise errors.DatasetImportError("Dataset is not fetched") return self._provider_dataset @property def provider_dataset_files(self) -> List["ProviderDatasetFile"]: """Return list of dataset files. This is only valid after a call to ``fetch_provider_dataset``.""" if self._provider_dataset_files is None: raise errors.DatasetImportError("Dataset is not fetched") return self._provider_dataset_files @property def uri(self) -> str: """Return url of this record.""" return self._uri @property def original_uri(self) -> str: """Return original URI of this record without any conversion to DOI.""" return self._original_uri @property def latest_uri(self) -> str: """Get URI of the latest version.""" raise NotImplementedError @property def version(self) -> str: """Get record version.""" raise NotImplementedError
[docs] @abc.abstractmethod def fetch_provider_dataset(self) -> "ProviderDataset": """Deserialize this record to a ``ProviderDataset``.""" raise NotImplementedError
[docs] @abc.abstractmethod def is_latest_version(self) -> bool: """Check if record is at last possible version.""" raise NotImplementedError
[docs] def is_version_equal_to(self, dataset: Any) -> bool: """Check if a dataset has the same version as the record.""" return self.version == getattr(dataset, "version", object())
[docs] @abc.abstractmethod def download_files(self, destination: Path, extract: bool) -> List["DatasetAddMetadata"]: """Download dataset files from the remote provider.""" raise NotImplementedError
[docs] @abc.abstractmethod def tag_dataset(self, name: str) -> None: """Create a tag for the dataset ``name`` if the remote dataset has a tag/version.""" raise NotImplementedError
[docs] @abc.abstractmethod def copy_extra_metadata(self, new_dataset: "Dataset") -> None: """Copy provider specific metadata once the dataset is created.""" raise NotImplementedError
[docs]class ExporterApi(abc.ABC): """Interface defining exporter methods.""" def __init__(self, dataset: "Dataset"): super().__init__() self._dataset: "Dataset" = dataset @property def dataset(self) -> "Dataset": """Return the dataset to be exported.""" return self._dataset
[docs] @staticmethod def requires_access_token() -> bool: """Return if export requires an access token.""" return True
[docs] @abc.abstractmethod def set_access_token(self, access_token): """Set access token.""" pass
[docs] @abc.abstractmethod def get_access_token_url(self) -> str: """Endpoint for creation of access token.""" pass
[docs] @abc.abstractmethod def export(self, **kwargs) -> str: """Execute export process.""" raise NotImplementedError
[docs]class ProviderCredentials(abc.ABC, UserDict): """Credentials of a provider. NOTE: An empty string, "", is a valid value. ``NO_VALUE`` means that the value for a key is not set. """ def __init__(self, provider: ProviderApi): super().__init__() self._provider: ProviderApi = provider self.data: Dict[str, Union[str, NoValueType]] = { key: NO_VALUE for key in self.get_canonical_credentials_names() }
[docs] @staticmethod @abc.abstractmethod def get_credentials_names() -> Tuple[str, ...]: """Return a tuple of the required credentials for a provider.""" raise NotImplementedError
@property def provider(self): """Return the associated provider instance.""" return self._provider
[docs] def get_credentials_names_with_no_value(self) -> Tuple[str, ...]: """Return a tuple of credential keys that don't have a valid value.""" return tuple(key for key, value in self.items() if value is NO_VALUE)
[docs] def get_canonical_credentials_names(self) -> Tuple[str, ...]: """Return canonical credentials names that can be used as config keys.""" return tuple(get_canonical_key(key) for key in self.get_credentials_names())
[docs] def get_credentials_section_name(self) -> str: """Get section name for storing credentials. NOTE: This methods should be overridden by subclasses to allow multiple credentials per providers if needed. """ return self.provider.name.lower() # type: ignore
[docs] def read(self) -> Dict[str, Union[str, NoValueType]]: """Read credentials from the config and return them. Set non-existing values to None.""" section = self.get_credentials_section_name() def read_and_convert_credentials(key) -> Union[str, NoValueType]: value = read_credentials(section=section, key=key) return NO_VALUE if value is None else value data = {key: read_and_convert_credentials(key) for key in self.get_canonical_credentials_names()} self.data.update(data) return self.data
[docs] def store(self) -> None: """Store credentials globally.""" section = self.get_credentials_section_name() for key, value in self.items(): if value is not None: store_credentials(section=section, key=key, value=value)