Skip to content

JSON IDL#2600

Closed
Future-Outlier wants to merge 36 commits into
masterfrom
json-idl
Closed

JSON IDL#2600
Future-Outlier wants to merge 36 commits into
masterfrom
json-idl

Conversation

@Future-Outlier

@Future-Outlier Future-Outlier commented Jul 22, 2024

Copy link
Copy Markdown
Member

Tracking issue

flyteorg/flyte#5318

Why are the changes needed?

What changes were proposed in this pull request?

  • type transformer tries to convert to JSON first, then Struct
  • backward compatibility
  • attribute access for local execution

How was this patch tested?

local execution, remote execution and unit tests.
Note: pydantic Transformer will be supported in the follow-up PR.

Tips for collapsed sections

dataclass transformer

from dataclasses import dataclass, fields
from typing import Dict
from flytekit.types.file import FlyteFile

from flytekit import task, workflow, ImageSpec

flytekit_hash = "a964d7d5461fd79f3f8442c433825bcba830f32d"
flyteidl_hash = "0b32845e92cfe354c197ddf5ae3c8605a17f079b"

json_idl = f"git+https://github.com/flyteorg/flyte.git@{flyteidl_hash}#subdirectory=flyteidl"
flytekit = f"git+https://github.com/flyteorg/flytekit.git@{flytekit_hash}"

image = ImageSpec(
    packages=[json_idl, flytekit],
    apt_packages=["git"],
    registry="futureoutlier",
)

@dataclass
class DC_inside:
    """Small nested dataclass; used as the type of the ``e`` field of ``DC``."""

    a: int
    b: float
    # FlyteFile field is currently disabled in this example.
    # ff: FlyteFile

@dataclass
class DC:
    """Workflow input mixing scalar, dict and nested-dataclass fields."""

    a: int
    b: float
    c: str
    d: Dict[str, int]  # string keys mapped to ints, e.g. {"1": 100, "2": 200}
    e: DC_inside  # nested dataclass whose attributes are accessed in the workflow


@task(container_image=image)
def dataclass_task(input: DC) -> DC:
    """Print every field of ``input`` (expanding dict values) and return it unchanged."""

    def print_dataclass_info(instance, indent=0):
        # One line per dataclass field; dict values get an extra, deeper-indented
        # pair of lines per entry showing key and value types.
        pad = " " * indent
        entry_pad = " " * (indent + 4)
        for spec in fields(instance):
            current = getattr(instance, spec.name)
            print(f"{pad}Field: {spec.name}, Type: {spec.type}, Value: {current}")
            if not isinstance(current, dict):
                continue
            for key, item in current.items():
                print(f"{entry_pad} Type: {type(key)}, Key: {key}")
                print(f"{entry_pad} Type: {type(item)}, Value: {item}")

    print("DC fields:")
    print_dataclass_info(input)

    return input

@task(container_image=image)
def dataclass_attr_dict_task(a: float, d: Dict[str, int]):
    """Print ``a`` and ``d`` with their runtime types, then each key/value of ``d``."""
    print("DC Attr:")
    print("a:", type(a), a)
    print("d:", type(d), d)
    for key in d:
        item = d[key]
        print(type(key), key)
        print(type(item), item)

@task(container_image=image)
def dataclass_attr_dc_task(dc: DC_inside):
    """Print the runtime type and value of each field of the nested dataclass.

    The header line is printed once, matching the sibling ``dataclass_attr_*``
    tasks (the original emitted it twice).
    """
    print("DC Attr:")
    print("a:", type(dc.a), dc.a)
    print("b:", type(dc.b), dc.b)

@task(container_image=image)
def dataclass_attr_dc_int_task(a: int):
    """Print the runtime type and value of the int input ``a``."""
    runtime_type = type(a)
    print("DC Attr:")
    print("a:", runtime_type, a)

