diff --git a/pydeequ/scala_utils.py b/pydeequ/scala_utils.py
index b6d3e83..29c22c3 100644
--- a/pydeequ/scala_utils.py
+++ b/pydeequ/scala_utils.py
@@ -1,7 +1,34 @@
 # -*- coding: utf-8 -*-
 """A collection of utility functions and classes for manipulating with scala objects anc classes through py4j
 """
-from py4j.java_gateway import JavaObject
+from py4j.java_gateway import DEFAULT_PYTHON_PROXY_PORT, JavaObject
+
+
+def _ensure_dynamic_callback_port(gateway):
+    """Switch the gateway's py4j callback port from the default to OS-assigned.
+
+    If ``gateway.callback_server_parameters`` exists and its ``port`` equals
+    ``DEFAULT_PYTHON_PROXY_PORT`` (25334), the port is set to ``0`` in place;
+    a missing parameters object or any other port is left untouched. Call
+    this BEFORE starting or restarting the callback server.
+
+    Historically PyDeequ called ``gateway.start_callback_server()`` with no
+    arguments. With older py4j/pyspark gateways that left the callback server
+    on the default port, so two pyspark applications -- or two runs on the
+    same host -- using a lambda-based ``Check`` collided with
+    ``OSError: [Errno 98] Address already in use (127.0.0.1:25334)``
+    (issues #86, #19, #7, #72, #156, #173, #198).
+
+    The EXISTING parameters object is mutated, rather than passing a fresh
+    ``CallbackServerParameters``, so PySpark's own callback wiring is
+    preserved (no "Error while obtaining a new communication channel", #19)
+    and ``shutdown_callback_server()`` still returns cleanly. A fresh object
+    plus ``resetCallbackClient`` was found to make the accept thread
+    un-joinable, hanging shutdown (and ``tearDownClass``) on Linux and macOS.
+    """
+    params = getattr(gateway, "callback_server_parameters", None)
+    if params is not None and getattr(params, "port", 0) == DEFAULT_PYTHON_PROXY_PORT:
+        params.port = 0
 
 
 class PythonCallback:
@@ -12,16 +39,19 @@ def __init__(self, gateway):
         # P4j will return false if the callback server is already started
         # https://github.com/bartdag/py4j/blob/master/py4j-python/src/py4j/java_gateway.py
         callback_server = self.gateway.get_callback_server()
-        # TODO clean
         if callback_server is None:
+            # No callback server yet: switch the default port (25334) to an
+            # OS-assigned one, then start the server the stock way so PySpark's
+            # callback wiring -- and clean shutdown -- are preserved.
+            _ensure_dynamic_callback_port(self.gateway)
             self.gateway.start_callback_server()
             print("Python Callback server started!")  # TODO Logging
         elif callback_server.is_shutdown:
+            # The previous callback server was shut down (e.g. a prior Spark
+            # session was stopped). Restart it, again avoiding the default port.
             callback_server.close()
+            _ensure_dynamic_callback_port(self.gateway)
             self.gateway.restart_callback_server()
-            # Have you tried turning it off and on again?
-            # TODO why do we need to restart this every time?
-            # TODO Will this break during chained function calls?
             print("PythonCallback server restarted!")
 
 
diff --git a/tests/test_checks.py b/tests/test_checks.py
index 878257b..60257e0 100644
--- a/tests/test_checks.py
+++ b/tests/test_checks.py
@@ -452,6 +452,59 @@ def test_isUnique(self):
         self.assertEqual(self.isUnique("b", "All rows are unique"), [Row(constraint_status="Success")])
         self.assertEqual(self.isUnique("email", "All rows are unique"), [Row(constraint_status="Success")])
 
+    def test_lambda_check_uses_dynamic_callback_port(self):
+        """Regression test for the py4j callback-server default-port (25334) collision.
+
+        Closes #86, #19, #7, #72, #156, #173, #198.
+
+        With older gateways PythonCallback let py4j bind the hardcoded default
+        callback port 25334. Concurrent or repeated runs on the same host that
+        used a lambda-based Check then collided with
+        ``OSError: [Errno 98] Address already in use (127.0.0.1:25334)``. The fix
+        forces a dynamic (OS-assigned) port by setting the gateway's existing
+        callback-server parameters to ``port = 0`` before starting the server the
+        stock way, so PySpark's callback wiring -- and clean shutdown -- are
+        preserved.
+
+        This test runs lambda-assertion Checks and asserts that the resulting
+        callback server is listening on a dynamic port -- never the hardcoded
+        default 25334. The second lambda assertion additionally exercises the #19
+        path: reusing PySpark's parameters keeps the JVM callback client pointed
+        at the bound port, otherwise the check would fail with "Error while
+        obtaining a new communication channel".
+
+        This test is deliberately NON-INVASIVE: it does not start, stop, restart,
+        or close the shared py4j callback server, and it does not bind any port
+        itself. Manipulating the shared callback server here was found to either
+        hang ``tearDownClass``'s ``shutdown_callback_server`` (a callback thread
+        left blocked in ``recv`` never joins) or break later lambda tests that
+        reuse the connection. So we only observe the port the production fix chose.
+        """
+        gateway = self.spark.sparkContext._gateway
+
+        # Running a lambda assertion makes PythonCallback start the py4j callback
+        # server if it is not already running (on an OS-assigned port via the fix).
+        result = self.hasSize(lambda x: x == 3.0)
+        self.assertEqual(result, [Row(constraint_status="Success")])
+
+        callback_server = gateway.get_callback_server()
+        self.assertIsNotNone(callback_server, "a lambda Check should have started the callback server")
+        listening_port = callback_server.get_listening_port()
+        self.assertNotEqual(
+            listening_port,
+            25334,  # value of py4j's DEFAULT_PYTHON_PROXY_PORT
+            "Callback server must not use the hardcoded default port 25334; "
+            f"got {listening_port}",
+        )
+        self.assertGreater(listening_port, 0, "callback server should be bound to a real port")
+
+        # A second lambda assertion in the same process exercises the #19 path
+        # (the JVM callback client must point at the dynamic port). If the client
+        # pointed at a stale port, this would raise "Error while obtaining a new
+        # communication channel" instead of returning a result.
+        result2 = self.hasSize(lambda x: x >= 2.0 and x < 5.0)
+        self.assertEqual(result2, [Row(constraint_status="Success")])
+
     def test_fail_isUnique(self):
         self.assertEqual(self.isUnique("d"), [Row(constraint_status="Failure")])
         self.assertEqual(self.isUnique("f", "All rows are unique"), [Row(constraint_status="Failure")])