# Copyright Swiss Data Science Center (SDSC). A partnership between
# École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Local provider for local filesystem."""
import os
import urllib
import uuid
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from renku.core import errors
from renku.core.config import get_value
from renku.core.dataset.providers.api import (
AddProviderInterface,
ExporterApi,
ExportProviderInterface,
ProviderApi,
ProviderPriority,
)
from renku.core.storage import check_external_storage, track_paths_in_storage
from renku.core.util import communication
from renku.core.util.metadata import is_protected_path
from renku.core.util.os import get_absolute_path, get_safe_relative_path, is_path_empty, is_subpath
from renku.core.util.urls import check_url
from renku.domain_model.project_context import project_context
from renku.infrastructure.immutable import DynamicProxy
if TYPE_CHECKING:
from renku.core.dataset.providers.models import DatasetAddMetadata, DatasetUpdateMetadata, ProviderParameter
from renku.domain_model.dataset import Dataset, DatasetTag
class LocalProvider(ProviderApi, AddProviderInterface, ExportProviderInterface):
    """Local filesystem provider."""

    priority = ProviderPriority.LOW
    name = "Local"
    is_remote = False

    def __init__(self, uri: str):
        super().__init__(uri=uri)
        # Optional export destination; set lazily by ``get_exporter``.
        self._path: Optional[str] = None

    @staticmethod
    def supports(uri: str) -> bool:
        """Whether or not this provider supports a given URI."""
        # A URI is handled here exactly when it does NOT point to a remote.
        remote, _ = check_url(uri)
        return not remote

    @staticmethod
    def get_add_parameters() -> List["ProviderParameter"]:
        """Returns parameters that can be set for add."""
        from renku.core.dataset.providers.models import ProviderParameter

        # (name, flags, help) for the three mutually-exclusive transfer modes.
        option_specs = [
            (
                "copy",
                ["cp", "copy"],
                "Copy files to the dataset's data directory. Mutually exclusive with --move and --link.",
            ),
            (
                "move",
                ["mv", "move"],
                "Move files to the dataset's data directory. Mutually exclusive with --copy and --link.",
            ),
            (
                "link",
                ["ln", "link"],
                "Symlink files to the dataset's data directory. Mutually exclusive with --copy and --move.",
            ),
        ]
        return [
            ProviderParameter(option_name, flags=option_flags, help=help_text, is_flag=True, default=False)
            for option_name, option_flags, help_text in option_specs
        ]

    @staticmethod
    def get_export_parameters() -> List["ProviderParameter"]:
        """Returns parameters that can be set for export."""
        from renku.core.dataset.providers.models import ProviderParameter

        return [ProviderParameter("path", flags=["p", "path"], help="Path to copy data to.", type=str)]

    def update_files(
        self,
        files: List[DynamicProxy],
        dry_run: bool,
        delete: bool,
        context: Dict[str, Any],
        check_data_directory: bool = False,
        **kwargs,
    ) -> List["DatasetUpdateMetadata"]:
        """Update dataset files from the remote provider."""
        from renku.core.dataset.providers.models import DatasetUpdateAction, DatasetUpdateMetadata

        progress_name = "Checking for local updates"
        updates: List[DatasetUpdateMetadata] = []
        try:
            communication.start_progress(progress_name, len(files))

            # First pass: drop non-local entries, mark vanished files for
            # deletion, and collect the paths whose hashes must be computed.
            paths_to_hash = []
            candidates = []
            for entry in files:
                communication.update_progress(progress_name, 1)
                if entry.based_on or entry.linked:
                    # Imported or linked files are not managed by this provider.
                    continue
                if not (project_context.path / entry.entity.path).exists():
                    updates.append(DatasetUpdateMetadata(entity=entry, action=DatasetUpdateAction.DELETE))
                    continue
                paths_to_hash.append(entry.entity.path)
                candidates.append(entry)

            # Second pass: compare current repository hashes with recorded ones.
            hashes = project_context.repository.get_object_hashes(paths_to_hash)
            for entry in candidates:
                new_checksum = hashes.get(entry.entity.path)
                if not new_checksum:
                    updates.append(DatasetUpdateMetadata(entity=entry, action=DatasetUpdateAction.DELETE))
                elif new_checksum != entry.entity.checksum:
                    updates.append(DatasetUpdateMetadata(entity=entry, action=DatasetUpdateAction.UPDATE))
                elif check_data_directory and not any(
                    entry.entity.path == f.entity.path for f in entry.dataset.files
                ):
                    # File lives in the data directory but is not (yet) part of
                    # the dataset's file list — treat paths outside the data
                    # directory as out of scope.
                    datadir = entry.dataset.get_datadir()
                    try:
                        get_safe_relative_path(entry.entity.path, datadir)
                    except ValueError:
                        continue
                    updates.append(DatasetUpdateMetadata(entity=entry, action=DatasetUpdateAction.UPDATE))
        finally:
            communication.finalize_progress(progress_name)
        return updates

    def get_exporter(
        self, dataset: "Dataset", *, tag: Optional["DatasetTag"], path: Optional[str] = None, **kwargs
    ) -> "LocalExporter":
        """Create export manager for given dataset."""
        self._path = path
        return LocalExporter(dataset=dataset, path=self._path, tag=tag)

    def get_importer(self, uri, **kwargs):
        """Get import manager."""
        raise NotImplementedError