@task(container_image=image)
def dataclass_attr_dc_float_task(b: float):
    """Print the runtime type and value of the float input ``b``."""
    runtime_type = type(b)
    print("DC Attr:")
    print("b:", runtime_type, b)

@task(container_image=image)
def dataclass_attr_dc_flytefile_task(ff: FlyteFile):
    """Open the given FlyteFile as text and print its entire content."""
    print("FlyteFile DC Attr:")
    with open(ff, newline="") as handle:
        content = handle.read()
        print(content)

@workflow
def dataclass_wf(input: DC) -> DC:
    """Pass individual attributes of ``input`` to the attribute tasks, then return ``dataclass_task``'s output."""
    # The task parameter `a` is typed float, so it is fed from the float field `input.b`.
    dataclass_attr_dict_task(a=input.b, d=input.d,)
    dataclass_attr_dc_task(dc=input.e)  # whole nested dataclass
    dataclass_attr_dc_int_task(a=input.e.a)  # second-level attribute access
    dataclass_attr_dc_float_task(b=input.e.b)
    # Disabled together with the commented-out `ff` field of DC_inside.
    # dataclass_attr_dc_flytefile_task(ff=input.e.ff)
    return dataclass_task(input=input)


if __name__ == "__main__":
    from flytekit.clis.sdk_in_container import pyflyte
    from click.testing import CliRunner

    # The same script path and JSON payload drive both runs; only --remote differs.
    runner = CliRunner()
    path = "/Users/future-outlier/code/dev/flytekit/build/PR/JSON/demo/dataclass_example.py"
    payload = '{"a": 1, "b": 3.14, "c": "example_string", "d": {"1": 100, "2": 200}, "e": {"a": 1, "b": 3.14}}'
    workflow_args = [path, "dataclass_wf", "--input", payload]

    result = runner.invoke(pyflyte.main, ["run", *workflow_args])
    print("Local Execution: ", result.output)

    result = runner.invoke(pyflyte.main, ["run", "--remote", *workflow_args])
    print("Remote Execution: ", result.output)

dict transformer

from flytekit import task, workflow, ImageSpec

flytekit_hash = "a964d7d5461fd79f3f8442c433825bcba830f32d"
flyteidl_hash = "0b32845e92cfe354c197ddf5ae3c8605a17f079b"

json_idl = f"git+https://github.com/flyteorg/flyte.git@{flyteidl_hash}#subdirectory=flyteidl"
flytekit = f"git+https://github.com/flyteorg/flytekit.git@{flytekit_hash}"

image = ImageSpec(
    packages=[json_idl, flytekit],
    apt_packages=["git"],
    registry="futureoutlier",
)

@task(container_image=image)
def dict_task(input: dict) -> dict:
    """Print the types and values of every entry in ``input`` and return it unchanged."""
    print("Dict fields:")
    for key in input:
        item = input[key]
        print(type(key), type(item))
        print(key, item)
    return input

@workflow
def dict_wf(input: dict) -> dict:
    """Forward the untyped dict input to ``dict_task`` and return its output."""
    return dict_task(input=input)

if __name__ == "__main__":
    from flytekit.clis.sdk_in_container import pyflyte
    from click.testing import CliRunner

    # The same script path and JSON payload drive both runs; only --remote differs.
    runner = CliRunner()
    path = "/Users/future-outlier/code/dev/flytekit/build/PR/JSON/demo/dict_example.py"
    payload = '{"a": 1, "b": 2, "c": [1, 2, 3], "d": {"e": 1, "f": [1, 2, 3]}}'
    workflow_args = [path, "dict_wf", "--input", payload]

    result = runner.invoke(pyflyte.main, ["run", *workflow_args])
    print("Local Execution: ", result.output)

    result = runner.invoke(pyflyte.main, ["run", "--remote", *workflow_args])
    print("Remote Execution: ", result.output)

backward compatibility (upstream struct -> downstream json)

reference task

  1. create a reference task first
from flytekit import task

