
Check with assertion as parameter throws "OSError: [Errno 98] Address already in use" #86

Description

@ammar-nizami

Describe the bug
When I create a check that takes an assertion (a Python callable) as a parameter, such as hasSize, the call fails with "OSError: [Errno 98] Address already in use".

To Reproduce

```python
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationResult, VerificationSuite
from pyspark.sql import SparkSession

# data points to S3; the actual path is elided in this report
data = "s3://*****"

spark = SparkSession.builder.appName("pydeequ_poc").getOrCreate()
print("===== get spark context complete =====")
df = spark.read.parquet(data)
print(f"===== count: {df.count()} =====")
df.printSchema()

verification_run_builder = VerificationSuite(spark).onData(df)

# The check name and the assertion arrive as strings, so the check method is
# looked up with getattr() and the assertion is turned into a callable with eval().
_check_name = "hasSize"
_assertion = "lambda x: x >= 3"
_args = [eval(_assertion)]
print(f"===== _args[{type(_args)}]: {_args} =====")

_check = Check(spark, CheckLevel.Warning, _check_name)
_check_func = getattr(_check, _check_name)  # bound method Check.hasSize
print(f"===== _check_func[{type(_check_func)}]: {_check_func} =====")

# This call fails: hasSize wraps the lambda in a ScalaFunction1, which starts
# the Py4J callback server (see the traceback in the log below).
_check = _check_func(*_args)
print(f"===== _check[{type(_check)}]: {_check} =====")

verification_run_builder = verification_run_builder.addCheck(_check)
verification_result = verification_run_builder.run()
verification_result_df = VerificationResult.checkResultsAsDataFrame(
    spark, verification_result, pandas=False
)
verification_result_df.show()
```
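The repro builds the assertion by eval-ing a string, which executes arbitrary code if that string comes from configuration. If assertions are limited to a comparison against a constant, a minimal eval-free sketch could look like the following. The spec format (">= 3") and the names make_assertion and _COMPARATORS are hypothetical, not part of pydeequ. The returned lambda is an ordinary Python callable just like the one in the repro, so it would still go through the same Py4J callback path inside hasSize and does not by itself avoid the error reported here.

```python
import operator
from typing import Callable

# Hypothetical mapping from comparison symbols to functions in the operator module.
_COMPARATORS = {
    ">=": operator.ge,
    ">": operator.gt,
    "<=": operator.le,
    "<": operator.lt,
    "==": operator.eq,
}


def make_assertion(spec: str) -> Callable[[float], bool]:
    """Turn a spec such as ">= 3" into a one-argument predicate without eval()."""
    symbol, _, threshold = spec.strip().partition(" ")
    if symbol not in _COMPARATORS or not threshold:
        raise ValueError(f"unsupported assertion spec: {spec!r}")
    compare = _COMPARATORS[symbol]
    bound = float(threshold)  # raises ValueError for a non-numeric threshold
    return lambda x: compare(x, bound)


assertion = make_assertion(">= 3")
print(assertion(5), assertion(2))  # True False
```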

Expected behavior
The check is created and the verification results are generated.

Log

```text
===== count: 2619203 =====
root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- product_category: string (nullable = true)

===== _args[<class 'list'>]: [<function <lambda> at 0xffffa15a2b90>] =====
===== _check_func[<class 'method'>]: <bound method Check.hasSize of <pydeequ.checks.Check object at 0xffffa15b0210>> =====
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2207, in start
OSError: [Errno 98] Address already in use

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/ammar/pydeequ_poc_pyspark.py", line 26, in <module>
    _check = _check_func(*_args)
  File "/usr/local/lib/python3.7/site-packages/pydeequ/checks.py", line 134, in hasSize
    assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion)
  File "/usr/local/lib/python3.7/site-packages/pydeequ/scala_utils.py", line 32, in __init__
    super().__init__(gateway)
  File "/usr/local/lib/python3.7/site-packages/pydeequ/scala_utils.py", line 16, in __init__
    self.gateway.start_callback_server()
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1894, in start_callback_server
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2216, in start
py4j.protocol.Py4JNetworkError: An error occurred while trying to start the callback server (127.0.0.1:25334)
21/11/19 13:44:06 INFO SparkContext: Invoking stop() from shutdown hook
```
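On Linux, Errno 98 is EADDRINUSE: the bind() performed while Py4J starts its callback server on 127.0.0.1:25334 found that address already taken. The stdlib-only sketch below reproduces the same error without Spark by binding a second socket to a port that a listening socket already holds; the helper name bind_twice is made up for illustration.

```python
import errno
import socket


def bind_twice(host: str = "127.0.0.1") -> int:
    """Bind a second socket to a port that a listening socket already holds.

    Returns the errno raised by the second bind(), or 0 if it unexpectedly succeeded.
    """
    first = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    second = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        first.bind((host, 0))  # port 0: let the OS choose a free port
        first.listen(1)
        port = first.getsockname()[1]
        try:
            second.bind((host, port))  # same address and port: expected to fail
        except OSError as exc:
            return exc.errno
        return 0
    finally:
        first.close()
        second.close()


code = bind_twice()
print(code, errno.errorcode.get(code, "no error"))
```

On Linux this prints 98 EADDRINUSE, the same errno shown at the top of the traceback.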

Additional context
I am submitting the Spark job through Livy from an Airflow task. Spark runs with its default configuration.
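Because the job goes through Livy, the PySpark driver process may share its host with other Spark sessions. One plausible explanation, not confirmed in this report, is that another process on that host already holds Py4J's default callback-server port 25334 (the port named in the Py4JNetworkError). A quick way to test this on the driver host is a bind probe such as the sketch below. port_in_use is a hypothetical helper, and setting SO_REUSEADDR on the probe is an assumption meant to keep sockets lingering in TIME_WAIT from counting as "in use" on Linux.

```python
import socket


def port_in_use(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if binding (host, port) fails, i.e. another socket holds it.

    Hypothetical diagnostic helper. SO_REUSEADDR is set so that, on Linux,
    connections left in TIME_WAIT do not make the port look busy.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            probe.bind((host, port))
        except OSError:
            return True
        return False


if __name__ == "__main__":
    # 25334 is the callback-server port from the Py4JNetworkError message.
    print("127.0.0.1:25334 in use:", port_in_use(25334))
```

The probe only means something when run on the host where the PySpark driver process runs, not on the Airflow worker that sends the Livy request.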

Labels: bug (Something isn't working), help wanted (Extra attention is needed)
