Source code for renku.domain_model.provenance.activity

# Copyright Swiss Data Science Center (SDSC). A partnership between
# École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
"""Represent an execution of a Plan."""

from datetime import datetime
from itertools import chain
from typing import Dict, List, Optional, Union, cast
from uuid import uuid4

import deal
from werkzeug.utils import cached_property

from renku.command.command_builder import inject
from renku.core.interface.project_gateway import IProjectGateway
from renku.core.util.datetime8601 import local_now
from renku.core.util.git import get_entity_from_revision, get_git_user
from renku.domain_model.entity import Collection, Entity
from renku.domain_model.provenance.agent import Person, SoftwareAgent
from renku.domain_model.provenance.annotation import Annotation
from renku.domain_model.provenance.parameter import ParameterValue
from renku.domain_model.workflow.plan import Plan
from renku.domain_model.workflow.workflow_file import WorkflowFileCompositePlan
from renku.infrastructure.database import Persistent
from renku.infrastructure.immutable import Immutable
from renku.infrastructure.repository import Repository
from renku.version import __version__, version_url

[docs]class Association: """Assign responsibility to an agent for an activity.""" def __init__(self, *, agent: Union[Person, SoftwareAgent], id: str, plan: Union[Plan, WorkflowFileCompositePlan]): self.agent: Union[Person, SoftwareAgent] = agent str = id self.plan: Union[Plan, WorkflowFileCompositePlan] = plan
[docs] @staticmethod def generate_id(activity_id: str) -> str: """Generate a Association identifier.""" return f"{activity_id}/association"
[docs]class Usage(Immutable): """Represent a dependent path.""" __slots__ = ("entity",) entity: Union[Collection, Entity] id: str def __init__(self, *, entity: Union[Collection, Entity], id: str): super().__init__(entity=entity, id=id)
[docs] @staticmethod def generate_id(activity_id: str) -> str: """Generate a Usage identifier.""" return f"{activity_id}/usages/{uuid4().hex}"
[docs]class HiddenUsage(Usage): """Represent a dependent path corresponding to a ``HiddenInput``."""
[docs]class Generation(Immutable): """Represent an act of generating a path.""" __slots__ = ("entity",) entity: Union[Collection, Entity] id: str def __init__(self, *, entity: Union[Collection, Entity], id: str): super().__init__(entity=entity, id=id)
[docs] @staticmethod def generate_id(activity_id: str) -> str: """Generate a Generation identifier.""" return f"{activity_id}/generations/{uuid4().hex}"
[docs]class Activity(Persistent): """Represent an activity in the repository.""" invalidated_at: Optional[datetime] = None hidden_usages: List[HiddenUsage] = list() @deal.ensure(lambda self, *_, result, **kwargs: self.ended_at_time >= self.started_at_time) def __init__( self, *, agents: List[Union[Person, SoftwareAgent]], annotations: Optional[List[Annotation]] = None, association: Association, ended_at_time: datetime, generations: Optional[List[Generation]] = None, hidden_usages: Optional[List[HiddenUsage]] = None, id: str, invalidated_at: Optional[datetime] = None, invalidations: Optional[List[Entity]] = None, parameters: Optional[List[ParameterValue]] = None, project_id: Optional[str] = None, started_at_time: datetime, usages: Optional[List[Usage]] = None, ): self.agents: List[Union[Person, SoftwareAgent]] = agents self.annotations: List[Annotation] = annotations or [] self.association: Association = association self.ended_at_time: datetime = ended_at_time self.generations: List[Generation] = generations or [] self.hidden_usages: List[HiddenUsage] = hidden_usages or [] str = id self.invalidated_at: Optional[datetime] = invalidated_at self.invalidations: List[Entity] = invalidations or [] self.parameters: List[ParameterValue] = parameters or [] self.project_id: Optional[str] = project_id self.started_at_time: datetime = started_at_time self.usages: List[Usage] = usages or [] if not"/activities/"): = f"/activities/{}" # TODO: _was_informed_by = attr.ib(kw_only=True) # TODO: influenced = attr.ib(kw_only=True)
[docs] @classmethod activity: activity is None or activity.ended_at_time >= activity.started_at_time) # NOTE: check for none due to bug in deal thinking inner function return value is actual return @inject.autoparams("project_gateway") def from_plan( cls, plan: Plan, repository: "Repository", project_gateway: IProjectGateway, started_at_time: datetime, ended_at_time: datetime, annotations: Optional[List[Annotation]] = None, id: Optional[str] = None, ): """Convert a ``Plan`` to a ``Activity``.""" from renku.core.plugin.pluginmanager import get_plugin_manager usages: Dict[str, Usage] = {} hidden_usages: Dict[str, HiddenUsage] = {} generations = {} parameter_values = [] activity_id = id or cls.generate_id() def process_input(input, already_processed, cls, add_parameter_value): input_path = input.actual_value if add_parameter_value: parameter_values.append( ParameterValue(id=ParameterValue.generate_id(activity_id),, value=input_path) ) if input_path in already_processed: return entity = get_entity_from_revision(repository=repository, path=input_path, bypass_cache=True) dependency = cls(entity=entity, id=cls.generate_id(activity_id)) already_processed[input_path] = dependency for input in plan.inputs: process_input(input=input, already_processed=usages, cls=Usage, add_parameter_value=True) for input in plan.hidden_inputs: process_input(input=input, already_processed=hidden_usages, cls=HiddenUsage, add_parameter_value=False) for output in plan.outputs: output_path = output.actual_value parameter_values.append( ParameterValue(id=ParameterValue.generate_id(activity_id),, value=output_path) ) if output_path in generations: continue entity = get_entity_from_revision(repository=repository, path=output_path, bypass_cache=True) generation = Generation(entity=entity, id=Generation.generate_id(activity_id)) generations[output_path] = generation for parameter in plan.parameters: value = parameter.actual_value parameter_values.append( ParameterValue(id=ParameterValue.generate_id(activity_id),, value=value) ) agent = SoftwareAgent(id=version_url, name=f"renku {__version__}") person = cast(Person, get_git_user(repository)) association = Association(agent=agent, id=Association.generate_id(activity_id), plan=plan) activity = cls( id=activity_id, association=association, agents=[agent, person], usages=list(usages.values()), hidden_usages=list(hidden_usages.values()), generations=list(generations.values()), parameters=parameter_values, project_id=project_gateway.get_project().id, started_at_time=started_at_time, ended_at_time=ended_at_time, annotations=annotations, ) pm = get_plugin_manager() plugin_annotations = list(chain.from_iterable(pm.hook.activity_annotations(activity=activity))) if plugin_annotations: activity.annotations.extend(plugin_annotations) return activity
def __repr__(self): return f"<Activity '{}': {} @ {self.ended_at_time}>"
[docs] @cached_property def plan_with_values(self) -> Plan: """Get a copy of the associated plan with values from ParameterValues applied.""" plan = self.association.plan.copy() for parameter in self.parameters: parameter.apply_value_to_parameter(plan) return plan
@property def deleted(self) -> bool: """Return if the activity was deleted.""" return self.invalidated_at is not None
[docs] @staticmethod def generate_id(uuid: Optional[str] = None) -> str: """Generate an identifier for an activity.""" if uuid is None: uuid = uuid4().hex return f"/activities/{uuid}"
[docs] def has_identical_inputs_and_outputs_as(self, other: "Activity"): """Return true if all input and outputs paths are identical regardless of the order.""" return sorted(u.entity.path for u in self.usages) == sorted(u.entity.path for u in other.usages) and sorted( g.entity.path for g in self.generations ) == sorted(g.entity.path for g in other.generations)
[docs] def compare_to(self, other: "Activity") -> int: """Compare execution date with another activity; return a positive value if self is executed after the other.""" if self.ended_at_time < other.ended_at_time: return -1 elif self.ended_at_time > other.ended_at_time: return 1 elif self.started_at_time < other.started_at_time: return -1 elif self.started_at_time > other.started_at_time: return 1 return 0
[docs] @deal.ensure(lambda self, *_, result, **kwargs: self.invalidated_at >= self.ended_at_time) def delete(self, when: Optional[datetime] = None): """Mark the activity as deleted.""" self.unfreeze() self.invalidated_at = when or local_now() self.freeze()
[docs]class ActivityCollection(Persistent): """Represent a list of activities.""" def __init__(self, *, activities: List[Activity], id: Optional[str] = None): self.activities: List[Activity] = activities or [] str = id or self.generate_id()
[docs] @staticmethod def generate_id() -> str: """Generate an identifier for an activity collection.""" return f"/activity-collection/{uuid4().hex}"
[docs]class WorkflowFileActivityCollection(ActivityCollection): """Represent activities of a workflow file execution.""" def __init__( self, *, activities: List[Activity], agents: List[Union[Person, SoftwareAgent]], association: Association, ended_at_time: datetime, id: Optional[str] = None, invalidated_at: Optional[datetime] = None, project_id: Optional[str] = None, started_at_time: datetime, ): super().__init__(activities=activities, id=id) self.agents: List[Union[Person, SoftwareAgent]] = agents self.association: Association = association self.ended_at_time: datetime = ended_at_time self.invalidated_at: Optional[datetime] = invalidated_at self.project_id: Optional[str] = project_id self.started_at_time: datetime = started_at_time
[docs] @classmethod @inject.autoparams("project_gateway") def from_activities( cls, plan: WorkflowFileCompositePlan, project_gateway: IProjectGateway, activities: List[Activity] ): """Create an instance from a list of ``Activity``.""" id = cls.generate_id() association = Association(agent=activities[0].association.agent, id=Association.generate_id(id), plan=plan) return cls( activities=activities, agents=activities[0].agents.copy(), association=association, ended_at_time=max(a.ended_at_time for a in activities), id=id, invalidated_at=None, project_id=project_gateway.get_project().id, started_at_time=min(a.started_at_time for a in activities), )
[docs] @staticmethod def generate_id() -> str: """Generate an identifier.""" return f"/workflow-file-activity-collection/{uuid4().hex}"
def __repr__(self): return f"<WorkflowFileActivityCollection '{}': {} @ {self.ended_at_time}>"