class LocalExporter(ExporterApi):
    """Local filesystem export manager."""

    def __init__(self, dataset: "Dataset", tag: Optional["DatasetTag"], path: Optional[str]):
        super().__init__(dataset)
        # Export destination (relative to the project root) and optional tag.
        self._path: Optional[str] = path
        self._tag: Optional["DatasetTag"] = tag

    @staticmethod
    def requires_access_token() -> bool:
        """Return if export requires an access token."""
        return False

    def set_access_token(self, access_token):
        """Set access token."""
        raise NotImplementedError

    def get_access_token_url(self):
        """Endpoint for creation of access token."""
        return ""

    def export(self, **kwargs) -> str:
        """Execute entire export process."""
        from renku.command.schema.dataset import dump_dataset_as_jsonld
        from renku.core.util.yaml import write_yaml

        # Resolve the destination directory: an explicit path wins; otherwise
        # derive a directory name from the dataset (and tag, if any).
        if self._path:
            destination = project_context.path / self._path
        else:
            if self._tag:
                directory_name = f"{self._dataset.name}-{self._tag.name}"
            else:
                directory_name = self._dataset.name
            destination = project_context.path / project_context.datadir / directory_name

        if destination.exists() and not destination.is_dir():
            raise errors.ParameterError(f"Destination is not a directory: '{destination}'")
        elif not is_path_empty(destination):
            raise errors.DirectoryNotEmptyError(destination)
        destination.mkdir(parents=True, exist_ok=True)

        data_dir = self._dataset.get_datadir()
        with communication.progress("Copying dataset files ...", total=len(self._dataset.files)) as progressbar:
            for file in self.dataset.files:
                # Mirror the layout under the data directory; files outside it
                # are flattened to their basename.
                try:
                    relative_path = str(Path(file.entity.path).relative_to(data_dir))
                except ValueError:
                    relative_path = Path(file.entity.path).name

                target = destination / relative_path
                target.parent.mkdir(exist_ok=True, parents=True)
                # Copy content as recorded in the repository at the file's checksum.
                project_context.repository.copy_content_to_file(
                    file.entity.path, checksum=file.entity.checksum, output_path=target
                )
                progressbar.update()

        # Write dataset metadata next to the data; avoid clobbering an
        # existing METADATA.yml by falling back to a unique name.
        metadata_path = destination / "METADATA.yml"
        if metadata_path.exists():
            metadata_path = destination / f"METADATA-{uuid.uuid4()}.yml"

        metadata = dump_dataset_as_jsonld(self._dataset)
        write_yaml(path=metadata_path, data=metadata)

        communication.echo(f"Dataset metadata was copied to {metadata_path}")
        return str(destination)