@task
def creat_reference_task_dict() -> dict:
    """Return a fixed two-entry dict."""
    # NOTE(review): "creat" looks like a typo for "create", but the function name appears
    # to be part of the registered task identifier used by the reference task — confirm before renaming.
    return {"a": 1, "b": 2}

if __name__ == "__main__":
    # Local smoke run: call the task directly and print the dict it returns.
    print(f"Running {__file__} main...")
    print(creat_reference_task_dict())
    print("Done!")
  2. use a reference task with struct output
from flytekit import reference_task, workflow, task, ImageSpec

flytekit_hash = "a964d7d5461fd79f3f8442c433825bcba830f32d"
flyteidl_hash = "0b32845e92cfe354c197ddf5ae3c8605a17f079b"

json_idl = f"git+https://github.com/flyteorg/flyte.git@{flyteidl_hash}#subdirectory=flyteidl"
flytekit = f"git+https://github.com/flyteorg/flytekit.git@{flytekit_hash}"

image = ImageSpec(
    packages=[flytekit, json_idl],
    apt_packages=["git"],
    registry="futureoutlier",
)

# Stub pointing at a task identified by project/domain/name/version; the Python body is
# intentionally empty. Presumably this is the `creat_reference_task_dict` task registered
# from the previous step — verify the version string against the target deployment.
@reference_task(
    project="flytesnacks",
    domain="development",
    name="create_reference_task.creat_reference_task_dict",
    version="Jk8kDmEU24TV6CqUaBoD6w",
)
def t1(
) -> dict:
    ...

@task(container_image=image)
def t2(input: dict) -> dict:
    """Print the dict, then each entry's value and runtime types, and return it unchanged."""
    print(input)
    for key in input:
        value = input[key]
        print(f'Key: {key}, Value: {value}')
        print(f'Key: {type(key)}, Value: {type(value)}')
    return input

@workflow
def wf() -> dict:
    """Feed the dict output of the reference task ``t1`` into ``t2`` and return ``t2``'s output."""
    d1 = t1()
    d2 = t2(input=d1)
    return d2

if __name__ == "__main__":
    from click.testing import CliRunner
    from flytekit.clis.sdk_in_container import pyflyte

    # Direct local invocation is left disabled; only the remote run is exercised.
    # print(f"Running {__file__} main...")
    # print(f"Output: {wf()}")
    # print("Done!")

    script = "/Users/future-outlier/code/dev/flytekit/build/PR/JSON/backward-compatibility/use_reference_task.py"
    cli_args = ["run", "--remote", script, "wf"]

    runner = CliRunner()
    result = runner.invoke(pyflyte.main, cli_args)
    print(result.output)

cache

from flytekit import reference_task, workflow, task, ImageSpec

flytekit_hash = "a964d7d5461fd79f3f8442c433825bcba830f32d"
flyteidl_hash = "0b32845e92cfe354c197ddf5ae3c8605a17f079b"

json_idl = f"git+https://github.com/flyteorg/flyte.git@{flyteidl_hash}#subdirectory=flyteidl"
flytekit = f"git+https://github.com/flyteorg/flytekit.git@{flytekit_hash}"

image = ImageSpec(
    packages=[flytekit, json_idl],
    apt_packages=["git"],
    registry="futureoutlier",
)

# Cached upstream task. Unlike `t2`, it does not pass `container_image=image`.
# NOTE(review): presumably deliberate, so the cached output comes from an image without the
# JSON IDL changes — confirm.
@task(cache=True, cache_serialize=True, cache_version="1.0")
def t1(
) -> dict:
    return {"a": 1, "b": 2, "c": [1, 2, 3], "d": {"e": 1, "f": [1, 2, 3]}}

@task(container_image=image)
def t2(input: dict) -> dict:
    """Print the dict, then each entry's value and runtime types, and return it unchanged."""
    print(input)
    for key in input:
        value = input[key]
        print(f'Key: {key}, Value: {value}')
        print(f'Key: {type(key)}, Value: {type(value)}')
    return input

