diff --git a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java index bc15b05191c8..0ddd1e946406 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java @@ -69,16 +69,17 @@ public int port() { */ public void run() throws IOException { try { + Socket clientSocket = null; + ClientConnection client = null; while (!closed) { LOGGER.debug("Waiting to accept new client connection."); - Socket clientSocket = serverSocket.accept(); + clientSocket = serverSocket.accept(); LOGGER.debug("Accepted new client connection."); - try (ClientConnection client = new ClientConnection(clientSocket)) { - try { - client.run(); - } catch (IOException e) { - LOGGER.warn("Error handling client connection.", e); - } + client = new ClientConnection(clientSocket); + try { + client.run(); + } catch (IOException e) { + LOGGER.warn("Error handling client connection.", e); } LOGGER.debug("Closed connection with client"); } @@ -112,29 +113,28 @@ public ClientConnection(Socket socket) { */ public void run() throws IOException { // Read the entire input stream and write it back - try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); - ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) { + try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator); VectorSchemaRoot root = reader.getVectorSchemaRoot(); // load the first batch before instantiating the writer so that we have any dictionaries reader.loadNextBatch(); - try (ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, socket - .getOutputStream())) { - writer.start(); - int echoed = 0; - while (true) { - int rowCount = reader.getVectorSchemaRoot().getRowCount(); - if (rowCount == 0) { - break; - } else { - writer.writeBatch(); - echoed += rowCount; - reader.loadNextBatch(); - } + ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, socket.getOutputStream()); + writer.start(); + int echoed = 0; + while (true) { + int rowCount = reader.getVectorSchemaRoot().getRowCount(); + if (rowCount == 0) { + break; + } else { + writer.writeBatch(); + echoed += rowCount; + reader.loadNextBatch(); } - writer.end(); - Preconditions.checkState(reader.bytesRead() == writer.bytesWritten()); - LOGGER.debug(String.format("Echoed %d records", echoed)); } + writer.end(); + Preconditions.checkState(reader.bytesRead() == writer.bytesWritten()); + LOGGER.debug(String.format("Echoed %d records", echoed)); + reader.close(false); } }