# -*- coding: utf-8 -*-
#
# Copyright 2017-2022 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Renku activity database gateway implementation."""
import itertools
from pathlib import Path
from typing import List, Optional, Set, Tuple, Union
from persistent.list import PersistentList
from renku.command.command_builder.command import inject
from renku.core import errors
from renku.core.interface.activity_gateway import IActivityGateway
from renku.core.interface.plan_gateway import IPlanGateway
from renku.core.util.os import are_paths_related
from renku.core.workflow.activity import create_activity_graph
from renku.domain_model.project_context import project_context
from renku.domain_model.provenance.activity import Activity, ActivityCollection
from renku.domain_model.workflow.plan import Plan
from renku.infrastructure.database import Database
from renku.infrastructure.gateway.database_gateway import ActivityDownstreamRelation
class ActivityGateway(IActivityGateway):
    """Gateway for activity database operations.

    Every method works on the database exposed by ``project_context.database``.
    """

    def get_by_id(self, id: str) -> Optional[Activity]:
        """Get an activity by id.

        Args:
            id(str): Key to look up in the ``activities`` index.

        Returns:
            Optional[Activity]: The result of the index lookup for ``id``.
        """
        return project_context.database["activities"].get(id)

    def get_all_usage_paths(self) -> List[str]:
        """Return all usage paths, i.e. the keys of the ``activities-by-usage`` index."""
        database = project_context.database

        return list(database["activities-by-usage"].keys())

    def get_all_generation_paths(self) -> List[str]:
        """Return all generation paths, i.e. the keys of the ``activities-by-generation`` index."""
        database = project_context.database

        return list(database["activities-by-generation"].keys())

    def get_activities_by_usage(self, path: Union[Path, str], checksum: Optional[str] = None) -> List[Activity]:
        """Return the list of all activities that use a path.

        Args:
            path(Union[Path, str]): The path to look up; it is converted to ``str`` before the lookup.
            checksum(Optional[str]): If set, keep only activities that have a usage whose entity has this
                checksum (Default value = None).

        Returns:
            List[Activity]: A new list of the matching activities.
        """
        by_usage = project_context.database["activities-by-usage"]
        activities = by_usage.get(str(path), [])

        if not checksum:
            # NOTE: Return a copy so that callers cannot modify the list that is stored in the index
            return list(activities)

        # NOTE(review): the checksum is compared against every usage of an activity, not only the usage of
        # ``path`` - confirm that this is intended.
        return [a for a in activities if any(u.entity.checksum == checksum for u in a.usages)]

    def get_activities_by_generation(self, path: Union[Path, str], checksum: Optional[str] = None) -> List[Activity]:
        """Return the list of all activities that generate a path.

        Activities that are marked as deleted are never returned.

        Args:
            path(Union[Path, str]): The path to look up; it is converted to ``str`` before the lookup.
            checksum(Optional[str]): If set, keep only activities that have a generation whose entity has this
                checksum (Default value = None).

        Returns:
            List[Activity]: A new list of the matching activities.
        """
        by_generation = project_context.database["activities-by-generation"]
        activities = [a for a in by_generation.get(str(path), []) if not a.deleted]

        if not checksum:
            return activities

        # NOTE(review): the checksum is compared against every generation of an activity, not only the
        # generation of ``path`` - confirm that this is intended.
        return [a for a in activities if any(g.entity.checksum == checksum for g in a.generations)]

    def get_downstream_activities(self, activity: Activity, max_depth=None) -> Set[Activity]:
        """Get downstream activities that depend on this activity.

        Args:
            activity(Activity): The activity that is used as the ``upstream`` side of the query.
            max_depth: Passed to the catalog query as ``maxDepth`` (Default value = None).

        Returns:
            Set[Activity]: The ``downstream`` values that the activity catalog finds.
        """
        database = project_context.database
        activity_catalog = database["activity-catalog"]
        tok = activity_catalog.tokenizeQuery

        return set(activity_catalog.findValues("downstream", tok(upstream=activity), maxDepth=max_depth))

    def get_upstream_activities(self, activity: Activity, max_depth=None) -> Set[Activity]:
        """Get upstream activities that this activity depends on.

        Args:
            activity(Activity): The activity that is used as the ``downstream`` side of the query.
            max_depth: Passed to the catalog query as ``maxDepth`` (Default value = None).

        Returns:
            Set[Activity]: The ``upstream`` values that the activity catalog finds.
        """
        database = project_context.database
        activity_catalog = database["activity-catalog"]
        tok = activity_catalog.tokenizeQuery

        return set(activity_catalog.findValues("upstream", tok(downstream=activity), maxDepth=max_depth))

    def get_downstream_activity_chains(self, activity: Activity) -> List[Tuple[Activity, ...]]:
        """Get a list of tuples of all downstream paths of this activity.

        Each tuple holds the ``downstream`` member of every relation in one relation chain that the activity
        catalog returns for ``upstream=activity``.
        """
        database = project_context.database
        activity_catalog = database["activity-catalog"]
        tok = activity_catalog.tokenizeQuery
        relation_chains = activity_catalog.findRelationChains(tok(upstream=activity))

        return [tuple(relation.downstream for relation in chain) for chain in relation_chains]

    def get_upstream_activity_chains(self, activity: Activity) -> List[Tuple[Activity, ...]]:
        """Get a list of tuples of all upstream paths of this activity.

        Each tuple holds the ``upstream`` member of every relation in one relation chain that the activity
        catalog returns for ``downstream=activity``.
        """
        database = project_context.database
        activity_catalog = database["activity-catalog"]
        tok = activity_catalog.tokenizeQuery
        relation_chains = activity_catalog.findRelationChains(tok(downstream=activity))

        return [tuple(relation.upstream for relation in chain) for chain in relation_chains]

    def get_all_activities(self, include_deleted: bool = False) -> List[Activity]:
        """Get all activities in the project.

        Args:
            include_deleted(bool): Whether to also return activities that are marked as deleted
                (Default value = False).

        Returns:
            List[Activity]: The activities stored in the ``activities`` index.
        """
        database = project_context.database

        return [a for a in database["activities"].values() if include_deleted or not a.deleted]

    def add(self, activity: Activity):
        """Add an ``Activity`` to storage.

        The activity is stored and indexed, its plan is passed to the plan gateway, and finally the activities on
        its upstream and downstream chains are checked for cycles.

        Args:
            activity(Activity): The activity to add; its ``association.plan`` must be a ``Plan``.
        """
        database = project_context.database

        database["activities"].add(activity)
        _index_activity(activity=activity, database=database)

        assert isinstance(activity.association.plan, Plan)

        plan_gateway = inject.instance(IPlanGateway)
        plan_gateway.add(activity.association.plan)

        # NOTE: Check whether this activity is part of a cycle; this runs after indexing because the chains
        # are read from the activity catalog
        upstream_chains = self.get_upstream_activity_chains(activity)
        downstream_chains = self.get_downstream_activity_chains(activity)

        all_activities = set()
        for activity_chain in itertools.chain(upstream_chains, downstream_chains):
            all_activities.update(activity_chain)

        # NOTE: This call raises an exception if there is a cycle
        create_activity_graph(list(all_activities), with_inputs_outputs=True)

    def add_activity_collection(self, activity_collection: ActivityCollection):
        """Add an ``ActivityCollection`` to storage."""
        database = project_context.database

        database["activity-collections"].add(activity_collection)

    def get_all_activity_collections(self) -> List[ActivityCollection]:
        """Get all activity collections in the project."""
        return list(project_context.database["activity-collections"].values())

    def remove(self, activity: Activity, keep_reference: bool = True, force: bool = False):
        """Remove an activity from the storage.

        Args:
            activity(Activity): The activity to be removed.
            keep_reference(bool): Whether to keep the activity in the ``activities`` index or not.
            force(bool): Force-delete the activity even if it has downstream activities.

        Raises:
            errors.ActivityDownstreamNotEmptyError: If ``force`` is not set and the activity has downstream
                activities.
        """
        database = project_context.database

        if not force and self.get_downstream_activities(activity):
            raise errors.ActivityDownstreamNotEmptyError(activity)

        if not keep_reference:
            database["activities"].remove(activity)

        # NOTE: The path indexes and the catalog relations are cleaned up in both cases
        _unindex_activity(activity=activity, database=database)
