# -*- coding: utf-8 -*-
#
# Copyright 2018-2022 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Wrap Git client."""
import glob
import itertools
import os
import sys
import tempfile
import time
import uuid
from contextlib import contextmanager
from pathlib import Path
from typing import List
import attr
from renku.core import errors
from renku.core.util.os import get_absolute_path
from renku.core.util.urls import remove_credentials
COMMIT_DIFF_STRATEGY = "DIFF"
STARTED_AT = int(time.time() * 1e3)
[docs]def prepare_commit(client, commit_only=None, skip_dirty_checks=False, skip_staging: bool = False):
"""Gather information about repo needed for committing later on."""
diff_before = set()
if skip_staging:
if not isinstance(commit_only, list) or len(commit_only) == 0:
raise errors.OperationError("Cannot use ``skip_staging`` without specifying files to commit.")
if commit_only == COMMIT_DIFF_STRATEGY:
if len(client.repository.staged_changes) > 0 or len(client.repository.unstaged_changes) > 0:
client.repository.reset()
# Exclude files created by pipes.
diff_before = {
file
for file in client.repository.untracked_files
if STARTED_AT - int(Path(file).stat().st_ctime * 1e3) >= 1e3
}
if isinstance(commit_only, list) and not skip_dirty_checks:
for path_ in commit_only:
client.ensure_untracked(str(path_))
client.ensure_unstaged(str(path_))
return diff_before
[docs]def finalize_commit(
client,
diff_before,
commit_only=None,
commit_empty=True,
raise_if_empty=False,
commit_message=None,
abbreviate_message=True,
skip_staging: bool = False,
):
"""Commit modified/added paths."""
from renku.core.util.git import shorten_message
from renku.infrastructure.repository import Actor
from renku.version import __version__, version_url
committer = Actor(name=f"renku {__version__}", email=version_url)
change_types = {item.a_path: item.change_type for item in client.repository.unstaged_changes}
if commit_only == COMMIT_DIFF_STRATEGY:
# Get diff generated in command.
staged_after = set(change_types.keys())
modified_after_change_types = {item.a_path: item.change_type for item in client.repository.staged_changes}
modified_after = set(modified_after_change_types.keys())
change_types.update(modified_after_change_types)
diff_after = set(client.repository.untracked_files).union(staged_after).union(modified_after)
# Remove files not touched in command.
commit_only = list(diff_after - diff_before)
if isinstance(commit_only, list):
for path_ in commit_only:
p = client.path / path_
if p.exists() or change_types.get(str(path_)) == "D":
client.repository.add(path_)
if not commit_only:
client.repository.add(all=True)
try:
diffs = [d.a_path for d in client.repository.staged_changes]
except errors.GitError:
diffs = []
if not commit_empty and not diffs:
if raise_if_empty:
raise errors.NothingToCommit()
return
if commit_message and not isinstance(commit_message, str):
raise errors.CommitMessageEmpty()
elif not commit_message:
argv = [os.path.basename(sys.argv[0])] + [remove_credentials(arg) for arg in sys.argv[1:]]
commit_message = " ".join(argv)
if abbreviate_message:
commit_message = shorten_message(commit_message)
# NOTE: Only commit specified paths when skipping staging area
paths = commit_only if skip_staging else []
# Ignore pre-commit hooks since we have already done everything.
client.repository.commit(commit_message + client.transaction_id, committer=committer, no_verify=True, paths=paths)
[docs]def prepare_worktree(
original_client,
path=None,
branch_name=None,
commit=None,
):
"""Set up a Git worktree to provide isolation."""
from renku.core.util.contexts import Isolation
from renku.infrastructure.repository import NULL_TREE
path = path or tempfile.mkdtemp()
branch_name = branch_name or "renku/run/isolation/" + uuid.uuid4().hex
# TODO sys.argv
if commit is NULL_TREE:
original_client.repository.create_worktree(path, detach=True)
client = attr.evolve(original_client, path=path)
client.repository.run_git_command("checkout", "--orphan", branch_name)
client.repository.remove("*", recursive=True, force=True)
else:
revision = None
if commit:
revision = commit.hexsha
original_client.repository.create_worktree(path, branch=branch_name, reference=revision)
client = attr.evolve(original_client, path=path)
client.repository.get_configuration = original_client.repository.get_configuration
# Keep current directory relative to repository root.
relative = Path(os.path.relpath(Path(".").resolve(), original_client.path))
# Reroute standard streams
original_mapped_std = get_mapped_std_streams(original_client.candidate_paths)
mapped_std = {}
for name, stream in original_mapped_std.items():
stream_path = Path(path) / (Path(stream).relative_to(original_client.path))
stream_path = stream_path.absolute()
if not stream_path.exists():
stream_path.parent.mkdir(parents=True, exist_ok=True)
stream_path.touch()
mapped_std[name] = stream_path
_clean_streams(original_client.repository, original_mapped_std)
new_cwd = Path(path) / relative
new_cwd.mkdir(parents=True, exist_ok=True)
isolation = Isolation(cwd=str(new_cwd), **mapped_std)
isolation.__enter__()
return client, isolation, path, branch_name
[docs]def finalize_worktree(
client, isolation, path, branch_name, delete, new_branch, merge_args=("--ff-only",), exception=None
):
"""Cleanup and merge a previously created Git worktree."""
exc_info = (None, None, None)
if exception:
exc_info = (type(exception), exception, exception.__traceback__)
isolation.__exit__(*exc_info)
try:
client.repository.run_git_command("merge", branch_name, *merge_args)
except errors.GitCommandError:
raise errors.FailedMerge(client.repository, branch_name, merge_args)
if delete:
client.repository.remove_worktree(path)
if new_branch:
# delete the created temporary branch
client.repository.branches.remove(branch_name)
if client.external_storage_requested:
client.checkout_paths_from_storage()
[docs]def get_mapped_std_streams(lookup_paths, streams=("stdin", "stdout", "stderr")):
"""Get a mapping of standard streams to given paths."""
# FIXME add device number too
standard_inos = {}
for stream in streams:
try:
stream_stat = os.fstat(getattr(sys, stream).fileno())
key = stream_stat.st_dev, stream_stat.st_ino
standard_inos[key] = stream
except Exception: # FIXME UnsupportedOperation
pass
# FIXME if not getattr(sys, stream).istty()
def stream_inos(paths):
"""Yield tuples with stats and path."""
for path in paths:
try:
stat = os.stat(path)
key = (stat.st_dev, stat.st_ino)
if key in standard_inos:
yield standard_inos[key], path
except FileNotFoundError: # pragma: no cover
pass
return []
return dict(stream_inos(lookup_paths)) if standard_inos else {}
def _clean_streams(repository, mapped_streams):
"""Clean mapped standard streams."""
for stream_name in ("stdout", "stderr"):
stream = mapped_streams.get(stream_name)
if not stream:
continue
absolute_path = get_absolute_path(stream, repository.path)
path = os.path.relpath(absolute_path, start=repository.path)
if path not in repository.files:
os.remove(absolute_path)
else:
checksum = repository.get_object_hash(path=absolute_path, revision="HEAD")
repository.copy_content_to_file(path=absolute_path, checksum=checksum, output_path=path)
def _expand_directories(paths):
"""Expand directory with all files it contains."""
processed_paths = set()
for path in paths:
for matched_path in glob.iglob(str(path), recursive=True):
if matched_path in processed_paths:
continue
path_ = Path(matched_path)
if path_.is_dir():
for expanded in path_.rglob("*"):
processed_paths.add(str(expanded))
yield str(expanded)
else:
processed_paths.add(matched_path)
yield matched_path
[docs]@attr.s
class GitCore:
"""Wrap Git client."""
repository = attr.ib(init=False, default=None)
def __attrs_post_init__(self):
"""Initialize computed attributes."""
from renku.infrastructure.repository import Repository
#: Create an instance of a Git repository for the given path.
try:
self.repository = Repository(self.path)
except errors.GitError:
self.repository = None
def __del__(self):
if self.repository:
self.repository.close()
@property
def modified_paths(self):
"""Return paths of modified files."""
return [item.b_path for item in self.repository.unstaged_changes if item.b_path]
@property
def dirty_paths(self):
"""Get paths of dirty files in the repository."""
repo_path = self.repository.path
staged_files = [d.a_path for d in self.repository.staged_changes] if self.repository.head.is_valid() else []
return {
os.path.join(repo_path, p) for p in self.repository.untracked_files + self.modified_paths + staged_files
}
@property
def candidate_paths(self):
"""Return all paths in the index and untracked files."""
return [
os.path.join(self.repository.path, path)
for path in itertools.chain(self.repository.files, self.repository.untracked_files)
]
[docs] def find_ignored_paths(self, *paths) -> List[str]:
"""Return ignored paths matching ``.gitignore`` file."""
return self.repository.get_ignored_paths(*paths)
[docs] def remove_unmodified(self, paths, autocommit=True):
"""Remove unmodified paths and return their names."""
tested_paths = set(_expand_directories(paths))
# Keep only unchanged files in the output paths.
tracked_paths = {
diff.b_path
for diff in self.repo.index.diff(None)
if diff.change_type in {"A", "R", "M", "T"} and diff.b_path in tested_paths
}
unchanged_paths = tested_paths - tracked_paths
return unchanged_paths
[docs] def ensure_clean(self, ignore_std_streams=False):
"""Make sure the repository is clean."""
dirty_paths = self.dirty_paths
mapped_streams = get_mapped_std_streams(dirty_paths)
if ignore_std_streams:
if dirty_paths - set(mapped_streams.values()):
_clean_streams(self.repository, mapped_streams)
raise errors.DirtyRepository(self.repository)
elif self.repository.is_dirty():
_clean_streams(self.repository, mapped_streams)
raise errors.DirtyRepository(self.repository)
[docs] def ensure_untracked(self, path):
"""Ensure that path is not part of git untracked files."""
untracked = self.repository.untracked_files
for file_path in untracked:
is_parent = (self.path / file_path).parent == (self.path / path)
is_equal = path == file_path
if is_parent or is_equal:
raise errors.DirtyRenkuDirectory(self.repository)
[docs] def ensure_unstaged(self, path):
"""Ensure that path is not part of git staged files."""
staged = self.repository.staged_changes
for file_path in staged:
is_parent = str(file_path.a_path).startswith(path)
is_equal = path == file_path.a_path
if is_parent or is_equal:
raise errors.DirtyRenkuDirectory(self.repository)
[docs] def setup_credential_helper(self):
"""Setup git credential helper to ``cache`` if not set already."""
credential_helper = self.repository.get_configuration().get_value("credential", "helper", "")
if not credential_helper:
with self.repository.get_configuration(writable=True) as w:
w.set_value("credential", "helper", "cache")
[docs] @contextmanager
def commit(
self,
commit_only=None,
commit_empty=True,
raise_if_empty=False,
commit_message=None,
abbreviate_message=True,
skip_dirty_checks=False,
):
"""Automatic commit."""
diff_before = prepare_commit(self, commit_only=commit_only, skip_dirty_checks=skip_dirty_checks)
yield
finalize_commit(
self,
diff_before,
commit_only=commit_only,
commit_empty=commit_empty,
raise_if_empty=raise_if_empty,
commit_message=commit_message,
abbreviate_message=abbreviate_message,
)
[docs] @contextmanager
def worktree(self, path=None, branch_name=None, commit=None, merge_args=("--ff-only",)):
"""Create new worktree."""
from renku.infrastructure.repository import NULL_TREE
delete = branch_name is None
new_branch = commit is not NULL_TREE
new_client, isolation, path, branch_name = prepare_worktree(self, path, branch_name, commit)
try:
yield
except (Exception, BaseException) as e:
finalize_worktree(new_client, isolation, path, branch_name, delete, new_branch, merge_args, exception=e)
raise
else:
finalize_worktree(new_client, isolation, path, branch_name, delete, new_branch, merge_args)