Add initial DataStreamDownload API implementation #185

Merged
corbin-phipps merged 12 commits into feature/dataStreaming from user/corbinphipps/add-download-api
Feb 28, 2024

Conversation

Contributor

@corbin-phipps corbin-phipps commented Feb 28, 2024

Type

  • Bug fix
  • Feature addition
  • Feature update
  • Documentation
  • Build Infrastructure

Side Effects

  • Breaking change
  • Non-functional change

Goals

This PR adds the initial implementation of the DataStreamDownload API, including the API itself, the foundational service implementation and reactor class, a test client reactor class, and a basic unit test.

Technical Details

  • Added DataStreamDownload API to NetRemoteDataStreamingService.proto, along with applicable types into NetRemoteDataStream.proto.
  • Added DataStreamWriter implementation of the gRPC ServerWriteReactor reactor class to NetRemoteDataStreamingReactors.
  • Added test client implementation of the gRPC ClientReadReactor reactor class.
  • Added basic unit test.
  • Reverted smart pointer memory management back to new/delete.

Test Results

All tests pass.

Reviewer Focus

None.

Future Work

  • Handle DataStreamPattern options from the client's DataStreamDownloadRequest.
  • Add additional unit test for DataStreamTypeContinuous with DataStreamPatternConstant.
  • Add a better mechanism for data creation.

Checklist

  • Build target all compiles cleanly.
  • clang-format and clang-tidy deltas produced no new output.
  • Newly added functions include doxygen-style comment block.

@corbin-phipps corbin-phipps requested a review from a team as a code owner February 28, 2024 21:39
@corbin-phipps corbin-phipps merged commit c8ad32c into feature/dataStreaming Feb 28, 2024
@corbin-phipps corbin-phipps deleted the user/corbinphipps/add-download-api branch February 28, 2024 21:43
Comment on lines +66 to +70
    oneof Properties
    {
        DataStreamFixedTypeProperties FixedTypeProperties = 3;
        DataStreamContinuousTypeProperties ContinuousTypeProperties = 4;
    }
Contributor

We should de-dupe Properties from the oneof field names. Also, Type probably isn't needed either:

    oneof Properties
    {
        DataStreamFixedTypeProperties Fixed = 3;
        DataStreamContinuousTypeProperties Continuous = 4;
    }

}

void
DataStreamWriter::HandleFailure(const std::string errorMessage)
Contributor

This accepts the string by value, which makes one copy, and I believe a second copy is made when it is passed to set_message. It probably makes sense to accept a const std::string& here instead, which avoids one of those copies.

Also, in general, when accepting a copy/value of an argument, it's usually a good idea to accept it as non-const so the function can use it however it likes, since it owns that memory.
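
As a rough illustration of the two options discussed in this comment, here is a minimal sketch. `StatusSketch`, `HandleFailureByRef`, and `HandleFailureByValue` are hypothetical names made up for this example; `StatusSketch` only stands in for a message type with a `set_message` setter and is not the generated protobuf class.

```cpp
#include <string>
#include <utility>

// Hypothetical stand-in for a status message with a string field.
// Assumption: the real setter offers copy and move overloads like these.
struct StatusSketch
{
    std::string message;

    void set_message(const std::string& value) { message = value; }        // copies
    void set_message(std::string&& value) { message = std::move(value); }  // moves
};

// Option 1: accept a const reference. The caller's string is not copied
// at the call site; the only copy happens inside set_message.
inline void HandleFailureByRef(StatusSketch& status, const std::string& errorMessage)
{
    status.set_message(errorMessage);
}

// Option 2: accept a non-const value. The function owns its argument,
// so it can move it into the message instead of copying it again.
inline void HandleFailureByValue(StatusSketch& status, std::string errorMessage)
{
    status.set_message(std::move(errorMessage));
}
```

With a `const std::string` value parameter, as in the code under review, the `std::move` in option 2 would silently fall back to a copy, which is one reason to keep by-value parameters non-const.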

DataStreamReader::Await(uint32_t* numberOfDataBlocksReceived, DataStreamOperationStatus* operationStatus)
{
    std::unique_lock lock(m_readStatusGate);
    static constexpr auto timeoutValue = 10s;
Contributor

I think we'll want to eventually use these test classes outside test code (and move them to a common lib at that time).

In that case, we'll likely want to make this timeout value configurable. I suggest at least adding a member to DataStreamMember and reading it here instead of using the constant. It's completely fine if the member is just initialized to a constant (with this value) and there is no way to configure it for now.
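
One way the suggestion above could look is sketched below. `ReaderSketch`, `Complete`, `AwaitTimeout`, `m_awaitTimeout`, `m_readsDone`, and `m_done` are hypothetical names for this example, and the gRPC reactor plumbing is left out entirely; only the timeout handling is shown. The 10 second default mirrors the constant in the code under review.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Hypothetical, gRPC-free stand-in for the test reader class.
class ReaderSketch
{
public:
    ReaderSketch() = default;

    // Optional override; the PR comment notes this is not required yet.
    explicit ReaderSketch(std::chrono::milliseconds awaitTimeout) :
        m_awaitTimeout(awaitTimeout)
    {}

    // Signals that reading has finished (in the real class, a reactor
    // callback would play this role).
    void Complete(uint32_t numberOfDataBlocksReceived)
    {
        {
            std::lock_guard<std::mutex> lock(m_readStatusGate);
            m_numberOfDataBlocksReceived = numberOfDataBlocksReceived;
            m_done = true;
        }
        m_readsDone.notify_all();
    }

    // Waits up to m_awaitTimeout; returns false if the wait timed out.
    bool Await(uint32_t* numberOfDataBlocksReceived)
    {
        assert(numberOfDataBlocksReceived != nullptr);
        std::unique_lock<std::mutex> lock(m_readStatusGate);
        const bool completed = m_readsDone.wait_for(lock, m_awaitTimeout, [this] { return m_done; });
        if (completed) {
            *numberOfDataBlocksReceived = m_numberOfDataBlocksReceived;
        }
        return completed;
    }

    std::chrono::milliseconds AwaitTimeout() const { return m_awaitTimeout; }

private:
    // Member instead of a function-local constant; defaults to 10 seconds.
    std::chrono::milliseconds m_awaitTimeout{ std::chrono::seconds{ 10 } };
    std::mutex m_readStatusGate;
    std::condition_variable m_readsDone;
    bool m_done{ false };
    uint32_t m_numberOfDataBlocksReceived{ 0 };
};
```

Keeping the value in a member means a later change can expose a setter or constructor argument without touching the wait logic.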

DataStreamTypeContinuous = 2;
}

enum DataStreamPattern
Contributor

Does this refer to the throughput pattern, for example, the timing/rate of the data flow? In that case, Constant == an uninterrupted flow of data, correct?

Contributor Author

Right, Constant would be a steady pattern, with each data message being the same number of bytes.

Comment on lines +28 to +30
bytes Data = 1;
uint32 SequenceNumber = 2;
DataStreamOperationStatus Status = 3;
Contributor

Is Data only present/valid when Status == Succeeded? If so, suggest re-ordering these such that always-present/valid fields are first, then optional/conditional ones following.

Contributor Author

Yes, that's true. I can re-order the fields so that Status is first, since that will always be present.

DataStreamDownloadRequest request{};
*request.mutable_properties() = std::move(properties);

auto dataStreamReader = std::make_unique<DataStreamReader>(client.get(), &request);
Contributor

Any reason this can't be allocated on the stack instead? E.g.:

    DataStreamReader dataStreamReader{ client.get(), &request };
    // ...
    grpc::Status status = dataStreamReader.Await(&numberOfDataBlocksReceived, &operationStatus);

Contributor Author

I believe it can be stack allocated. Will update.

Comment on lines +120 to +122
if (m_data.sequencenumber() != m_numberOfDataBlocksReceived) {
DataStreamOperationStatus status{};
status.set_code(DataStreamOperationStatusCode::DataStreamOperationStatusCodeFailed);
Contributor

Looking at this now, would it be helpful to know when mismatched sequencing occurs? Might this allow us to help quantify the number of blocks dropped/lost?

Contributor Author

That's a good idea. It would be easy to tell - basically move this check into OnReadDone. We'd need to decide if we want to stop reading and cancel the RPC as soon as the first sequencing mismatch occurs, or if we want to keep going and later count how many blocks were dropped.

Contributor Author

Going to defer this until a bit later since it's test code - tracked by #186
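
For reference, the "keep going and count later" option from this thread could be sketched roughly as follows. `SequenceTrackerSketch` and its members are hypothetical names, not part of the PR. The sketch assumes sequence numbers start at 0 and increase by 1 per block, which matches the comparison against `m_numberOfDataBlocksReceived` in the code under review, and that a per-read callback such as OnReadDone would call `OnBlockReceived`.

```cpp
#include <cstdint>

// Hypothetical helper that tracks sequencing without any gRPC types.
class SequenceTrackerSketch
{
public:
    // Call once per received block with that block's sequence number.
    void OnBlockReceived(uint32_t sequenceNumber)
    {
        m_blocksReceived++;

        if (sequenceNumber < m_nextExpected) {
            // Duplicate or out-of-order block: record the mismatch, but do not
            // move the expected sequence number backwards.
            m_mismatches++;
            return;
        }

        if (sequenceNumber > m_nextExpected) {
            // Gap: every skipped sequence number is counted as a lost block.
            m_mismatches++;
            m_blocksLost += sequenceNumber - m_nextExpected;
        }

        m_nextExpected = sequenceNumber + 1;
    }

    uint32_t BlocksReceived() const { return m_blocksReceived; }
    uint32_t BlocksLost() const { return m_blocksLost; }
    uint32_t Mismatches() const { return m_mismatches; }

private:
    uint32_t m_nextExpected{ 0 };
    uint32_t m_blocksReceived{ 0 };
    uint32_t m_blocksLost{ 0 };
    uint32_t m_mismatches{ 0 };
};
```

The "stop at the first mismatch" option would instead check `Mismatches()` after each block and cancel the RPC as soon as it becomes non-zero.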

2 participants