# -*- coding: utf-8 -*-
#
# Copyright 2017-2022 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Dataset add business logic."""

import os
import shutil
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional, Set, Union, cast
from urllib.parse import urlparse

from renku.core import errors
from renku.core.dataset.context import DatasetContext
from renku.core.dataset.datasets_provenance import DatasetsProvenance
from renku.core.dataset.pointer_file import create_external_file
from renku.core.dataset.providers.api import ImporterApi
from renku.core.dataset.providers.factory import ProviderFactory
from renku.core.dataset.providers.models import DatasetAddAction
from renku.core.storage import check_external_storage, track_paths_in_storage
from renku.core.util import communication, requests
from renku.core.util.dataset import check_url
from renku.core.util.git import get_git_user
from renku.core.util.os import delete_dataset_file, get_files, get_relative_path
from renku.domain_model.dataset import Dataset, DatasetFile
from renku.domain_model.project_context import project_context

if TYPE_CHECKING:
    from renku.core.dataset.providers.models import DatasetAddMetadata
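
# NOTE: Orientation summary (comment added for clarity; not present in the original module):
# ``add_to_dataset`` checks available disk space, resolves the target dataset, downloads or
# copies the given URLs/paths via ``_download_files``, filters out gitignored and already
# existing files, moves the results into the dataset's data directory, tracks them in LFS
# when external storage is enabled, and finally records the new ``DatasetFile`` entries in
# the dataset provenance.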


def add_to_dataset(
    dataset_name: str,
    urls: List[str],
    *,
    importer: Optional[ImporterApi] = None,
    force: bool = False,
    create: bool = False,
    overwrite: bool = False,
    sources: Optional[List[Union[str, Path]]] = None,
    destination: str = "",
    revision: Optional[str] = None,
    extract: bool = False,
    clear_files_before: bool = False,
    total_size: Optional[int] = None,
    datadir: Optional[Path] = None,
    storage: Optional[str] = None,
    **kwargs,
) -> Dataset:
    """Import the data into the data directory."""
    repository = project_context.repository
    sources = sources or []

    _check_available_space(urls, total_size=total_size)

    if not create and storage:
        raise errors.ParameterError(
            "Using the '--storage' parameter is only required if the '--create' parameter is also used to "
            "create the dataset at the same time as when data is added to it"
        )
    if create and not storage and any([url.lower().startswith("s3://") for url in urls]):
        raise errors.ParameterError(
            "Creating an S3 dataset at the same time as adding data requires the '--storage' parameter to be set"
        )

    try:
        with DatasetContext(name=dataset_name, create=create, datadir=datadir, storage=storage) as dataset:
            destination_path = _create_destination_directory(dataset, destination)

            check_external_storage()  # TODO: This is not required for external storages

            datadir = cast(Path, project_context.path / dataset.get_datadir())
            if create and datadir.exists():
                # NOTE: Add datadir to paths to add missing files on create
                for file in get_files(datadir):
                    urls.append(str(file))

            files = _download_files(
                urls=urls,
                dataset=dataset,
                importer=importer,
                destination=destination_path,
                revision=revision,
                sources=sources,
                extract=extract,
                force=force,
                **kwargs,
            )

            # Remove all files that are under a .git directory
            paths_to_avoid = [f.entity_path for f in files if ".git" in str(f.entity_path).split(os.path.sep)]
            if paths_to_avoid:
                files = [f for f in files if f.entity_path not in paths_to_avoid]
                communication.warn(
                    "Ignored adding paths under a .git directory:\n\t" + "\n\t".join(str(p) for p in paths_to_avoid)
                )

            files_to_commit = {f.get_absolute_commit_path(project_context.path) for f in files if not f.gitignored}

            if not force:
                files, files_to_commit = _check_ignored_files(files_to_commit, files)

            # all files at this point can be force-added

            if not overwrite:
                files, files_to_commit = _check_existing_files(dataset, files_to_commit, files)

            move_files_to_dataset(files)

            # Track non-symlinks in LFS
            if check_external_storage():
                track_paths_in_storage(*files_to_commit)

            # Force-add to include possible ignored files
            if len(files_to_commit) > 0:
                repository.add(*files_to_commit, project_context.pointers_path, force=True)

            n_staged_changes = len(repository.staged_changes)
            if n_staged_changes == 0:
                communication.warn("No new file was added to project")

            if not files:
                if create:
                    raise errors.UsageError("There are no files to create a dataset")
                return dataset

            dataset_files = _generate_dataset_files(dataset, files, clear_files_before)

            dataset.add_or_update_files(dataset_files)
            datasets_provenance = DatasetsProvenance()
            datasets_provenance.add_or_update(dataset, creator=get_git_user(repository))
            project_context.database.commit()
    except errors.DatasetNotFound:
        raise errors.DatasetNotFound(
            message='Dataset "{0}" does not exist.\n'
            'Use "renku dataset create {0}" to create the dataset or retry '
            '"renku dataset add {0}" command with "--create" option for '
            "automatic dataset creation.".format(dataset_name)
        )
    except (FileNotFoundError, errors.GitCommandError) as e:
        raise errors.ParameterError("Could not find paths/URLs: \n{0}".format("\n".join(urls))) from e
    else:
        return dataset
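

# NOTE: Illustrative usage sketch (comment only; not part of the original module). The dataset
# name and URL below are hypothetical, and the call is assumed to run inside an existing Renku
# project so that ``project_context`` is populated:
#
#     from renku.core.dataset.dataset_add import add_to_dataset
#
#     dataset = add_to_dataset(
#         dataset_name="my-dataset",                             # hypothetical dataset name
#         urls=["https://example.com/data/measurements.csv"],    # hypothetical source URL
#         create=True,       # create the dataset if it does not exist yet
#         overwrite=False,   # keep files that already exist in the dataset
#     )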


def _download_files(
    *,
    urls: List[str],
    importer: Optional[ImporterApi] = None,
    dataset: Dataset,
    destination: Path,
    extract: bool,
    revision: Optional[str],
    sources: List[Union[str, Path]],
    force: bool = False,
    **kwargs,
) -> List["DatasetAddMetadata"]:
    """Process file URLs for adding to a dataset."""
    if dataset.storage and any([urlparse(dataset.storage).scheme != urlparse(url).scheme for url in urls]):
        raise errors.ParameterError(
            f"The scheme of some urls {urls} does not match the defined storage url {dataset.storage}."
        )

    if importer:
        return importer.download_files(destination=destination, extract=extract)

    if len(urls) == 0:
        raise errors.ParameterError("No URL is specified")
    if sources and len(urls) > 1:
        raise errors.ParameterError("Cannot use '--source' with multiple URLs.")

    files = []

    for url in urls:
        _, is_git = check_url(url)

        if not is_git and sources:
            raise errors.ParameterError("Cannot use '-s/--src/--source' with URLs or local files.")

        provider = ProviderFactory.get_add_provider(uri=url)

        new_files = provider.add(
            uri=url,
            destination=destination,
            revision=revision,
            sources=sources,
            dataset=dataset,
            extract=extract,
            force=force,
            **kwargs,
        )

        files.extend(new_files)

    return files


def _check_available_space(urls: List[str], total_size: Optional[int] = None):
    """Check that there is enough space available on the device for download."""
    if total_size is None:
        total_size = 0
        for url in urls:
            try:
                response = requests.head(url, allow_redirects=True)
                total_size += int(response.headers.get("content-length", 0))
            except errors.RequestError:
                pass

    usage = shutil.disk_usage(project_context.path)

    if total_size > usage.free:
        mb = 2**20
        message = "Insufficient disk space (required: {:.2f} MB/available: {:.2f} MB). ".format(
            total_size / mb, usage.free / mb
        )
        raise errors.OperationError(message)


def _create_destination_directory(dataset: Dataset, destination: Optional[Union[Path, str]] = None) -> Path:
    """Create directory for dataset add."""
    dataset_datadir = project_context.path / dataset.get_datadir()

    if dataset_datadir.is_symlink():
        dataset_datadir.unlink()

    # NOTE: Make sure that dataset's data dir exists because we check for existence of a destination later to decide
    # what will be its name
    dataset_datadir.mkdir(parents=True, exist_ok=True)

    destination = destination or ""
    relative_path = cast(str, get_relative_path(destination, base=dataset_datadir, strict=True))
    return dataset_datadir / relative_path


def _check_ignored_files(files_to_commit: Set[str], files: List["DatasetAddMetadata"]):
    """Check if any files added were ignored."""
    ignored_files = set(project_context.repository.get_ignored_paths(*files_to_commit))
    if ignored_files:
        ignored_sources = []
        for file in files:
            if not file.gitignored and file.get_absolute_commit_path(project_context.path) in ignored_files:
                ignored_sources.append(file.source)

        communication.warn(
            "These paths are ignored by one of your .gitignore files (use '--force' flag if you really want to add "
            "them):\n\t" + "\n\t".join([str(p) for p in ignored_sources])
        )

        files_to_commit = files_to_commit.difference(ignored_files)
        files = [f for f in files if f.get_absolute_commit_path(project_context.path) not in ignored_files]

    return files, files_to_commit


def _check_existing_files(dataset: Dataset, files_to_commit: Set[str], files: List["DatasetAddMetadata"]):
    """Check if files added already exist."""
    existing_files = set()
    for path in files_to_commit:
        relative_path = Path(path).relative_to(project_context.path)
        if dataset.find_file(relative_path):
            existing_files.add(path)

    if existing_files:
        communication.warn(
            "These existing files were not overwritten (use '--overwrite' flag to overwrite them):\n\t"
            + "\n\t".join([str(p) for p in existing_files])
        )

        files_to_commit = files_to_commit.difference(existing_files)
        files = [f for f in files if f.get_absolute_commit_path(project_context.path) not in existing_files]

    return files, files_to_commit


def move_files_to_dataset(files: List["DatasetAddMetadata"]):
    """Copy/Move files into a dataset's directory."""
    for file in files:
        if not file.has_action:
            continue

        # Remove existing file if any; required as a safety-net to avoid corrupting external files
        delete_dataset_file(file.destination, follow_symlinks=True)
        file.destination.parent.mkdir(parents=True, exist_ok=True)

        if file.action == DatasetAddAction.COPY:
            shutil.copy(file.source, file.destination)
        elif file.action == DatasetAddAction.MOVE:
            shutil.move(file.source, file.destination, copy_function=shutil.copy)  # type: ignore
        elif file.action == DatasetAddAction.SYMLINK:
            create_external_file(target=file.source, path=file.destination)
        else:
            raise errors.OperationError(f"Invalid action {file.action}")
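

# NOTE: Illustrative summary (comment only; not part of the original module) of how each
# ``DatasetAddAction`` is materialized on disk by ``move_files_to_dataset``:
#
#     COPY    -> shutil.copy(source, destination)       # data duplicated into the data directory
#     MOVE    -> shutil.move(source, destination)       # data relocated into the data directory
#     SYMLINK -> create_external_file(target, path)     # pointer file referencing external data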


def _generate_dataset_files(dataset: Dataset, files: List["DatasetAddMetadata"], clear_files_before: bool = False):
    """Generate DatasetFile entries from the list of added files."""
    dataset_files = []
    for file in files:
        dataset_file = DatasetFile.from_path(path=file.entity_path, source=file.url, based_on=file.based_on)
        dataset_files.append(dataset_file)

    if clear_files_before:
        dataset.clear_files()
    return dataset_files