Skip to content

[Core feature] Allow dictionaries to be passed to execute workflows that take dataclasses#2013

Merged
pingsutw merged 13 commits into
flyteorg:masterfrom
jasonlai1218:dataclass-execution-fix
Dec 22, 2023
Merged

[Core feature] Allow dictionaries to be passed to execute workflows that take dataclasses#2013
pingsutw merged 13 commits into
flyteorg:masterfrom
jasonlai1218:dataclass-execution-fix

Conversation

@jasonlai1218

@jasonlai1218 jasonlai1218 commented Nov 30, 2023

Copy link
Copy Markdown
Contributor

Tracking issue

flyteorg/flyte#4098

Docs link

Describe your changes

  • Import the json module
  • Add a check for dictionary types in the DataclassTransformer class
  • Raise a TypeTransformerFailedError if a key in the dictionary does not match any field in the dataclass
  • Convert the dictionary to a JSON string using json.dumps()

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Setup Process

  1. write a data_class2.py
import typing
from dataclasses import dataclass
from dataclasses_json import dataclass_json
from flytekit import workflow, task


@dataclass_json
@dataclass
class Data:
    a: int = 10
    b: str = "default_b"
    c: typing.Optional[str] = "default_c"


@task
def concat(x: list[Data]):
    for data in x:
        print(f"{data.a}_{data.b}_{data.c}")


@workflow
def my_wf(x: list[Data]) -> int:
    concat(x=x)
    return 0
  1. register data_class2.py
    execute cmd: pyflyte register -p flytesnacks --domain development --version 4 data_class2.py

  2. write a remote_wf2.py

from flytekit.remote import FlyteRemote
from flytekit.configuration import Config


if __name__ == "__main__":
    # FlyteRemote object is the main entrypoint to API
    remote = FlyteRemote(
        config=Config.for_endpoint(endpoint="localhost:30080", insecure=True),
        default_project="flytesnacks",
        default_domain="development",
    )

    # Get Task
    flyte_workflow = remote.fetch_workflow(name="data_class2.my_wf", version="4")

    # Run Task
    execution = remote.execute(
        flyte_workflow,
        inputs={'x': [{"a": 10, "b": "hello"}, {"a": 100, "b": "hello2"}]}
    )

and execute the program

Screenshots

Successful screenshot

Screenshot 2023-12-01 at 1 23 55 PM
Screenshot 2023-12-01 at 1 24 02 PM

FlyteFile

  1. write a flytefile_data_class.py
import csv
import os
from collections import defaultdict
from dataclasses import dataclass
from typing import List

from dataclasses_json import dataclass_json

import flytekit
from flytekit import task, workflow
from flytekit.types.file import FlyteFile


@dataclass_json
@dataclass
class Data:
    csv_url: FlyteFile
    columns_to_normalize: List[str]
    column_names: List[str]
    output_location: str


@task
def normalize_columns(data: Data) -> FlyteFile:
    # read the data from the raw csv file
    parsed_data = defaultdict(list)
    with open(data.csv_url, newline="\n") as input_file:
        reader = csv.DictReader(input_file, fieldnames=data.column_names)
        next(reader)  # Skip header
        for row in reader:
            for column in data.columns_to_normalize:
                parsed_data[column].append(float(row[column].strip()))

    # normalize the data
    normalized_data = defaultdict(list)
    for colname, values in parsed_data.items():
        mean = sum(values) / len(values)
        std = (sum([(x - mean) ** 2 for x in values]) / len(values)) ** 0.5
        normalized_data[colname] = [(x - mean) / std for x in values]

    # write to local path
    out_path = os.path.join(
        flytekit.current_context().working_directory,
        f"normalized-{os.path.basename(data.csv_url.path).rsplit('.')[0]}.csv",
    )
    with open(out_path, mode="w") as output_file:
        writer = csv.DictWriter(output_file, fieldnames=data.columns_to_normalize)
        writer.writeheader()
        for row in zip(*normalized_data.values()):
            writer.writerow({k: row[i] for i, k in enumerate(data.columns_to_normalize)})

    if data.output_location:
        return FlyteFile(path=out_path, remote_path=data.output_location)
    else:
        return FlyteFile(path=out_path)


@workflow
def normalize_csv_file(
    data: Data,
) -> FlyteFile:
    return normalize_columns(data=data)
  1. register flytefile_data_class.py
    pyflyte register -p flytesnacks --domain development --version 1 flytefile_data_class.py

  2. write a flytefile_remote_wf.py

from flytekit.remote import FlyteRemote
from flytekit.configuration import Config

