Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions pydeequ/scala_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,34 @@
# -*- coding: utf-8 -*-
"""A collection of utility functions and classes for manipulating Scala objects and classes through py4j
"""
from py4j.java_gateway import JavaObject
from py4j.java_gateway import DEFAULT_PYTHON_PROXY_PORT, JavaObject


def _ensure_dynamic_callback_port(gateway):
    """Make sure the py4j callback server will NOT bind the hardcoded default
    port (25334) before it is started.

    Historically PyDeequ called ``gateway.start_callback_server()`` with no
    arguments. With older py4j/pyspark gateways that left the callback server
    on the hardcoded default port ``DEFAULT_PYTHON_PROXY_PORT`` (25334), so two
    pyspark applications -- or two runs on the same host -- using a lambda-based
    ``Check`` collided with
    ``OSError: [Errno 98] Address already in use (127.0.0.1:25334)``
    (issues #86, #19, #7, #72, #156, #173, #198).

    The fix is simply to ask the OS for a free port (``port = 0``). We mutate
    the gateway's EXISTING ``callback_server_parameters`` rather than passing a
    fresh ``CallbackServerParameters`` object, so PySpark's own callback wiring
    is preserved -- the JVM-side callback client stays consistent (no "Error
    while obtaining a new communication channel", #19) and, crucially,
    ``shutdown_callback_server()`` still returns cleanly at teardown. Passing a
    fresh parameters object + ``resetCallbackClient`` instead makes the accept
    thread un-joinable and hangs shutdown on Linux and macOS, which would hang
    the test suite's ``tearDownClass``.

    Args:
        gateway: The py4j gateway whose ``callback_server_parameters``
            attribute (if any) holds the port the callback server will bind.

    Returns:
        None. The gateway's existing parameters object is modified in place.
    """
    params = getattr(gateway, "callback_server_parameters", None)
    if params is None:
        # Nothing to adjust: this gateway exposes no callback parameters.
        return

    # Only rewrite the port when it is still the py4j default; any other
    # value (an explicitly chosen port, or 0 already) is left untouched.
    if getattr(params, "port", 0) == DEFAULT_PYTHON_PROXY_PORT:
        params.port = 0


class PythonCallback:
Expand All @@ -12,16 +39,19 @@ def __init__(self, gateway):
# Py4J will return false if the callback server is already started
# https://github.com/bartdag/py4j/blob/master/py4j-python/src/py4j/java_gateway.py
callback_server = self.gateway.get_callback_server()
# TODO clean
if callback_server is None:
# No callback server yet: ensure a dynamic port (avoid the 25334
# collision) and start it the stock way so PySpark's callback
# wiring -- and clean shutdown -- are preserved.
_ensure_dynamic_callback_port(self.gateway)
self.gateway.start_callback_server()
print("Python Callback server started!") # TODO Logging
elif callback_server.is_shutdown:
# The previous callback server was shut down (e.g. a prior Spark
# session was stopped). Restart it (also dynamic-port safe).
callback_server.close()
_ensure_dynamic_callback_port(self.gateway)
self.gateway.restart_callback_server()
# Have you tried turning it off and on again?
# TODO why do we need to restart this every time?
# TODO Will this break during chained function calls?
print("PythonCallback server restarted!")


Expand Down
53 changes: 53 additions & 0 deletions tests/test_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,59 @@ def test_isUnique(self):
self.assertEqual(self.isUnique("b", "All rows are unique"), [Row(constraint_status="Success")])
self.assertEqual(self.isUnique("email", "All rows are unique"), [Row(constraint_status="Success")])

def test_lambda_check_uses_dynamic_callback_port(self):
    """Regression test for the py4j callback-server default-port (25334) collision.

    Closes #86, #19, #7, #72, #156, #173, #198.

    With older gateways PythonCallback let py4j bind the hardcoded default
    callback port 25334. Concurrent or repeated runs on the same host that
    used a lambda-based Check then collided with
    ``OSError: [Errno 98] Address already in use (127.0.0.1:25334)``. The fix
    forces a dynamic (OS-assigned) port by setting the gateway's existing
    callback-server parameters to ``port = 0`` before starting the server the
    stock way, so PySpark's callback wiring -- and clean shutdown -- are
    preserved.

    This test runs lambda-assertion Checks and asserts that the resulting
    callback server is listening on a dynamic port -- never the hardcoded
    default 25334. The second lambda assertion additionally exercises the #19
    path: reusing PySpark's parameters keeps the JVM callback client pointed
    at the bound port, otherwise the check would fail with "Error while
    obtaining a new communication channel".

    This test is deliberately NON-INVASIVE: it does not start, stop, restart,
    or close the shared py4j callback server, and it does not bind any port
    itself. Manipulating the shared callback server here was found to either
    hang ``tearDownClass``'s ``shutdown_callback_server`` (a callback thread
    left blocked in ``recv`` never joins) or break later lambda tests that
    reuse the connection. So we only observe the port the production fix chose.
    """
    gateway = self.spark.sparkContext._gateway

    # A lambda assertion ensures the py4j callback server is running (started by
    # PythonCallback via the fix on an OS-assigned dynamic port).
    result = self.hasSize(lambda x: x == 3.0)
    self.assertEqual(result, [Row(constraint_status="Success")])

    callback_server = gateway.get_callback_server()
    self.assertIsNotNone(callback_server, "a lambda Check should have started the callback server")

    # Observe only: read the port that the already-running server reports.
    listening_port = callback_server.get_listening_port()
    self.assertNotEqual(
        listening_port,
        25334,
        "Callback server must not use the hardcoded default port 25334; "
        f"got {listening_port}",
    )
    self.assertGreater(listening_port, 0, "callback server should be bound to a real port")

    # A second lambda assertion in the same process exercises the #19 path:
    # the JVM callback client must point at the dynamic port the server
    # actually bound. If it pointed anywhere else, this would raise "Error
    # while obtaining a new communication channel" instead of returning a
    # result.
    result2 = self.hasSize(lambda x: x >= 2.0 and x < 5.0)
    self.assertEqual(result2, [Row(constraint_status="Success")])

def test_fail_isUnique(self):
self.assertEqual(self.isUnique("d"), [Row(constraint_status="Failure")])
self.assertEqual(self.isUnique("f", "All rows are unique"), [Row(constraint_status="Failure")])
Expand Down
Loading