Skip to content

Issue with application exit while using pydeequ #198

Description

@Shubham11Gupta

I am using the code below as a util file that is used to run a config-driven job.
On failure of the check job, the task is supposed to finish and send a notification, but the task does not finish even after entering the exception block.

from pydeequ.checks import Check,CheckLevel
from pydeequ.verification import VerificationSuite,VerificationResult

class ValidatorObject:
    """Single-constraint data-quality checks built on PyDeequ.

    Every public method builds one ``Check`` at ``CheckLevel.Warning``, runs it
    through ``VerificationSuite`` against the given DataFrame and returns the
    output of ``VerificationResult.checkResultsAsJson``.

    The methods never create or stop the Spark session; the caller owns it.
    NOTE(review): the PyDeequ README advises calling
    ``spark.sparkContext._gateway.shutdown_callback_server()`` followed by
    ``spark.stop()`` once all checks are done, to avoid hanging processes.
    Confirm that the calling job does this on its failure path as well.
    """

    @staticmethod
    def _require(*named_values):
        """Reject the first falsy argument, checked in the order given.

        :param named_values: ``(name, value)`` pairs to validate
        :raises AssertionError: "Error while passing <name>" for the first
            falsy value
        """
        for name, value in named_values:
            if not value:
                # Raised explicitly instead of using ``assert`` so validation
                # still happens when Python runs with -O; the exception type
                # and message are the same as an ``assert`` would produce.
                raise AssertionError("Error while passing " + name)

    @staticmethod
    def _run_check(spark, df, description, add_constraint):
        """Run one warning-level check on ``df`` and return its JSON result.

        :param spark: spark session
        :param df: Dataframe to verify
        :param description: description text given to the ``Check``
        :param add_constraint: callable that receives the new ``Check`` and
            returns it with the desired constraint attached
        :returns: output of ``VerificationResult.checkResultsAsJson``
        """
        check = Check(spark, CheckLevel.Warning, description)
        suite = VerificationSuite(spark).onData(df)
        result = suite.addCheck(add_constraint(check)).run()
        return VerificationResult.checkResultsAsJson(spark, result)

    def checkIsNonNegative(self, spark, df, column):
        """Method to validate if a column has non negative values
        :param spark: spark session, df: Dataframe, column: One column
        :returns: json object
        :raises AssertionError: if spark, df or column is falsy
        """
        ValidatorObject._require(("spark", spark), ("df", df), ("column", column))
        return ValidatorObject._run_check(
            spark, df, "check isNonNegative",
            lambda check: check.isNonNegative(column),
        )

    def checkIsContainedIn(self, spark, df, column, allowed_values):
        """Method to validate if a column only holds values from an allowed list
        :param spark: spark session, df: Dataframe, column: One column,
            allowed_values: list of possible values
        :returns: json object
        :raises AssertionError: if spark, df, column or allowed_values is falsy
        """
        ValidatorObject._require(
            ("spark", spark),
            ("df", df),
            ("column", column),
            ("allowed_values", allowed_values),
        )
        return ValidatorObject._run_check(
            spark, df, "check isContainedIn",
            lambda check: check.isContainedIn(column, allowed_values),
        )

    def checkHasSize(self, spark, df, assertion, hint=None):
        """Method to validate the size of the Dataframe against an assertion
        :param spark: spark session, df: Dataframe, assertion: condition to be
            passed to ``Check.hasSize``, hint: optional hint forwarded to
            ``Check.hasSize``
        :returns: json object
        :raises AssertionError: if spark, df or assertion is falsy
        """
        ValidatorObject._require(("spark", spark), ("df", df), ("assertion", assertion))
        return ValidatorObject._run_check(
            spark, df, "test hasSize",
            lambda check: check.hasSize(assertion, hint),
        )

    def checkIsComplete(self, spark, df, column):
        """Method to validate if the entire column is complete
        :param spark: spark session, df: Dataframe, column: One column
        :returns: json object
        :raises AssertionError: if spark, df or column is falsy
        """
        ValidatorObject._require(("spark", spark), ("df", df), ("column", column))
        return ValidatorObject._run_check(
            spark, df, "test isComplete",
            lambda check: check.isComplete(column),
        )

    def checkIsUnique(self, spark, df, column):
        """Method to validate if a column has unique values
        :param spark: spark session, df: Dataframe, column: One column
        :returns: json object
        :raises AssertionError: if spark, df or column is falsy
        """
        ValidatorObject._require(("spark", spark), ("df", df), ("column", column))
        return ValidatorObject._run_check(
            spark, df, "test isUnique",
            lambda check: check.isUnique(column),
        )

This happens only in the cases where I am using:
from pydeequ.checks import Check,CheckLevel
from pydeequ.verification import VerificationSuite,VerificationResult

In another case, where I am using
from pydeequ.analyzers import (AnalysisRunner, AnalyzerContext, Completeness, Uniqueness, UniqueValueRatio, Size, Mean, Correlation)
On failure of the check, the task does finish, so I deduced that the issue lies only with either pydeequ.checks or pydeequ.verification.
Please help me with this.

Metadata

Metadata

Assignees

No one assigned

    Labels

    question: Further information is requested

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions