# Copyright Swiss Data Science Center (SDSC). A partnership between
# École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Web dataset provider."""
import urllib
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
from renku.core import errors
from renku.core.constant import CACHE
from renku.core.dataset.dataset_add import copy_file
from renku.core.dataset.providers.api import AddProviderInterface, ProviderApi, ProviderPriority
from renku.core.util import communication
from renku.core.util.os import delete_dataset_file
from renku.core.util.urls import check_url, remove_credentials
from renku.core.util.util import parallel_execute
from renku.domain_model.project_context import project_context
from renku.infrastructure.immutable import DynamicProxy

if TYPE_CHECKING:
    from renku.core.dataset.providers.models import DatasetAddMetadata, DatasetUpdateMetadata


class WebProvider(ProviderApi, AddProviderInterface):
    """A provider for downloading data from web URLs."""

    priority = ProviderPriority.LOWEST
    name = "Web"
    is_remote = True

    @staticmethod
    def supports(uri: str) -> bool:
        """Whether or not this provider supports a given URI."""
        is_remote, is_git = check_url(uri)
        return is_remote and not is_git
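
    # A minimal sketch of what ``supports`` accepts (illustrative URIs, not part
    # of the provider API): ``check_url`` classifies a URI as remote and/or Git,
    # and this provider only handles remote, non-Git URLs.
    #
    #   WebProvider.supports("https://example.com/data.csv")  # True
    #   WebProvider.supports("git@github.com:org/repo.git")   # False: Git URL
    #   WebProvider.supports("data/local.csv")                # False: not remote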

    def update_files(
        self,
        files: List[DynamicProxy],
        dry_run: bool,
        delete: bool,
        context: Dict[str, Any],
        **kwargs,
    ) -> List["DatasetUpdateMetadata"]:
        """Update dataset files from the remote provider."""
        from renku.core.dataset.providers.models import DatasetAddMetadata, DatasetUpdateAction, DatasetUpdateMetadata

        progress_text = "Checking for local updates"

        results: List[DatasetUpdateMetadata] = []
        download_cache: Dict[str, DatasetAddMetadata] = {}
        potential_updates: List[Tuple[DatasetAddMetadata, DynamicProxy]] = []

        try:
            communication.start_progress(progress_text, len(files))
            for file in files:
                if not file.source:
                    continue
                destination = project_context.path / file.dataset.get_datadir()

                try:
                    if file.entity.path not in download_cache:
                        downloaded_files = download_file(
                            project_path=project_context.path, uri=file.source, destination=destination
                        )
                        if not any(f.entity_path == file.entity.path for f in downloaded_files):
                            # NOTE: File probably comes from an extracted download
                            downloaded_files = download_file(
                                project_path=project_context.path,
                                uri=file.source,
                                destination=destination,
                                extract=True,
                            )
                        download_cache.update({str(f.entity_path): f for f in downloaded_files})
                except errors.OperationError:
                    results.append(DatasetUpdateMetadata(entity=file, action=DatasetUpdateAction.DELETE))
                else:
                    metadata = download_cache.get(file.entity.path)
                    if not metadata:
                        results.append(DatasetUpdateMetadata(entity=file, action=DatasetUpdateAction.DELETE))
                        if not dry_run and delete:
                            delete_dataset_file(file.entity.path, follow_symlinks=True)
                            project_context.repository.add(file.entity.path, force=True)
                    else:
                        potential_updates.append((metadata, file))
        finally:
            communication.finalize_progress(progress_text)

        if not potential_updates:
            return results

        check_paths: List[Union[Path, str]] = [
            str(u[0].source.relative_to(project_context.path)) for u in potential_updates
        ]
        # NOTE: Stage files temporarily so we can get their hashes, then unstage them again
        project_context.repository.add(*check_paths, force=True)
        hashes = project_context.repository.get_object_hashes(check_paths)
        project_context.repository.remove(*check_paths, index=True)

        for metadata, file in potential_updates:
            if file.entity.checksum != hashes.get(metadata.source):
                results.append(DatasetUpdateMetadata(entity=file, action=DatasetUpdateAction.UPDATE))
                if not dry_run:
                    copy_file(metadata, file.dataset, storage=None)

        return results
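
# A minimal usage sketch for ``update_files`` (hypothetical dataset files and
# provider construction, assuming an initialized project context). Files are
# re-downloaded into the cache to compare hashes; with ``dry_run=True`` no
# dataset file is replaced or deleted:
#
#   provider = WebProvider(uri="https://example.com/data.csv")
#   updates = provider.update_files(files=dataset_files, dry_run=True, delete=False, context={})
#   for update in updates:
#       print(update.entity.path, update.action)  # DatasetUpdateAction.UPDATE or DELETE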


def _ensure_dropbox(url):
    """Ensure a Dropbox URL is set up for direct file download."""
    if not isinstance(url, urllib.parse.ParseResult):
        url = urllib.parse.urlparse(url)

    query = url.query or ""
    if "dl=0" in query:
        query = query.replace("dl=0", "dl=1")
    elif query:
        query += "&dl=1"
    else:
        query = "dl=1"

    url = url._replace(query=query)
    return url
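
# For example (illustrative URL), a Dropbox preview link is rewritten so that
# the ``dl`` query parameter forces a direct download:
#
#   parsed = _ensure_dropbox("https://www.dropbox.com/s/abc123/file.txt?dl=0")
#   urllib.parse.urlunparse(parsed)
#   # -> "https://www.dropbox.com/s/abc123/file.txt?dl=1"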


def _provider_check(url):
    """Apply additional provider-specific URL transformations."""
    from renku.core.util import requests

    url = requests.get_redirect_url(url)
    url = urllib.parse.urlparse(url)

    if "dropbox.com" in url.netloc:
        url = _ensure_dropbox(url)

    return urllib.parse.urlunparse(url)
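
# ``_provider_check`` resolves HTTP redirects first, so shortened or moved
# links also work (illustrative URL); Dropbox is currently the only host that
# gets a provider-specific rewrite:
#
#   _provider_check("https://short.example/abc")
#   # -> the final redirect target, with Dropbox links rewritten to use "dl=1"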


def download_file(
    uri: str,
    filename: Optional[str] = None,
    *,
    project_path: Path,
    destination: Path,
    extract: bool = False,
    multiple: bool = False,
) -> List["DatasetAddMetadata"]:
    """Download a file from a URI and return its metadata."""
    from renku.core.dataset.providers.models import DatasetAddAction, DatasetAddMetadata
    from renku.core.util import requests

    uri = requests.get_redirect_url(uri)  # TODO: Check that this is not duplicate
    uri = _provider_check(uri)

    with project_context.with_path(project_path):
        try:
            # NOTE: Download the file to a temporary cache directory; entries are moved to ``destination`` below
            tmp_root, paths = requests.download_file(
                base_directory=project_context.metadata_path / CACHE, url=uri, filename=filename, extract=extract
            )
        except errors.RequestError as e:  # pragma nocover
            raise errors.OperationError(f"Cannot download from {uri}") from e

        paths = [p for p in paths if not p.is_dir()]

        if len(paths) > 1 or multiple:
            if destination.exists() and not destination.is_dir():
                raise errors.ParameterError(f"Destination is not a directory: '{destination}'")
            destination.mkdir(parents=True, exist_ok=True)
        elif len(paths) == 1:
            # NOTE: If ``destination`` exists as a directory, keep the downloaded filename inside it;
            # otherwise ``destination`` itself is the target file path
            tmp_root = paths[0].parent if destination.exists() else paths[0]

        paths = [(src, destination / src.relative_to(tmp_root)) for src in paths if not src.is_dir()]
        return [
            DatasetAddMetadata(
                entity_path=dst.relative_to(project_context.path),
                url=remove_credentials(uri),
                action=DatasetAddAction.MOVE,
                source=src,
                destination=dst,
            )
            for src, dst in paths
        ]
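
# A minimal sketch of a single download (hypothetical URL and paths, assuming
# an initialized project context). Archives can be unpacked with
# ``extract=True``; each returned entry carries a MOVE action from the cache:
#
#   metadata = download_file(
#       "https://example.com/archive.tar.gz",
#       project_path=project_context.path,
#       destination=project_context.path / "data" / "my-dataset",
#       extract=True,
#   )
#   for m in metadata:
#       print(m.entity_path)  # paths relative to the project root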


def download_files(
    urls: Tuple[str, ...], destination: Path, names: Tuple[str, ...], extract: bool
) -> List["DatasetAddMetadata"]:
    """Download multiple files and return their metadata."""
    assert len(urls) == len(names), f"Numbers of URLs and names don't match: {len(urls)} != {len(names)}"

    if destination.exists() and not destination.is_dir():
        raise errors.ParameterError(f"Destination is not a directory: '{destination}'")
    destination.mkdir(parents=True, exist_ok=True)

    return parallel_execute(
        download_file,
        urls,
        names,
        project_path=project_context.path,
        destination=destination,
        extract=extract,
        multiple=True,
    )
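
# A minimal sketch for batch downloads (hypothetical URLs; ``parallel_execute``
# fans ``download_file`` out over the URL/name pairs and flattens the results):
#
#   metadata = download_files(
#       urls=("https://example.com/a.csv", "https://example.com/b.csv"),
#       destination=project_context.path / "data" / "my-dataset",
#       names=("a.csv", "b.csv"),
#       extract=False,
#   )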