def reindex_catalog(database):
    """Rebuild the activity catalog and its stored relations from scratch.

    Both ``activity-catalog`` and ``_downstream_relations`` are emptied, then every activity found in the
    ``activities`` index is indexed again.

    Args:
        database: The database that holds the ``activities``, ``activity-catalog`` and
            ``_downstream_relations`` entries.
    """
    # NOTE: Wipe the catalog first and the stored relations second, before anything is indexed again
    for name in ("activity-catalog", "_downstream_relations"):
        database[name].clear()

    for stored_activity in database["activities"].values():
        _index_activity(activity=stored_activity, database=database)
def _index_activity(activity: Activity, database: Database):
"""Add an activity to database indexes and create its up/downstream relations."""
if activity.deleted:
return
upstreams = set()
downstreams = set()
by_usage = database["activities-by-usage"]
by_generation = database["activities-by-generation"]
for usage in activity.usages:
if usage.entity.path not in by_usage:
by_usage[usage.entity.path] = PersistentList()
if activity not in by_usage[usage.entity.path]:
by_usage[usage.entity.path].append(activity)
for path, activities in by_generation.items():
if are_paths_related(path, usage.entity.path):
upstreams.update(activities)
for generation in activity.generations:
if generation.entity.path not in by_generation:
by_generation[generation.entity.path] = PersistentList()
if activity not in by_generation[generation.entity.path]:
by_generation[generation.entity.path].append(activity)
for path, activities in by_usage.items():
if are_paths_related(path, generation.entity.path):
downstreams.update(activities)
activity_catalog = database["activity-catalog"]
if upstreams:
for a in upstreams:
if a != activity:
activity_catalog.index(ActivityDownstreamRelation(downstream=activity, upstream=a))
if downstreams:
for a in downstreams:
if a != activity:
activity_catalog.index(ActivityDownstreamRelation(downstream=a, upstream=activity))
if upstreams or downstreams:
activity_catalog._p_changed = True
def _unindex_activity(activity: Activity, database: Database):
"""Add an activity to database indexes and create its up/downstream relations."""
upstreams = set()
downstreams = set()
by_usage = database["activities-by-usage"]
by_generation = database["activities-by-generation"]
for usage in activity.usages:
if usage.entity.path in by_usage:
activities = by_usage[usage.entity.path]
activities.remove(activity)
if len(activities) == 0:
del by_usage[usage.entity.path]
for path, activities in by_generation.items():
if are_paths_related(path, usage.entity.path):
upstreams.update(activities)
for generation in activity.generations:
if generation.entity.path in by_generation:
activities = by_generation[generation.entity.path]
activities.remove(activity)
if len(activities) == 0:
del by_generation[generation.entity.path]
for path, activities in by_usage.items():
if are_paths_related(path, generation.entity.path):
downstreams.update(activities)
activity_catalog = database["activity-catalog"]
relations = database["_downstream_relations"]
if upstreams:
for s in upstreams:
relation = ActivityDownstreamRelation(downstream=activity, upstream=s)
activity_catalog.unindex(relation)
relations.pop(relation.id, None)
if downstreams:
for s in downstreams:
relation = ActivityDownstreamRelation(downstream=s, upstream=activity)
activity_catalog.unindex(relation)
relations.pop(relation.id, None)
if upstreams or downstreams:
activity_catalog._p_changed = True