Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions flytekit/clients/friendly.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,12 +676,11 @@ def get_node_execution(self, node_execution_identifier):
)
)

def get_node_execution_data(self, node_execution_identifier):
def get_node_execution_data(self, node_execution_identifier) -> _execution.NodeExecutionGetDataResponse:
"""
Returns signed URLs to LiteralMap blobs for a node execution's inputs and outputs (when available).

:param flytekit.models.core.identifier.NodeExecutionIdentifier node_execution_identifier:
:rtype: flytekit.models.execution.NodeExecutionGetDataResponse
"""
return _execution.NodeExecutionGetDataResponse.from_flyte_idl(
super(SynchronousFlyteClient, self).get_node_execution_data(
Expand Down
75 changes: 75 additions & 0 deletions flytekit/models/core/catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from flyteidl.core import catalog_pb2

from flytekit.models import common as _common_models
from flytekit.models.core import identifier as _identifier


class CatalogArtifactTag(_common_models.FlyteIdlEntity):
def __init__(self, artifact_id: str, name: str):
self._artifact_id = artifact_id
self._name = name

@property
def artifact_id(self) -> str:
return self._artifact_id

@property
def name(self) -> str:
return self._name

def to_flyte_idl(self) -> catalog_pb2.CatalogArtifactTag:
return catalog_pb2.CatalogArtifactTag(artifact_id=self.artifact_id, name=self.name)

@classmethod
def from_flyte_idl(cls, p: catalog_pb2.CatalogArtifactTag) -> "CatalogArtifactTag":
return cls(
artifact_id=p.artifact_id,
name=p.name,
)


class CatalogMetadata(_common_models.FlyteIdlEntity):
def __init__(
self,
dataset_id: _identifier.Identifier,
artifact_tag: CatalogArtifactTag,
source_task_execution: _identifier.TaskExecutionIdentifier,
):
self._dataset_id = dataset_id
self._artifact_tag = artifact_tag
self._source_task_execution = source_task_execution

@property
def dataset_id(self) -> _identifier.Identifier:
return self._dataset_id

@property
def artifact_tag(self) -> CatalogArtifactTag:
return self._artifact_tag

@property
def source_task_execution(self) -> _identifier.TaskExecutionIdentifier:
return self._source_task_execution

@property
def source_execution(self) -> _identifier.TaskExecutionIdentifier:
"""
This is a one of but for now there's only one thing in the one of
"""
return self._source_task_execution

def to_flyte_idl(self) -> catalog_pb2.CatalogMetadata:
return catalog_pb2.CatalogMetadata(
dataset_id=self.dataset_id.to_flyte_idl(),
artifact_tag=self.artifact_tag.to_flyte_idl(),
source_task_execution=self.source_task_execution.to_flyte_idl(),
)

@classmethod
def from_flyte_idl(cls, pb: catalog_pb2.CatalogMetadata) -> "CatalogMetadata":
return cls(
dataset_id=_identifier.Identifier.from_flyte_idl(pb.dataset_id),
artifact_tag=CatalogArtifactTag.from_flyte_idl(pb.artifact_tag),
# Add HasField check if more things are ever added to the one of
source_task_execution=_identifier.TaskExecutionIdentifier.from_flyte_idl(pb.source_task_execution),
)
24 changes: 19 additions & 5 deletions flytekit/models/execution.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import typing

import flyteidl.admin.execution_pb2 as _execution_pb2
import flyteidl.admin.node_execution_pb2 as _node_execution_pb2
import flyteidl.admin.task_execution_pb2 as _task_execution_pb2
Expand All @@ -7,6 +9,7 @@
from flytekit.models import literals as _literals_models
from flytekit.models.core import execution as _core_execution
from flytekit.models.core import identifier as _identifier
from flytekit.models.node_execution import DynamicWorkflowNodeMetadata


class ExecutionMetadata(_common_models.FlyteIdlEntity):
Expand Down Expand Up @@ -238,7 +241,6 @@ class Execution(_common_models.FlyteIdlEntity):
def __init__(self, id, spec, closure):
"""
:param flytekit.models.core.identifier.WorkflowExecutionIdentifier id:
:param Text id:
:param ExecutionSpec spec:
:param ExecutionClosure closure:
"""
Expand Down Expand Up @@ -403,8 +405,8 @@ def __init__(self, inputs, outputs, full_inputs, full_outputs):
"""
:param _common_models.UrlBlob inputs:
:param _common_models.UrlBlob outputs:
:param _literals_pb2.LiteralMap full_inputs:
:param _literals_pb2.LiteralMap full_outputs:
:param _literals_models.LiteralMap full_inputs:
:param _literals_models.LiteralMap full_outputs:
"""
self._inputs = inputs
self._outputs = outputs
Expand All @@ -428,14 +430,14 @@ def outputs(self):
@property
def full_inputs(self):
"""
:rtype: _literals_pb2.LiteralMap
:rtype: _literals_models.LiteralMap
"""
return self._full_inputs

@property
def full_outputs(self):
"""
:rtype: _literals_pb2.LiteralMap
:rtype: _literals_models.LiteralMap
"""
return self._full_outputs

Expand Down Expand Up @@ -493,6 +495,14 @@ def to_flyte_idl(self):


class NodeExecutionGetDataResponse(_CommonDataResponse):
def __init__(self, *args, dynamic_workflow: typing.Optional[DynamicWorkflowNodeMetadata] = None, **kwargs):
super().__init__(*args, **kwargs)
self._dynamic_workflow = dynamic_workflow

@property
def dynamic_workflow(self) -> typing.Optional[DynamicWorkflowNodeMetadata]:
return self._dynamic_workflow

@classmethod
def from_flyte_idl(cls, pb2_object):
"""
Expand All @@ -504,6 +514,9 @@ def from_flyte_idl(cls, pb2_object):
outputs=_common_models.UrlBlob.from_flyte_idl(pb2_object.outputs),
full_inputs=_literals_models.LiteralMap.from_flyte_idl(pb2_object.full_inputs),
full_outputs=_literals_models.LiteralMap.from_flyte_idl(pb2_object.full_outputs),
dynamic_workflow=DynamicWorkflowNodeMetadata.from_flyte_idl(pb2_object.dynamic_workflow)
if pb2_object.HasField("dynamic_workflow")
else None,
)

def to_flyte_idl(self):
Expand All @@ -515,4 +528,5 @@ def to_flyte_idl(self):
outputs=self.outputs.to_flyte_idl(),
full_inputs=self.full_inputs.to_flyte_idl(),
full_outputs=self.full_outputs.to_flyte_idl(),
dynamic_workflow=self.dynamic_workflow.to_flyte_idl() if self.dynamic_workflow else None,
)
115 changes: 114 additions & 1 deletion flytekit/models/node_execution.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,101 @@
import typing

import flyteidl.admin.node_execution_pb2 as _node_execution_pb2
import pytz as _pytz

from flytekit.models import common as _common_models
from flytekit.models.core import catalog as catalog_models
from flytekit.models.core import compiler as core_compiler_models
from flytekit.models.core import execution as _core_execution
from flytekit.models.core import identifier as _identifier


class WorkflowNodeMetadata(_common_models.FlyteIdlEntity):
def __init__(self, execution_id: _identifier.WorkflowExecutionIdentifier):
self._execution_id = execution_id

@property
def execution_id(self) -> _identifier.WorkflowExecutionIdentifier:
return self._execution_id

def to_flyte_idl(self) -> _node_execution_pb2.WorkflowNodeMetadata:
return _node_execution_pb2.WorkflowNodeMetadata(
executionId=self.execution_id.to_flyte_idl(),
)

@classmethod
def from_flyte_idl(cls, p: _node_execution_pb2.WorkflowNodeMetadata) -> "WorkflowNodeMetadata":
return cls(
execution_id=_identifier.WorkflowExecutionIdentifier.from_flyte_idl(p.executionId),
)


class DynamicWorkflowNodeMetadata(_common_models.FlyteIdlEntity):
def __init__(self, id: _identifier.Identifier, compiled_workflow: core_compiler_models.CompiledWorkflowClosure):
self._id = id
self._compiled_workflow = compiled_workflow

@property
def id(self) -> _identifier.Identifier:
return self._id

@property
def compiled_workflow(self) -> core_compiler_models.CompiledWorkflowClosure:
return self._compiled_workflow

def to_flyte_idl(self) -> _node_execution_pb2.DynamicWorkflowNodeMetadata:
return _node_execution_pb2.DynamicWorkflowNodeMetadata(
id=self.id.to_flyte_idl(),
compiled_workflow=self.compiled_workflow.to_flyte_idl(),
)

@classmethod
def from_flyte_idl(cls, p: _node_execution_pb2.DynamicWorkflowNodeMetadata) -> "DynamicWorkflowNodeMetadata":
yy = cls(
id=_identifier.Identifier.from_flyte_idl(p.id),
compiled_workflow=core_compiler_models.CompiledWorkflowClosure.from_flyte_idl(p.compiled_workflow),
)
return yy


class TaskNodeMetadata(_common_models.FlyteIdlEntity):
def __init__(self, cache_status: int, catalog_key: catalog_models.CatalogMetadata):
self._cache_status = cache_status
self._catalog_key = catalog_key

@property
def cache_status(self) -> int:
return self._cache_status

@property
def catalog_key(self) -> catalog_models.CatalogMetadata:
return self._catalog_key

def to_flyte_idl(self) -> _node_execution_pb2.TaskNodeMetadata:
return _node_execution_pb2.TaskNodeMetadata(
cache_status=self.cache_status,
catalog_key=self.catalog_key.to_flyte_idl(),
)

@classmethod
def from_flyte_idl(cls, p: _node_execution_pb2.TaskNodeMetadata) -> "TaskNodeMetadata":
return cls(
cache_status=p.cache_status,
catalog_key=catalog_models.CatalogMetadata.from_flyte_idl(p.catalog_key),
)


class NodeExecutionClosure(_common_models.FlyteIdlEntity):
def __init__(self, phase, started_at, duration, output_uri=None, error=None):
def __init__(
self,
phase,
started_at,
duration,
output_uri=None,
error=None,
workflow_node_metadata: typing.Optional[WorkflowNodeMetadata] = None,
task_node_metadata: typing.Optional[TaskNodeMetadata] = None,
):
"""
:param int phase:
:param datetime.datetime started_at:
Expand All @@ -20,6 +108,9 @@ def __init__(self, phase, started_at, duration, output_uri=None, error=None):
self._duration = duration
self._output_uri = output_uri
self._error = error
self._workflow_node_metadata = workflow_node_metadata
self._task_node_metadata = task_node_metadata
# TODO: Add output_data field as well.

@property
def phase(self):
Expand Down Expand Up @@ -56,6 +147,18 @@ def error(self):
"""
return self._error

@property
def workflow_node_metadata(self) -> typing.Optional[WorkflowNodeMetadata]:
return self._workflow_node_metadata

@property
def task_node_metadata(self) -> typing.Optional[TaskNodeMetadata]:
return self._task_node_metadata

@property
def target_metadata(self) -> typing.Union[WorkflowNodeMetadata, TaskNodeMetadata]:
return self.workflow_node_metadata or self.task_node_metadata

def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.node_execution_pb2.NodeExecutionClosure
Expand All @@ -64,6 +167,10 @@ def to_flyte_idl(self):
phase=self.phase,
output_uri=self.output_uri,
error=self.error.to_flyte_idl() if self.error is not None else None,
workflow_node_metadata=self.workflow_node_metadata.to_flyte_idl()
if self.workflow_node_metadata is not None
else None,
task_node_metadata=self.task_node_metadata.to_flyte_idl() if self.task_node_metadata is not None else None,
)
obj.started_at.FromDatetime(self.started_at.astimezone(_pytz.UTC).replace(tzinfo=None))
obj.duration.FromTimedelta(self.duration)
Expand All @@ -81,6 +188,12 @@ def from_flyte_idl(cls, p):
error=_core_execution.ExecutionError.from_flyte_idl(p.error) if p.HasField("error") else None,
started_at=p.started_at.ToDatetime().replace(tzinfo=_pytz.UTC),
duration=p.duration.ToTimedelta(),
workflow_node_metadata=WorkflowNodeMetadata.from_flyte_idl(p.workflow_node_metadata)
if p.HasField("workflow_node_metadata")
else None,
task_node_metadata=TaskNodeMetadata.from_flyte_idl(p.task_node_metadata)
if p.HasField("task_node_metadata")
else None,
)


Expand Down
7 changes: 3 additions & 4 deletions flytekit/remote/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,9 @@
"""

from flytekit.remote.component_nodes import FlyteTaskNode, FlyteWorkflowNode
from flytekit.remote.executions import FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflowExecution
from flytekit.remote.launch_plan import FlyteLaunchPlan
from flytekit.remote.nodes import FlyteNode, FlyteNodeExecution
from flytekit.remote.nodes import FlyteNode
from flytekit.remote.remote import FlyteRemote
from flytekit.remote.tasks.executions import FlyteTaskExecution
from flytekit.remote.tasks.task import FlyteTask
from flytekit.remote.task import FlyteTask
from flytekit.remote.workflow import FlyteWorkflow
from flytekit.remote.workflow_execution import FlyteWorkflowExecution
Loading