Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 25 additions & 25 deletions java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,17 @@ public int port() {
*/
public void run() throws IOException {
try {
Socket clientSocket = null;
ClientConnection client = null;
while (!closed) {
LOGGER.debug("Waiting to accept new client connection.");
Socket clientSocket = serverSocket.accept();
clientSocket = serverSocket.accept();
LOGGER.debug("Accepted new client connection.");
try (ClientConnection client = new ClientConnection(clientSocket)) {
try {
client.run();
} catch (IOException e) {
LOGGER.warn("Error handling client connection.", e);
}
client = new ClientConnection(clientSocket);
try {
client.run();
} catch (IOException e) {
LOGGER.warn("Error handling client connection.", e);
}
LOGGER.debug("Closed connection with client");
}
Expand Down Expand Up @@ -112,29 +113,28 @@ public ClientConnection(Socket socket) {
*/
public void run() throws IOException {
// Read the entire input stream and write it back
try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) {
try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator);
VectorSchemaRoot root = reader.getVectorSchemaRoot();
// load the first batch before instantiating the writer so that we have any dictionaries
reader.loadNextBatch();
try (ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, socket
.getOutputStream())) {
writer.start();
int echoed = 0;
while (true) {
int rowCount = reader.getVectorSchemaRoot().getRowCount();
if (rowCount == 0) {
break;
} else {
writer.writeBatch();
echoed += rowCount;
reader.loadNextBatch();
}
ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, socket.getOutputStream());
writer.start();
int echoed = 0;
while (true) {
int rowCount = reader.getVectorSchemaRoot().getRowCount();
if (rowCount == 0) {
break;
} else {
writer.writeBatch();
echoed += rowCount;
reader.loadNextBatch();
}
writer.end();
Preconditions.checkState(reader.bytesRead() == writer.bytesWritten());
LOGGER.debug(String.format("Echoed %d records", echoed));
}
writer.end();
Preconditions.checkState(reader.bytesRead() + 4 == writer.bytesWritten());
LOGGER.debug(String.format("Echoed %d records", echoed));
reader.close(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private void testEchoServer(int serverPort,
}
Assert.assertFalse(reader.loadNextBatch());
assertEquals(0, reader.getVectorSchemaRoot().getRowCount());
assertEquals(reader.bytesRead(), writer.bytesWritten());
assertEquals(reader.bytesRead() + 4, writer.bytesWritten());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
import org.apache.arrow.vector.ipc.message.ArrowFooter;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.ipc.message.IpcOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,6 +48,11 @@ public ArrowFileWriter(VectorSchemaRoot root, DictionaryProvider provider, Writa
super(root, provider, out);
}

public ArrowFileWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out,
IpcOption option) {
super(root, provider, out, option);
}

@Override
protected void startInternal(WriteChannel out) throws IOException {
ArrowMagic.writeMagic(out, true);
Expand All @@ -68,7 +74,12 @@ protected ArrowBlock writeRecordBatch(ArrowRecordBatch batch) throws IOException

@Override
protected void endInternal(WriteChannel out) throws IOException {
out.writeIntLittleEndian(0);
if (option.write_legacy_ipc_format) {
out.writeIntLittleEndian(0);
} else {
out.writeLongLittleEndian(0);
}

long footerStart = out.getCurrentPosition();
out.write(new ArrowFooter(schema, dictionaryBlocks, recordBlocks), false);
int footerLength = (int) (out.getCurrentPosition() - footerStart);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.message.IpcOption;

/**
* Writer for the Arrow stream format to send ArrowRecordBatches over a WriteChannel.
Expand All @@ -42,16 +43,25 @@ public ArrowStreamWriter(VectorSchemaRoot root, DictionaryProvider provider, Out
this(root, provider, Channels.newChannel(out));
}

/**
* Construct an ArrowStreamWriter with an optional DictionaryProvider for the WritableByteChannel.
*/
public ArrowStreamWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
this(root, provider, out, new IpcOption());
}

/**
* Construct an ArrowStreamWriter with an optional DictionaryProvider for the WritableByteChannel.
*
* @param root Existing VectorSchemaRoot with vectors to be written.
* @param provider DictionaryProvider for any vectors that are dictionary encoded.
* (Optional, can be null)
* @param option IPC write options
* @param out WritableByteChannel for writing.
*/
public ArrowStreamWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
super(root, provider, out);
public ArrowStreamWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out,
IpcOption option) {
super(root, provider, out, option);
}

/**
Expand All @@ -60,8 +70,12 @@ public ArrowStreamWriter(VectorSchemaRoot root, DictionaryProvider provider, Wri
* @param out Open WriteChannel with an active Arrow stream.
* @throws IOException on error
*/
public static void writeEndOfStream(WriteChannel out) throws IOException {
out.writeIntLittleEndian(0);
public void writeEndOfStream(WriteChannel out) throws IOException {
if (option.write_legacy_ipc_format) {
out.writeIntLittleEndian(0);
} else {
out.writeLongLittleEndian(0);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.ipc.message.IpcOption;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
Expand All @@ -59,16 +60,24 @@ public abstract class ArrowWriter implements AutoCloseable {

private boolean dictWritten = false;

protected IpcOption option;

protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
this (root, provider, out, new IpcOption());
}

/**
* Note: fields are not closed when the writer is closed.
*
* @param root the vectors to write to the output
* @param provider where to find the dictionaries
* @param out the output where to write
* @param option IPC write options
*/
protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out, IpcOption option) {
this.unloader = new VectorUnloader(root);
this.out = new WriteChannel(out);
this.option = option;

List<Field> fields = new ArrayList<>(root.getSchema().getFields().size());
Set<Long> dictionaryIdsUsed = new HashSet<>();
Expand Down Expand Up @@ -112,14 +121,14 @@ public void writeBatch() throws IOException {
}

protected ArrowBlock writeDictionaryBatch(ArrowDictionaryBatch batch) throws IOException {
ArrowBlock block = MessageSerializer.serialize(out, batch);
ArrowBlock block = MessageSerializer.serialize(out, batch, option);
LOGGER.debug("DictionaryRecordBatch at {}, metadata: {}, body: {}",
block.getOffset(), block.getMetadataLength(), block.getBodyLength());
return block;
}

protected ArrowBlock writeRecordBatch(ArrowRecordBatch batch) throws IOException {
ArrowBlock block = MessageSerializer.serialize(out, batch);
ArrowBlock block = MessageSerializer.serialize(out, batch, option);
LOGGER.debug("RecordBatch at {}, metadata: {}, body: {}",
block.getOffset(), block.getMetadataLength(), block.getBodyLength());
return block;
Expand All @@ -140,7 +149,7 @@ private void ensureStarted() throws IOException {
startInternal(out);
// write the schema - for file formats this is duplicated in the footer, but matches
// the streaming format
MessageSerializer.serialize(out, schema);
MessageSerializer.serialize(out, schema, option);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ public long writeIntLittleEndian(int v) throws IOException {
return write(outBuffer);
}

/**
* Writes <code>v</code> in little-endian format to the underlying channel.
*/
public long writeLongLittleEndian(long v) throws IOException {
byte[] outBuffer = new byte[8];
MessageSerializer.longToBytes(v, outBuffer);
return write(outBuffer);
}

/**
* Writes the buffer to the underlying channel.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.vector.ipc.message;

/**
* IPC options, now only use for write.
*/
public class IpcOption {

// Write the pre-0.15.0 encapsulated IPC message format
// consisting of a 4-byte prefix instead of 8 byte
public boolean write_legacy_ipc_format = false;
}
Loading