Add more tests for sensor#1892
Conversation
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #1892 +/- ##
===========================================
- Coverage 94.97% 55.08% -39.90%
===========================================
Files 135 301 +166
Lines 6115 22449 +16334
Branches 0 3370 +3370
===========================================
+ Hits 5808 12366 +6558
- Misses 307 9918 +9611
- Partials 0 165 +165 ☔ View full report in Codecov by Sentry. |
Signed-off-by: Kevin Su <pingsutw@apache.org>
|
@pingsutw can you point me to a minimal repro for this? want to dig a bit deeper to understand what's happening. |
from flyteidl.service.agent_pb2_grpc import AsyncAgentServiceStub
from flyteidl.admin import agent_pb2
import tempfile
from flytekit.sensor import FileSensor
from flytekit.sensor.base_sensor import SENSOR_MODULE, SENSOR_NAME
from datetime import timedelta
import grpc
import flytekit.models.interface as interface_models
from flytekit.models import literals, task, types
from flytekit.models.core.identifier import Identifier, ResourceType
from flytekit.models.task import TaskTemplate
def get_task_template(task_type: str) -> TaskTemplate:
task_id = Identifier(
resource_type=ResourceType.TASK, project="project", domain="domain", name="t1", version="version"
)
task_metadata = task.TaskMetadata(
True,
task.RuntimeMetadata(task.RuntimeMetadata.RuntimeType.FLYTE_SDK, "1.0.0", "python"),
timedelta(days=1),
literals.RetryStrategy(3),
True,
"0.1.1b0",
"This is deprecated!",
True,
"A",
)
interfaces = interface_models.TypedInterface(
{
"a": interface_models.Variable(types.LiteralType(types.SimpleType.INTEGER), "description1"),
},
{},
)
return TaskTemplate(
id=task_id,
metadata=task_metadata,
interface=interfaces,
type=task_type,
custom={},
)
channel = grpc.insecure_channel('localhost:8000')
stub = AsyncAgentServiceStub(channel)
interfaces = interface_models.TypedInterface(
{
"path": interface_models.Variable(types.LiteralType(types.SimpleType.STRING), "description1"),
},
{},
)
tmp = get_task_template("sensor")
tmp._custom = {
SENSOR_MODULE: FileSensor.__module__,
SENSOR_NAME: FileSensor.__name__,
}
file = tempfile.NamedTemporaryFile()
tmp._interface = interfaces
task_inputs = literals.LiteralMap(
{
"path": literals.Literal(scalar=literals.Scalar(primitive=literals.Primitive(string_value=file.name))),
},
)
if __name__ == '__main__':
try:
for i in range(1):
res = stub.CreateTask(request=agent_pb2.CreateTaskRequest(template=tmp.to_flyte_idl(), inputs=task_inputs.to_flyte_idl(), output_prefix="/tmp"))
res = stub.GetTask(request=agent_pb2.GetTaskRequest(task_type="sensor", resource_meta=res.resource_meta))
print(res)
except Exception as e:
print("error:", e) |
|
or register a dynamic task, and run it. import sys
from typing import List
from flytekit import task, workflow, dynamic, Resources
from flytekit.remote import *
from flytekit.configuration import Config
@task(requests=Resources(mem="500Mi"))
def add1(x: int) -> int:
return x + 1
@dynamic(requests=Resources(mem="500Mi"))
def q(n: int) -> int:
ctr = 0
for _ in range(n):
ctr = add1(x=ctr)
return ctr
@task
def fin(x: int):
print(x)
@workflow
def workflow():
fin(x=q(n=5)) |
Signed-off-by: Kevin Su <pingsutw@apache.org>
|
Find the same issue in the map task. you can reproduce it by running below workflow import typing
import pandas as pd
from click.testing import CliRunner
from flytekit import task, workflow, ImageSpec, map_task
from flytekit.clis.sdk_in_container import pyflyte
from flytekit.deck.renderer import TopFrameRenderer
from typing_extensions import Annotated
new_flytekit = "git+https://github.com/flyteorg/flytekit.git@542d63b0de22ad8de77bd20d4d3e7e246f7ced37"
image_spec = ImageSpec(base_image="python:3.8-slim-buster", packages=[new_flytekit, "pandas", "pillow"], apt_packages=["git"], registry="pingsutw", name="flytekit1")
@task(enable_deck=True, container_image=image_spec)
def a_mappable_task(a: int) -> str:
inc = a + 2
stringified = str(inc)
return stringified
@task(enable_deck=True, container_image=image_spec)
def t1() -> (Annotated[pd.DataFrame, TopFrameRenderer()], int):
return pd.DataFrame({"col1": [1, 2, 3], "col2": list("abc")}), 3
@workflow
def wf(a: typing.List[int] = [1, 2, 3]):
t1()
map_task(a_mappable_task)(a=a)
if __name__ == '__main__':
runner = CliRunner()
result = runner.invoke(pyflyte.main, ["run", "--remote", "flyte-example/deck_example.py", "wf"])
print(result.output) |
| return None, None | ||
| # if the remote_deploy command is invoked in the same module as where | ||
| # the app is defined, get the module from the file name | ||
| mod = InstanceTrackingMeta._get_module_from_main(frame.f_globals) |
There was a problem hiding this comment.
cc @cosmicBboy is there way to get_module_from_main only when running eager workflow?
There was a problem hiding this comment.
The thing is the eager workflow changes to tracker.py also addresses the more generic case of trying to register/run tasks via FlyteRemote, where the tasks are defined in the same module as the remote calls, e.g.:
@task
def t1(): ...
remote = FlyteRemote(...)
# this didn't work before
remote.register(...)
remote.execute(t1)| if frame.f_code.co_name == "<module>" and "__name__" in frame.f_globals: | ||
| if frame.f_globals["__name__"] != "__main__": | ||
| return frame.f_globals["__name__"], frame.f_globals["__file__"] | ||
| if frame.f_globals["__file__"].endswith("pyflyte"): |
There was a problem hiding this comment.
let's create a tuple of the executables we want to exclude.
There was a problem hiding this comment.
I just removed the changes from tracker.py. Now, This PR only adds the tests for the agent.
Signed-off-by: Kevin Su <pingsutw@apache.org>
|
note: we have fixed the issue in #1902 |
eapolinario
left a comment
There was a problem hiding this comment.
thank you. This will help a lot in the followup PR.
Signed-off-by: Kevin Su <pingsutw@apache.org>

TL;DR
Failed to run the sensor in the agent due to following error.
{"asctime": "2023-10-14 10:58:59,300", "name": "flytekit", "levelname": "ERROR", "message": "task_type: \"sensor\"\nresource_meta: \"\\200\\005\\225l\\000\\000\\000\\000\\000\\000\\000}\\224(\\214\\rsensor_module\\224\\214\\033flytekit.sensor.file_sensor\\224\\214\\013sensor_name\\224\\214\\nFileSensor\\224\\214\\006inputs\\224}\\224\\214\\004path\\224\\214\\010/tmp/123\\224su.\"\n"} {"asctime": "2023-10-14 10:58:59,303", "name": "flytekit", "levelname": "ERROR", "message": "Importing module usr.local.bin.pyflyte from file /usr/local/bin/pyflyte"}Previously, it worked, but this PR breaks it.
We shouldn't import usr.local.bin.pyflyte since it's not even a module.
Type
Are all requirements met?
Complete description
I've tested it in the flytectl demo cluster
Tracking Issue
flyteorg/flyte#3936
flyteorg/flyte#4245
Follow-up issue
NA