if __name__ == "__main__":
    # FlyteRemote object is the main entrypoint to API
    remote = FlyteRemote(
        config=Config.for_endpoint(endpoint="localhost:30080", insecure=True),
        default_project="flytesnacks",
        default_domain="development",
    )

    # Get Task
    flyte_workflow = remote.fetch_workflow(name="flytefile_data_class.normalize_csv_file", version="1")

    # Run Task
    execution = remote.execute(
        flyte_workflow,
        inputs={
            'data': {
                "csv_url": {"path": "https://people.sc.fsu.edu/~jburkardt/data/csv/biostats.csv"},
                "column_names": ["Name", "Sex", "Age", "Heights (in)", "Weight (lbs)"],
                "columns_to_normalize": ["Age"],
                "output_location": ""
            },
        }
    )

and execute the program

Screenshots

Successful screenshot

Screenshot 2023-12-08 at 6 11 42 PM
Screenshot 2023-12-08 at 6 11 51 PM


StructuredDataset

  1. write a structureddataset_data_class.py
from dataclasses_json import dataclass_json
from flytekit.types.structured import StructuredDataset

from flytekit import task, workflow, kwtypes
import pandas as pd


@dataclass_json
@dataclass
class Data:
    structured_dataset: StructuredDataset


col = kwtypes(Age=int)


@task
def get_subset_pandas_df(data: Data) -> StructuredDataset:
    df = data.structured_dataset.open(pd.DataFrame).all()
    df = pd.concat([df, pd.DataFrame([[30]], columns=["Age"])])
    return StructuredDataset(dataframe=df)


@task
def print_df(df: pd.DataFrame):
    print(df)


@task
def save_df(df: pd.DataFrame, path: str):
    df.to_csv(path, index=False)


@workflow
def simple_sd_wf(data: Data) -> StructuredDataset:
    # pandas_df = generate_pandas_df(a=19)
    output = get_subset_pandas_df(data=data)
    print_df(df=output.dataframe)
    save_df(df=output.dataframe, path="test3.csv")
    return output.dataframe
  1. register flytefile_data_class.py
    pyflyte register -p flytesnacks --domain development --version 1 structureddataset_data_class.py

  2. upload parquet file to minio
    Screenshot 2023-12-08 at 6 26 49 PM

  3. write a structureddataset_remote_wf.py

from flytekit.remote import FlyteRemote
from flytekit.configuration import Config


if __name__ == "__main__":
    # FlyteRemote object is the main entrypoint to API
    remote = FlyteRemote(
        config=Config.for_endpoint(endpoint="localhost:30080", insecure=True),
        default_project="flytesnacks",
        default_domain="development",
    )

    # Get Task
    flyte_workflow = remote.fetch_workflow(name="structureddataset_data_class.simple_sd_wf", version="1")

    # Run Task
    execution = remote.execute(
        flyte_workflow,
        inputs={
            'data': {
                "structured_dataset": {
                    "uri": "s3://test/part-00000-8cbb2a62-33cc-475a-8895-b06e75aa1db6-c000.snappy.parquet",
                    "file_format": "parquet",
                }
            },
        }
    )

and execute the program

Screenshots

Successful screenshot

Screenshot 2023-12-08 at 6 24 39 PM
Screenshot 2023-12-08 at 6 25 00 PM


If the user configures the settings incorrectly, a screenshot of the error should be displayed

Scenario 1: Parameter type error

Screenshot 2023-12-01 at 1 20 52 AM


Screenshot 2023-12-01 at 12 21 58 PM

error message:
TypeTransformerFailedError: Type of Val '<class 'int'>' is not an instance of 
<class 'str'>

Scenario 2: Wrong field name

Screenshot 2023-12-01 at 12 23 55 PM

error message:
TypeTransformerFailedError: The original fields are missing the following keys 
from the dataclass fields: ['b']

Screenshot 2023-12-01 at 12 25 02 PM

TypeTransformerFailedError: The original fields are missing the following keys 
from the dataclass fields: ['b']

Screenshot 2023-12-01 at 12 25 57 PM

TypeTransformerFailedError: The original fields have the following extra keys 
that are not in dataclass fields: ['d']

Note to reviewers

Related PRs

- Import the `json` module
- Add a check for dictionary types in the `DataclassTransformer` class
- Raise a `TypeTransformerFailedError` if a key in the dictionary does not match any field in the dataclass
- Convert the dictionary to a JSON string using `json.dumps()`

Signed-off-by: jason.lai <jasonlai1218@gmail.com>
- Remove unnecessary code that checks if a key in the original dictionary matches a field in the dataclass

Signed-off-by: jason.lai <jasonlai1218@gmail.com>
@jasonlai1218 jasonlai1218 marked this pull request as draft November 30, 2023 18:13
…e_engine.py

