Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ public ExchangeReaderWriter doExchange(FlightDescriptor descriptor, CallOption..
final ClientCallStreamObserver<ArrowMessage> observer = (ClientCallStreamObserver<ArrowMessage>)
ClientCalls.asyncBidiStreamingCall(call, stream.asObserver());
final ClientStreamListener writer = new PutObserver(
descriptor, observer, stream.completed::isDone,
descriptor, observer, stream.cancelled::isDone,
() -> {
try {
stream.completed.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ public CallStatus status() {
@Override
public String toString() {
String s = getClass().getName();
return String.format("%s: %s", s, status);
return String.format("%s: %s: %s", s, status.code(), status.description());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.flight.impl.FlightServiceGrpc.FlightServiceImplBase;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VectorUnloader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.grpc.Status;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;

Expand Down Expand Up @@ -189,24 +189,22 @@ public StreamObserver<ArrowMessage> doPutCustom(final StreamObserver<Flight.PutR
responseObserver.disableAutoInboundFlowControl();
responseObserver.request(1);

final FlightStream fs = new FlightStream(allocator, PENDING_REQUESTS, (String message, Throwable cause) -> {
responseObserver.onError(Status.CANCELLED.withCause(cause).withDescription(message).asException());
}, responseObserver::request);
final StreamPipe<PutResult, Flight.PutResult> ackStream = StreamPipe
.wrap(responseObserver, PutResult::toProtocol, this::handleExceptionWithMiddleware);
final FlightStream fs = new FlightStream(
allocator,
PENDING_REQUESTS,
/* server-upload streams are not cancellable */null,
responseObserver::request);
// When the ackStream is completed, the FlightStream will be closed with it
ackStream.setAutoCloseable(fs);
final StreamObserver<ArrowMessage> observer = fs.asObserver();
executors.submit(() -> {
final StreamPipe<PutResult, Flight.PutResult> ackStream = StreamPipe
.wrap(responseObserver, PutResult::toProtocol, this::handleExceptionWithMiddleware);
try {
producer.acceptPut(makeContext(responseObserver), fs, ackStream).run();
} catch (Exception ex) {
ackStream.onError(ex);
} finally {
// Close this stream before telling gRPC that the call is complete. That way we don't race with server shutdown.
try {
fs.close();
} catch (Exception e) {
handleExceptionWithMiddleware(e);
}
// ARROW-6136: Close the stream if and only if acceptPut hasn't closed it itself
// We don't do this for other streams since the implementation may be asynchronous
ackStream.ensureCompleted();
Expand Down Expand Up @@ -236,7 +234,7 @@ public void getFlightInfo(Flight.FlightDescriptor request, StreamObserver<Flight
*/
private void handleExceptionWithMiddleware(Throwable t) {
final Map<Key<?>, FlightServerMiddleware> middleware = ServerInterceptorAdapter.SERVER_MIDDLEWARE_KEY.get();
if (middleware == null) {
if (middleware == null || middleware.isEmpty()) {
logger.error("Uncaught exception in Flight method body", t);
return;
}
Expand All @@ -258,14 +256,14 @@ public void getSchema(Flight.FlightDescriptor request, StreamObserver<Flight.Sch

/** Ensures that other resources are cleaned up when the service finishes its call. */
private static class ExchangeListener extends GetListener {
private final AutoCloseable resource;

private AutoCloseable resource;
private boolean closed = false;
private Runnable onCancelHandler = null;

public ExchangeListener(ServerCallStreamObserver<ArrowMessage> responseObserver, Consumer<Throwable> errorHandler,
AutoCloseable resource) {
public ExchangeListener(ServerCallStreamObserver<ArrowMessage> responseObserver, Consumer<Throwable> errorHandler) {
super(responseObserver, errorHandler);
this.resource = resource;
this.resource = null;
super.setOnCancelHandler(() -> {
try {
if (onCancelHandler != null) {
Expand All @@ -285,7 +283,7 @@ private void cleanup() {
}
closed = true;
try {
this.resource.close();
AutoCloseables.close(resource);
} catch (Exception e) {
throw CallStatus.INTERNAL
.withCause(e)
Expand Down Expand Up @@ -321,19 +319,16 @@ public void setOnCancelHandler(Runnable handler) {
public StreamObserver<ArrowMessage> doExchangeCustom(StreamObserver<ArrowMessage> responseObserverSimple) {
final ServerCallStreamObserver<ArrowMessage> responseObserver =
(ServerCallStreamObserver<ArrowMessage>) responseObserverSimple;
final FlightStream fs = new FlightStream(allocator, PENDING_REQUESTS, (String message, Throwable cause) -> {
responseObserver.onError(Status.CANCELLED.withCause(cause).withDescription(message).asException());
}, responseObserver::request);
// When service completes the call, this cleans up the FlightStream
final ExchangeListener listener = new ExchangeListener(
responseObserver,
this::handleExceptionWithMiddleware,
() -> {
// Force the stream to "complete" so it will close without incident. At this point, we don't care since
// we are about to end the call. (Normally it will raise an error.)
fs.completed.complete(null);
fs.close();
});
this::handleExceptionWithMiddleware);
final FlightStream fs = new FlightStream(
allocator,
PENDING_REQUESTS,
/* server-upload streams are not cancellable */null,
responseObserver::request);
// When service completes the call, this cleans up the FlightStream
listener.resource = fs;
responseObserver.disableAutoInboundFlowControl();
responseObserver.request(1);
final StreamObserver<ArrowMessage> observer = fs.asObserver();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -54,7 +55,6 @@
* An adaptor between protobuf streams and flight data streams.
*/
public class FlightStream implements AutoCloseable {

// Use AutoCloseable sentinel objects to simplify logic in #close
private final AutoCloseable DONE = () -> {
};
Expand All @@ -68,7 +68,13 @@ public class FlightStream implements AutoCloseable {
private final SettableFuture<FlightDescriptor> descriptor = SettableFuture.create();
private final int pendingTarget;
private final Requestor requestor;
// The completion flags.
// This flag is only updated as the user iterates through the data, i.e. it tracks whether the user has read all the
// data and closed the stream
final CompletableFuture<Void> completed;
// This flag is immediately updated when gRPC signals that the server has ended the call. This is used to make sure
// we don't block forever trying to write to a server that has rejected a call.
final CompletableFuture<Void> cancelled;

private volatile int pending = 1;
private volatile VectorSchemaRoot fulfilledRoot;
Expand All @@ -84,16 +90,19 @@ public class FlightStream implements AutoCloseable {
*
* @param allocator The allocator to use for creating/reallocating buffers for Vectors.
* @param pendingTarget Target number of messages to receive.
* @param cancellable Only provided for streams from server to client, used to cancel mid-stream requests.
* @param cancellable Used to cancel mid-stream requests.
* @param requestor A callback to determine how many pending items there are.
*/
public FlightStream(BufferAllocator allocator, int pendingTarget, Cancellable cancellable, Requestor requestor) {
Objects.requireNonNull(allocator);
Objects.requireNonNull(requestor);
this.allocator = allocator;
this.pendingTarget = pendingTarget;
this.cancellable = cancellable;
this.requestor = requestor;
this.dictionaries = new DictionaryProvider.MapDictionaryProvider();
this.completed = new CompletableFuture<>();
this.cancelled = new CompletableFuture<>();
}

/**
Expand Down Expand Up @@ -158,29 +167,52 @@ public FlightDescriptor getDescriptor() {
/**
* Closes the stream (freeing any existing resources).
*
* <p>If the stream isn't complete and is cancellable, this method will cancel the stream first.</p>
* <p>If the stream isn't complete and is cancellable, this method will cancel and drain the stream first.
*/
public void close() throws Exception {
final List<AutoCloseable> closeables = new ArrayList<>();
// cancellation can throw, but we still want to clean up resources, so make it an AutoCloseable too
closeables.add(() -> {
if (!completed.isDone() && cancellable != null) {
cancel("Stream closed before end.", /* no exception to report */ null);
Throwable suppressor = null;
if (cancellable != null) {
// Client-side stream. Cancel the call, to help ensure gRPC doesn't deliver a message after close() ends.
// On the server side, we can't rely on draining the stream , because this gRPC bug means the completion callback
// may never run https://github.com/grpc/grpc-java/issues/5882
try {
synchronized (cancellable) {
if (!cancelled.isDone()) {
// Only cancel if the call is not done on the gRPC side
cancellable.cancel("Stream closed before end", /* no exception to report */null);
}
}
// Drain the stream without the lock (as next() implicitly needs the lock)
while (next()) { }
} catch (FlightRuntimeException e) {
suppressor = e;
}
});
if (fulfilledRoot != null) {
closeables.add(fulfilledRoot);
}
closeables.add(applicationMetadata);
closeables.addAll(queue);
if (dictionaries != null) {
dictionaries.getDictionaryIds().forEach(id -> closeables.add(dictionaries.lookup(id).getVector()));
// Perform these operations under a lock. This way the observer can't enqueue new messages while we're in the
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's just the wording of the comments, but it seems theoretically possible for an observer to put a message in the queue between line 187 and the point where the lock is acquired on line 195. Is that true? The chance is probably pretty small and not easy to code for. Just wanted a bit of clarification.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - it's possible because the observer runs on a different thread (managed by gRPC) from the application, so the observer can fire while the application is in the middle of close(). On the client side, draining the stream (as done here) prevents this case, but on the server side, we can't rely on this, unfortunately, hence the lock to protect things.

// middle of cleanup. This should only be a concern for server-side streams since client-side streams are drained
// by the lambda above.
synchronized (completed) {
try {
if (fulfilledRoot != null) {
closeables.add(fulfilledRoot);
}
closeables.add(applicationMetadata);
closeables.addAll(queue);
if (dictionaries != null) {
dictionaries.getDictionaryIds().forEach(id -> closeables.add(dictionaries.lookup(id).getVector()));
}
if (suppressor != null) {
AutoCloseables.close(suppressor, closeables);
} else {
AutoCloseables.close(closeables);
}
} finally {
// The value of this CompletableFuture is meaningless, only whether it's completed (or has an exception)
// No-op if already complete
completed.complete(null);
}
}

AutoCloseables.close(closeables);
// Other code ignores the value of this CompletableFuture, only whether it's completed (or has an exception)
// No-op if already complete; do this after the check in the AutoCloseable lambda above
completed.complete(null);
}

/**
Expand Down Expand Up @@ -337,8 +369,22 @@ private class Observer implements StreamObserver<ArrowMessage> {
super();
}

/**
 * Adds an item to the queue while holding the {@code completed} monitor.
 *
 * <p>If {@code completed} is already done, the item is closed and discarded instead of being queued.
 *
 * @param message the item to enqueue, or to release when the stream is already closed
 */
private void enqueue(AutoCloseable message) {
  synchronized (completed) {
    final boolean streamOpen = !completed.isDone();
    if (streamOpen) {
      queue.add(message);
      return;
    }
    // The stream is already closed (RPC ended): release the message rather than queueing it.
    AutoCloseables.closeNoChecked(message);
  }
}

@Override
public void onNext(ArrowMessage msg) {
// Operations here have to be under a lock so that we don't add a message to the queue while in the middle of
// close().
requestOutstanding();
switch (msg.getMessageType()) {
case NONE: {
Expand All @@ -347,7 +393,7 @@ public void onNext(ArrowMessage msg) {
descriptor.set(new FlightDescriptor(msg.getDescriptor()));
}
if (msg.getApplicationMetadata() != null) {
queue.add(msg);
enqueue(msg);
}
break;
}
Expand All @@ -367,60 +413,63 @@ public void onNext(ArrowMessage msg) {
try {
MetadataV4UnionChecker.checkRead(schema, metadataVersion);
} catch (IOException e) {
queue.add(DONE_EX);
ex = e;
enqueue(DONE_EX);
break;
}

fulfilledRoot = VectorSchemaRoot.create(schema, allocator);
loader = new VectorLoader(fulfilledRoot);
if (msg.getDescriptor() != null) {
descriptor.set(new FlightDescriptor(msg.getDescriptor()));
synchronized (completed) {
if (!completed.isDone()) {
fulfilledRoot = VectorSchemaRoot.create(schema, allocator);
loader = new VectorLoader(fulfilledRoot);
if (msg.getDescriptor() != null) {
descriptor.set(new FlightDescriptor(msg.getDescriptor()));
}
root.set(fulfilledRoot);
}
}
root.set(fulfilledRoot);
break;
}
case RECORD_BATCH:
queue.add(msg);
break;
case DICTIONARY_BATCH:
queue.add(msg);
enqueue(msg);
break;
case TENSOR:
default:
queue.add(DONE_EX);
ex = new UnsupportedOperationException("Unable to handle message of type: " + msg.getMessageType());
enqueue(DONE_EX);
}
}

/**
 * Handles an error delivered to this observer.
 *
 * @param t the error reported for the stream
 */
@Override
public void onError(Throwable t) {
// Translate the throwable via StatusUtils and remember it in the ex field.
ex = StatusUtils.fromThrowable(t);
// Enqueue the DONE_EX marker (presumably the "ended with error" sentinel; confirm against the queue consumer).
queue.add(DONE_EX);
// Flag that the call has ended on the gRPC side.
cancelled.complete(null);
// Also fail the root future with the same exception.
root.setException(ex);
}

/**
 * Handles completion of the incoming stream: completes the {@code cancelled} future and enqueues the
 * {@code DONE} sentinel.
 */
@Override
public void onCompleted() {
// Depends on gRPC calling onNext and onCompleted non-concurrently
// Flag that the call has ended on the gRPC side before publishing the end-of-stream marker.
cancelled.complete(null);
queue.add(DONE);
}
}

/**
* Cancels sending the stream to a client.
*
* @throws UnsupportedOperationException on a stream being uploaded from the client.
* <p>Callers should drain the stream (with {@link #next()}) to ensure all messages sent before cancellation are
* received and to wait for the underlying transport to acknowledge cancellation.
*/
public void cancel(String message, Throwable exception) {
completed.completeExceptionally(
CallStatus.CANCELLED.withDescription(message).withCause(exception).toRuntimeException());
if (cancellable != null) {
cancellable.cancel(message, exception);
} else {
if (cancellable == null) {
throw new UnsupportedOperationException("Streams cannot be cancelled that are produced by client. " +
"Instead, server should reject incoming messages.");
}
cancellable.cancel(message, exception);
// Do not mark the stream as completed, as gRPC may still be delivering messages.
}

StreamObserver<ArrowMessage> asObserver() {
Expand Down
Loading