Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 25 additions & 25 deletions java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,17 @@ public int port() {
*/
public void run() throws IOException {
try {
Socket clientSocket = null;
ClientConnection client = null;
while (!closed) {
LOGGER.debug("Waiting to accept new client connection.");
Socket clientSocket = serverSocket.accept();
clientSocket = serverSocket.accept();
LOGGER.debug("Accepted new client connection.");
try (ClientConnection client = new ClientConnection(clientSocket)) {
try {
client.run();
} catch (IOException e) {
LOGGER.warn("Error handling client connection.", e);
}
client = new ClientConnection(clientSocket);
try {
client.run();
} catch (IOException e) {
LOGGER.warn("Error handling client connection.", e);
}
LOGGER.debug("Closed connection with client");
}
Expand Down Expand Up @@ -112,29 +113,28 @@ public ClientConnection(Socket socket) {
*/
public void run() throws IOException {
// Read the entire input stream and write it back
try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) {
try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator);
VectorSchemaRoot root = reader.getVectorSchemaRoot();
// load the first batch before instantiating the writer so that we have any dictionaries
reader.loadNextBatch();
try (ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, socket
.getOutputStream())) {
writer.start();
int echoed = 0;
while (true) {
int rowCount = reader.getVectorSchemaRoot().getRowCount();
if (rowCount == 0) {
break;
} else {
writer.writeBatch();
echoed += rowCount;
reader.loadNextBatch();
}
ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, socket.getOutputStream());
writer.start();
int echoed = 0;
while (true) {
int rowCount = reader.getVectorSchemaRoot().getRowCount();
if (rowCount == 0) {
break;
} else {
writer.writeBatch();
echoed += rowCount;
reader.loadNextBatch();
}
writer.end();
Preconditions.checkState(reader.bytesRead() == writer.bytesWritten());
LOGGER.debug(String.format("Echoed %d records", echoed));
}
writer.end();
Preconditions.checkState(reader.bytesRead() == writer.bytesWritten());
LOGGER.debug(String.format("Echoed %d records", echoed));
reader.close(false);
}
}

Expand Down