- Import the `re` module in `test_type_engine.py`
- Add a new test function `test_assert_dict_type` in `test_type_engine.py`
- Define a dataclass `Args` in `test_assert_dict_type` function in `test_type_engine.py`
- Test when `v` is a dictionary in `test_assert_dict_type` function in `test_type_engine.py`
- Test when `v` is a dictionary but missing keys from dataclass in `test_assert_dict_type` function in `test_type_engine.py`
- Test when `v` is a dictionary but has extra keys that are not in dataclass in `test_assert_dict_type` function in `test_type_engine.py`
- Test when the type of value in the dictionary does not match the expected type in the dataclass in `test_assert_dict_type` function in `test_type_engine.py`

Signed-off-by: jason.lai <jasonlai1218@gmail.com>
- Add a new test `test_to_literal_dict` to `test_type_engine.py`
- Implement logic to convert a Python dictionary to a literal type
- Test the conversion when `python_val` is a dictionary
- Test the conversion when `python_val` is not a dictionary or dataclass
- Raise an exception when `python_val` is not a dictionary or dataclass with a specific error message

Signed-off-by: jason.lai <jasonlai1218@gmail.com>
@jasonlai1218 jasonlai1218 marked this pull request as ready for review November 30, 2023 19:04
@codecov

codecov Bot commented Nov 30, 2023

Copy link
Copy Markdown

Codecov Report

Attention: 1 lines in your changes are missing coverage. Please review.

Comparison is base (ad328a7) 85.70% compared to head (e4dbac9) 84.31%.

Files Patch % Lines
flytekit/core/type_engine.py 97.29% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2013      +/-   ##
==========================================
- Coverage   85.70%   84.31%   -1.40%     
==========================================
  Files         313      313              
  Lines       23364    23389      +25     
  Branches     3489     3499      +10     
==========================================
- Hits        20025    19720     -305     
- Misses       2733     3089     +356     
+ Partials      606      580      -26     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Comment thread tests/flytekit/unit/core/test_type_engine.py Outdated
Comment thread tests/flytekit/unit/core/test_type_engine.py
Comment thread flytekit/core/type_engine.py Outdated
…se errors properly

- Modify the `to_literal` method in `flytekit/core/type_engine.py`
- Add a condition to handle dictionaries in the `to_literal` method
- Parse the JSON string and return a `Literal` object
- Raise an error if the input is not a dataclass

Signed-off-by: jason.lai <jasonlai1218@gmail.com>
- Raise the amount of returned recordings from `10` to `100`
- Fix a typo in the github action name
- Move the `octokit` initialization to a separate file
- Add an OpenAI API for completions
- Lower numeric tolerance for test files
- Add 2 tests for the inclusive string split function

Signed-off-by: jason.lai <jasonlai1218@gmail.com>
@jasonlai1218 jasonlai1218 requested a review from pingsutw December 1, 2023 05:08
@jasonlai1218 jasonlai1218 marked this pull request as draft December 6, 2023 06:03
@jasonlai1218 jasonlai1218 marked this pull request as ready for review December 8, 2023 10:41
@pingsutw pingsutw self-assigned this Dec 13, 2023
Comment thread flytekit/core/type_engine.py Outdated
Comment thread flytekit/core/type_engine.py Outdated
…would be "refactor". This is because the changes involve adding and removing imports, which do not fix a bug or add a new feature, but rather modify the code structure.: refactor import statements in core and test files

- Add `import json` to `flytekit/core/type_engine.py`
- Remove `import json` from `flytekit/core/type_engine.py`
- Add `import re` to `tests/flytekit/unit/core/test_type_engine.py`
- Remove `import re` from `tests/flytekit/unit/core/test_type_engine.py`

Signed-off-by: jason.lai <jasonlai1218@gmail.com>
pingsutw
pingsutw previously approved these changes Dec 21, 2023

@pingsutw pingsutw left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @jasonlai1218. Could you resolve the merge conflict?

Merge branch 'master' into dataclass-execution-fix

Signed-off-by: jason.lai <jasonlai1218@gmail.com>
- Add `import pandas as pd` to the test file

Signed-off-by: jason.lai <jasonlai1218@gmail.com>
@jasonlai1218

Copy link
Copy Markdown
Contributor Author

updated

pingsutw
pingsutw previously approved these changes Dec 22, 2023
…tements in test files

- Remove import of `pandas` from `test_type_engine.py`
- Add import of `pandas` to `test_assert_dict_type()`

Signed-off-by: jason.lai <jasonlai1218@gmail.com>
@pingsutw pingsutw merged commit eab12e6 into flyteorg:master Dec 22, 2023
pryce-turner pushed a commit that referenced this pull request Dec 26, 2023
…hat take dataclasses (#2013)

Signed-off-by: jason.lai <jasonlai1218@gmail.com>
Signed-off-by: pryce-turner <pryce.turner@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants