# -*- coding: utf-8 -*-
#
# Copyright 2020 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper utilities for handling URLs."""
import os
import re
import unicodedata
import urllib
from typing import List, Optional
from urllib.parse import ParseResult, urlparse
from renku.command.command_builder.command import inject
from renku.core import errors
from renku.core.interface.client_dispatcher import IClientDispatcher
from renku.core.util.git import get_remote, parse_git_url
from renku.core.util.os import is_subpath
[docs]def url_to_string(url):
"""Convert url from ``list`` or ``ParseResult`` to string."""
if isinstance(url, list):
return ParseResult(scheme=url[0], netloc=url[1], path=url[2], params=None, query=None, fragment=None).geturl()
if isinstance(url, ParseResult):
return url.geturl()
if isinstance(url, str):
return url
raise ValueError("url value not recognized")
[docs]def remove_credentials(url):
"""Remove username and password from a URL."""
if url is None:
return ""
parsed = urllib.parse.urlparse(url)
return parsed._replace(netloc=parsed.hostname).geturl()
[docs]def get_host(client):
"""Return the hostname for the resource URIs.
Default is localhost. If RENKU_DOMAIN is set, it overrides the host from remote.
"""
host = "localhost"
if client:
host = client.remote.get("host") or host
return os.environ.get("RENKU_DOMAIN") or host
[docs]def get_path(url: str) -> str:
"""Return path part of a url."""
return urllib.parse.urlparse(url).path
[docs]def get_scheme(uri: str) -> str:
"""Return scheme of a URI."""
return urllib.parse.urlparse(uri).scheme.lower()
[docs]@inject.autoparams("client_dispatcher")
def parse_authentication_endpoint(
client_dispatcher: IClientDispatcher, endpoint: Optional[str] = None, use_remote: bool = False
):
"""Return a parsed url.
If an endpoint is provided then use it, otherwise, look for a configured endpoint. If no configured endpoint exists
then try to use project's remote url.
"""
client = client_dispatcher.current_client
if not endpoint:
endpoint = client.get_value(section="renku", key="endpoint")
if not endpoint:
if not use_remote:
return
remote = get_remote(client.repository)
if not remote or not remote.url:
return
endpoint = f"https://{parse_git_url(remote.url).hostname}/"
if not endpoint.startswith("http"):
endpoint = f"https://{endpoint}"
parsed_endpoint = urllib.parse.urlparse(endpoint)
if not parsed_endpoint.netloc:
raise errors.ParameterError(f"Invalid endpoint: `{endpoint}`.")
return parsed_endpoint._replace(scheme="https", path="/", params="", query="", fragment="")
[docs]def get_slug(name: str, invalid_chars: Optional[List[str]] = None, lowercase: bool = True) -> str:
"""Create a slug from name."""
invalid_chars = invalid_chars or []
lower_case = name.lower() if lowercase else name
no_space = re.sub(r"\s+", "_", lower_case)
normalized = unicodedata.normalize("NFKD", no_space).encode("ascii", "ignore").decode("utf-8")
valid_chars_pattern = [r"\w", ".", "_", "-"]
if len(invalid_chars) > 0:
valid_chars_pattern = [ch for ch in valid_chars_pattern if ch not in invalid_chars]
no_invalid_characters = re.sub(f'[^{"".join(valid_chars_pattern)}]', "_", normalized)
no_duplicates = re.sub(r"([._-])[._-]+", r"\1", no_invalid_characters)
valid_start = re.sub(r"^[._-]", "", no_duplicates)
valid_end = re.sub(r"[._-]$", "", valid_start)
no_dot_lock_at_end = re.sub(r"\.lock$", "_lock", valid_end)
return no_dot_lock_at_end
[docs]def is_uri_subfolder(uri: str, subfolder_uri: str) -> bool:
"""Check if one uri is a 'subfolder' of another."""
parsed_uri = urlparse(uri)
parsed_subfolder_uri = urlparse(subfolder_uri)
parsed_uri_path = parsed_uri.path
parsed_subfolder_uri_path = parsed_subfolder_uri.path
if parsed_uri_path in ["", "."]:
# NOTE: s3://test has a path that equals "" and Path("") gets interpreted as Path(".")
# this becomes a problem then when s3://test/1 has an "absolute-like" path of Path("/1")
# and Path(".") is not considered a subpath of Path("/1") but from the uris we see that this
# is indeed a subpath
parsed_uri_path = "/"
if parsed_subfolder_uri_path in ["", "."]:
parsed_subfolder_uri_path = "/"
if parsed_uri.scheme != parsed_subfolder_uri.scheme:
# INFO: catch s3://test vs http://test
return False
if parsed_uri.netloc != parsed_subfolder_uri.netloc:
# INFO: catch s3://test1 vs s3://test2
return False
return is_subpath(parsed_subfolder_uri_path, parsed_uri_path)