# Copyright Swiss Data Science Center (SDSC). A partnership between
# École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Git dataset provider."""
import glob
import os
import shutil
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Union
from renku.core import errors
from renku.core.dataset.pointer_file import create_external_file
from renku.core.dataset.providers.api import AddProviderInterface, ProviderApi, ProviderPriority
from renku.core.storage import pull_paths_from_storage
from renku.core.util import communication
from renku.core.util.git import clone_repository, get_cache_directory_for_repository
from renku.core.util.metadata import is_linked_file
from renku.core.util.os import delete_dataset_file, get_files, is_subpath
from renku.core.util.urls import check_url, remove_credentials
from renku.domain_model.dataset import RemoteEntity
from renku.domain_model.project_context import project_context
from renku.infrastructure.immutable import DynamicProxy

if TYPE_CHECKING:
from renku.core.dataset.providers.models import DatasetAddMetadata, DatasetUpdateMetadata, ProviderParameter


class GitProvider(ProviderApi, AddProviderInterface):
"""Git provider."""
priority = ProviderPriority.NORMAL
name = "Git"
is_remote = True

    @staticmethod
def supports(uri: str) -> bool:
"""Whether or not this provider supports a given URI."""
is_remote, is_git = check_url(uri)
return is_remote and is_git
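
    # A minimal usage sketch (hypothetical URLs, not part of this module);
    # ``check_url`` is assumed to return an ``(is_remote, is_git)`` pair as
    # used above:
    #
    #     >>> GitProvider.supports("https://github.com/SwissDataScienceCenter/renku-python.git")
    #     True
    #     >>> GitProvider.supports("data/local.csv")
    #     False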

    @staticmethod
def get_add_parameters() -> List["ProviderParameter"]:
"""Returns parameters that can be set for add."""
from renku.core.dataset.providers.models import ProviderParameter
return [
ProviderParameter(
"sources",
flags=["s", "source"],
default=None,
help="Path(s) within remote git repo to be added",
multiple=True,
),
ProviderParameter(
"revision",
flags=["r", "ref"],
default=None,
help="Add files from a specific commit/tag/branch.",
type=str,
),
]
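
    # These parameters are expected to surface as flags on the ``renku dataset
    # add`` CLI (a sketch; the exact CLI wiring lives outside this module):
    #
    #     renku dataset add my-dataset --source docs --ref v1.0 https://example.com/repo.git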

    def get_metadata(
self,
uri: str,
destination: Path,
*,
sources: Optional[List[Union[Path, str]]] = None,
revision: Optional[str] = None,
**kwargs,
) -> List["DatasetAddMetadata"]:
"""Get metadata of files that will be added to a dataset."""
from renku.core.dataset.providers.models import DatasetAddAction, DatasetAddMetadata
destination_exists = destination.exists()
destination_is_dir = destination.is_dir()
remote_repository = clone_repository(
url=uri,
path=get_cache_directory_for_repository(url=uri),
checkout_revision=revision,
depth=None,
clean=True,
)

        def check_sources_are_within_remote_repo():
if not sources:
return
for source in sources:
if not is_subpath(path=remote_repository.path / source, base=remote_repository.path):
raise errors.ParameterError(f"Path '{source}' is not within the repository")

        def get_source_paths() -> Set[Path]:
"""Return all paths from the repo that match a source pattern."""
if not sources:
return set(remote_repository.path.glob("*")) # type: ignore
paths = set()
for source in sources:
                # NOTE: Normalize the source to resolve ".." references (if any). This preserves wildcards.
normalized_source = os.path.normpath(source)
absolute_source = os.path.join(remote_repository.path, normalized_source) # type: ignore
                # NOTE: Path.glob("root/**") does not return correct results (e.g. it includes ``root`` in the result)
subpaths = {Path(p) for p in glob.glob(absolute_source)}
if len(subpaths) == 0:
raise errors.ParameterError("No such file or directory", param_hint=str(source))
paths |= subpaths
return paths
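
        # Example (hypothetical pattern): ``sources=["data/*.csv"]`` is
        # normalized, joined onto the clone's path, and expanded with
        # ``glob.glob``, so shell-style wildcards select files inside the
        # cached clone.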
def should_copy(source_paths: List[Path]) -> bool:
n_paths = len(source_paths)
has_multiple_sources = n_paths > 1
source_is_dir = has_multiple_sources or (n_paths == 1 and source_paths[0].is_dir())
if source_is_dir and destination_exists and not destination_is_dir:
raise errors.ParameterError(f"Destination is not a directory: '{destination}'")
return has_multiple_sources or (destination_exists and destination_is_dir)
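
        # Worked example (hypothetical paths): with two sources ``a.txt`` and
        # ``b.txt`` and destination ``data/``, files land at ``data/a.txt``
        # and ``data/b.txt``; with a single source file and a non-existing
        # destination ``data/renamed.txt``, the file is moved to the
        # destination path itself.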
def get_file_metadata(src: Path, dst: Path) -> Optional["DatasetAddMetadata"]:
path_in_src_repo = src.relative_to(remote_repository.path) # type: ignore
path_in_dst_repo = dst.relative_to(project_context.path)
already_copied = path_in_dst_repo in new_files # A path with the same destination is already copied
new_files[path_in_dst_repo].append(path_in_src_repo)
if already_copied:
return None
checksum = remote_repository.get_object_hash(revision="HEAD", path=path_in_src_repo) # type: ignore
if not checksum:
                raise errors.FileNotFound(f"Cannot find '{path_in_src_repo}' in the remote project")
return DatasetAddMetadata(
entity_path=path_in_dst_repo,
url=remove_credentials(uri),
based_on=RemoteEntity(checksum=checksum, path=path_in_src_repo, url=uri),
action=DatasetAddAction.MOVE,
source=src,
destination=dst,
)

        check_sources_are_within_remote_repo()
results = []
new_files: Dict[Path, List[Path]] = defaultdict(list)
paths = get_source_paths()
with project_context.with_path(remote_repository.path):
pull_paths_from_storage(project_context.repository, *paths)
is_copy = should_copy(list(paths))
for path in paths:
dst_root = destination / path.name if is_copy else destination
for file in get_files(path):
src = file
relative_path = file.relative_to(path)
dst = dst_root / relative_path
metadata = get_file_metadata(src, dst)
if metadata:
results.append(metadata)
duplicates = [v for v in new_files.values() if len(v) > 1]
if duplicates:
            files = {str(p) for file_paths in duplicates for p in file_paths}
            files_str = "\n\t".join(sorted(files))
            communication.warn(f"The following files overwrite each other in the destination project:\n\t{files_str}")
return results
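
    # A minimal call sketch (hypothetical URI and destination; the constructor
    # signature is assumed from ``ProviderApi``):
    #
    #     provider = GitProvider(uri="https://example.com/repo.git")
    #     files = provider.get_metadata(
    #         uri="https://example.com/repo.git",
    #         destination=Path("data/my-dataset"),
    #         sources=["docs"],
    #         revision="main",
    #     )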

    def update_files(
self,
files: List[DynamicProxy],
dry_run: bool,
delete: bool,
context: Dict[str, Any],
ref: Optional[str] = None,
**kwargs,
) -> List["DatasetUpdateMetadata"]:
"""Update dataset files from the remote provider."""
from renku.core.dataset.providers.models import DatasetUpdateAction, DatasetUpdateMetadata
if "visited_repos" not in context:
context["visited_repos"] = {}
progress_text = "Checking git files for updates"
results: List[DatasetUpdateMetadata] = []
try:
communication.start_progress(progress_text, len(files))
for file in files:
communication.update_progress(progress_text, 1)
if not file.based_on:
continue
based_on = file.based_on
url = based_on.url
if url in context["visited_repos"]:
remote_repository = context["visited_repos"][url]
else:
communication.echo(msg="Cloning remote repository...")
path = get_cache_directory_for_repository(url=url)
remote_repository = clone_repository(url=url, path=path, checkout_revision=ref)
context["visited_repos"][url] = remote_repository
checksum = remote_repository.get_object_hash(path=based_on.path, revision="HEAD")
found = checksum is not None
changed = found and based_on.checksum != checksum
src = remote_repository.path / based_on.path
dst = project_context.metadata_path.parent / file.entity.path
if not found:
if not dry_run and delete:
delete_dataset_file(dst, follow_symlinks=True)
project_context.repository.add(dst, force=True)
results.append(DatasetUpdateMetadata(entity=file, action=DatasetUpdateAction.DELETE))
elif changed:
if not dry_run:
# Fetch file if it is tracked by Git LFS
pull_paths_from_storage(remote_repository, remote_repository.path / based_on.path)
if is_linked_file(path=src, project_path=remote_repository.path):
delete_dataset_file(dst, follow_symlinks=True)
create_external_file(target=src.resolve(), path=dst)
else:
shutil.copy(src, dst)
file.based_on = RemoteEntity(
checksum=checksum, path=based_on.path, url=based_on.url # type: ignore
)
results.append(DatasetUpdateMetadata(entity=file, action=DatasetUpdateAction.UPDATE))
finally:
communication.finalize_progress(progress_text)
return results
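
# A minimal update sketch (hypothetical inputs; ``dataset_files`` would
# normally be ``DynamicProxy`` objects coming from the dataset gateway, not
# built by hand, and the constructor signature is assumed from ``ProviderApi``):
#
#     provider = GitProvider(uri="https://example.com/repo.git")
#     updates = provider.update_files(files=dataset_files, dry_run=True, delete=False, context={})
#     for update in updates:
#         print(update.action)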