@workflow
def wf() -> dict:
    """Feed the dict output of the cached task ``t1`` into ``t2`` and return ``t2``'s output."""
    d1 = t1()
    d2 = t2(input=d1)
    return d2

if __name__ == "__main__":
    from click.testing import CliRunner
    from flytekit.clis.sdk_in_container import pyflyte

    # Run the workflow remotely through the pyflyte CLI and show its output.
    script = "/Users/future-outlier/code/dev/flytekit/build/PR/JSON/cache/use_cache.py"
    cli_args = ["run", "--remote", script, "wf"]

    runner = CliRunner()
    result = runner.invoke(pyflyte.main, cli_args)
    print(result.output)

Setup process

Screenshots

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

Signed-off-by: Future-Outlier <eric901201@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
@eapolinario eapolinario mentioned this pull request Jul 30, 2024
3 tasks
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Comment on lines 547 to +551
lv = d.to_literal(
ctx,
{"x": datetime.datetime(2024, 5, 5)},
Annotated[dict, kwtypes(allow_pickle=True)],
LiteralType(simple=SimpleType.STRUCT),
LiteralType(simple=SimpleType.JSON),

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for pickle case.

arbaobao and others added 4 commits August 6, 2024 09:39
* Add an exception when filters' value isn't a list

* Make the exception more specific

Signed-off-by: Nelson Chen <asd3431090@gmail.com>

* add a unit test for value_in

Signed-off-by: Nelson Chen <asd3431090@gmail.com>

* lint

Signed-off-by: Kevin Su <pingsutw@apache.org>

---------

Signed-off-by: Nelson Chen <asd3431090@gmail.com>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Co-authored-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: wayner0628 <a901639@gmail.com>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Co-authored-by: Kevin Su <pingsutw@apache.org>
* Show the difference in types in the dataclass when a transformation error occurs

Signed-off-by: Future-Outlier <eric901201@gmail.com>

* add tests for dataclass

Signed-off-by: Future-Outlier <eric901201@gmail.com>

* fix tests

Signed-off-by: Future-Outlier <eric901201@gmail.com>

---------

Signed-off-by: Future-Outlier <eric901201@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
@Future-Outlier

Copy link
Copy Markdown
Member Author

Note: if we use _json_format.Parse(json_str, _struct.Struct()), it will convert int to float here.

Signed-off-by: Future-Outlier <eric901201@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
@Future-Outlier

Future-Outlier commented Aug 26, 2024

Copy link
Copy Markdown
Member Author

Hi, @JackUrb
Can you help me take a look at my examples?
I want to make sure this feature is well-tested enough!

RFC: flyteorg/flyte#5607
flyte: flyteorg/flyte#5542

Signed-off-by: Future-Outlier <eric901201@gmail.com>
@JackUrb

JackUrb commented Aug 29, 2024

Copy link
Copy Markdown
Contributor

Hi @Future-Outlier! Here are two cases that are currently not functional on main, but I believe this change should resolve.

def test_DataclassTransformer_subattribute_wrapping():
    """Attributes of a dataclass task output can be placed in a list argument next to a plain int."""
    @dataclass
    class Base:
        a: int
        b: int

    @task
    def add_list_elems(given_ints: list[int]) -> int:
        return sum(given_ints)

    @task
    def get_base() -> Base:
        return Base(a=10, b=20)

    @workflow
    def wf() -> int:
        base = get_base()
        # Two attributes of the task output plus the literal 3, all in one list argument.
        result = add_list_elems(given_ints=[base.a, base.b, 3])
        return result

    result = wf()
    assert result == 33  # 10 + 20 + 3


def test_DataclassTransformer_complex_subattribute_access():
    """List-typed attributes of a dataclass task output can be passed to tasks taking ``list[int]`` or ``list[int | str]``."""
    @dataclass
    class Base:
        a: list[int]
        b: list[str]

    @task
    def get_data() -> Base:
        return Base(a=[1, 2, 3, 4, 5], b=["a", "b", "c", "d", "e"])


    @task
    def list_join(in_list: list[int | str]) -> str:
        return "".join([str(i) for i in in_list])


    @task
    def list_sum(in_list: list[int]) -> int:
        return sum(in_list)


    @workflow
    def wf() -> tuple[int, str, str]:
        data = get_data()
        summed_ints = list_sum(in_list=data.a)
        # The same int-list attribute is also passed to the union-typed parameter.
        joined_ints = list_join(in_list=data.a)
        joined_strs = list_join(in_list=data.b)
        return summed_ints, joined_ints, joined_strs


    result = wf()
    assert result[0] == 15  # 1 + 2 + 3 + 4 + 5
    assert result[1] == "12345"
    assert result[2] == "abcde"

@Future-Outlier

Copy link
Copy Markdown
Member Author

Hi @Future-Outlier! Here are two cases that are currently not functional on main, but I believe this change should resolve.

def test_DataclassTransformer_subattribute_wrapping():

    @dataclass

    class Base:

        a: int

        b: int



    @task

    def add_list_elems(given_ints: list[int]) -> int:

        return sum(given_ints)



    @task

    def get_base() -> Base:

        return Base(a=10, b=20)



    @workflow

    def wf() -> int:

        base = get_base()

        result = add_list_elems(given_ints=[base.a, base.b, 3])

        return result



    result = wf()

    assert result == 33





def test_DataclassTransformer_complex_subattribute_access():

    @dataclass

    class Base:

        a: list[int]

        b: list[str]



    @task

    def get_data() -> Base:

        return Base(a=[1, 2, 3, 4, 5], b=["a", "b", "c", "d", "e"])





    @task

    def list_join(in_list: list[int | str]) -> str:

        return "".join([str(i) for i in in_list])





    @task

    def list_sum(in_list: list[int]) -> int:

        return sum(in_list)





    @workflow

    def wf() -> tuple[int, str, str]:

        data = get_data()

        summed_ints = list_sum(in_list=data.a)

        joined_ints = list_join(in_list=data.a)

        joined_strs = list_join(in_list=data.b)

        return summed_ints, joined_ints, joined_strs





    result = wf()

    assert result[0] == 15

    assert result[1] == "12345"

    assert result[2] == "abcde"

Let me take a look tomorrow, thank you <3

Signed-off-by: Future-Outlier <eric901201@gmail.com>
@Future-Outlier

Copy link
Copy Markdown
Member Author

Hi @Future-Outlier! Here are two cases that are currently not functional on main, but I believe this change should resolve.

def test_DataclassTransformer_subattribute_wrapping():
    @dataclass
    class Base:
        a: int
        b: int

    @task
    def add_list_elems(given_ints: list[int]) -> int:
        return sum(given_ints)

    @task
    def get_base() -> Base:
        return Base(a=10, b=20)

    @workflow
    def wf() -> int:
        base = get_base()
        result = add_list_elems(given_ints=[base.a, base.b, 3])
        return result

    result = wf()
    assert result == 33


def test_DataclassTransformer_complex_subattribute_access():
    @dataclass
    class Base:
        a: list[int]
        b: list[str]

    @task
    def get_data() -> Base:
        return Base(a=[1, 2, 3, 4, 5], b=["a", "b", "c", "d", "e"])


    @task
    def list_join(in_list: list[int | str]) -> str:
        return "".join([str(i) for i in in_list])


    @task
    def list_sum(in_list: list[int]) -> int:
        return sum(in_list)


    @workflow
    def wf() -> tuple[int, str, str]:
        data = get_data()
        summed_ints = list_sum(in_list=data.a)
        joined_ints = list_join(in_list=data.a)
        joined_strs = list_join(in_list=data.b)
        return summed_ints, joined_ints, joined_strs


    result = wf()
    assert result[0] == 15
    assert result[1] == "12345"
    assert result[2] == "abcde"

For case 1, it works remotely, and I'm fixing local execution.

image

Man, you are AMAZING, I am really lucky to have you <3.

Signed-off-by: Future-Outlier <eric901201@gmail.com>

@eapolinario eapolinario left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is looking pretty good. I only have a question about double-encoding json objects.

Comment thread flytekit/core/type_engine.py Outdated
Comment on lines +499 to +500
json_str = json.dumps(python_val)
return Literal(scalar=Scalar(generic=_json_format.Parse(json_str, _struct.Struct())))
json_bytes = msgpack.dumps(json_str)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really need to encode json objects to a string and then use msgpack to serialize the json-encoded string?

In other words, why can't we msgpack-encode python_val directly? As in msgpack.dumps(python_val)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case I think it's because the Mashumaro JSON encoder (or available to_json method) is more feature rich than msgpack.

That being said, this could probably use a to_dict -> msgpack.dumps and msgpack.reads -> from_dict flow for the data classes that need it (extending the mashumaro mix in), instead of to_json/from_json. The rest of them can use mashumaro's msgpack encoder.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, @JackUrb you are right, I am doing performance testing for alternatives.

Signed-off-by: Future-Outlier <eric901201@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
@codecov

codecov Bot commented Sep 4, 2024

Copy link
Copy Markdown

Codecov Report

Attention: Patch coverage is 31.85185% with 92 lines in your changes missing coverage. Please review.

Project coverage is 47.23%. Comparing base (8bad8e6) to head (86c24ae).
Report is 6 commits behind head on master.

Files with missing lines Patch % Lines
flytekit/core/type_engine.py 27.95% 61 Missing and 6 partials ⚠️
flytekit/core/promise.py 13.04% 19 Missing and 1 partial ⚠️
flytekit/interaction/string_literals.py 0.00% 2 Missing ⚠️
flytekit/models/literals.py 86.66% 2 Missing ⚠️
flytekit/interaction/click_types.py 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2600      +/-   ##
==========================================
- Coverage   49.76%   47.23%   -2.53%     
==========================================
  Files         193      193              
  Lines       19659    19776     +117     
  Branches     4097     4121      +24     
==========================================
- Hits         9783     9341     -442     
- Misses       9345     9932     +587     
+ Partials      531      503      -28     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Signed-off-by: Future-Outlier <eric901201@gmail.com>
@Future-Outlier

Copy link
Copy Markdown
Member Author

Use metadata: format -> msgpack, json-type -> ejson; pickle doesn't need a json type.

Signed-off-by: Future-Outlier <eric901201@gmail.com>
Comment on lines +393 to +410
from flytekit import StructuredDataset
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile
from flytekit.types.schema.types import FlyteSchema

FLYTE_TYPES = [FlyteFile, FlyteDirectory, StructuredDataset, FlyteSchema]

for k, v in original_dict.items():
if k in expected_fields_dict:
if isinstance(v, dict):
self.assert_type(expected_fields_dict[k], v)
# todo: 1. check if expected_fields_dict[k] is a flyte type, if yes, then use v to construct a flyet types and assert them
expected_type = expected_fields_dict[k]
if expected_type in FLYTE_TYPES:
new_v = copy.deepcopy(v)
new_v = expected_type(**new_v)
self.assert_type(expected_fields_dict[k], new_v)
else:
self.assert_type(expected_fields_dict[k], v)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for accessing a data class with FLYTE_TYPES.

Signed-off-by: Future-Outlier <eric901201@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
@Future-Outlier

Copy link
Copy Markdown
Member Author

TODO: test it with a two-level use case

@Future-Outlier

Copy link
Copy Markdown
Member Author

Move to flyteorg/flyte#5742

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants