Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support streaming in tsgrpc kvstore driver. #152

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

zlobober
Copy link

@zlobober zlobober commented Apr 8, 2024

This commit changes tsgrpc protocol in backward-incompatible manner, addressing issue #137.
After this change, it becomes possible to read and write values that exceed the theoretic maximum
of 2 GiB per proto message.

This commit changes tsgrpc protocol in backward-incompatible manner, addressing issue google#137.
After this change, it becomes possible to read and write values that exceed the theoretic maximum
of 2 GiB per proto message.
@zlobober zlobober marked this pull request as ready for review April 8, 2024 23:46
@zlobober
Copy link
Author

@laramiel, would you mind having a look?

@laramiel
Copy link
Collaborator

Looking; generally looks reasonable, will try to give it more of a review tomorrow, or early next week.

@@ -137,7 +138,11 @@ struct TsGrpcKeyValueStoreSpecData {
jb::Member(
DataCopyConcurrencyResource::id,
jb::Projection<
&TsGrpcKeyValueStoreSpecData::data_copy_concurrency>()) /**/
&TsGrpcKeyValueStoreSpecData::data_copy_concurrency>()),
jb::Member("max_sent_part_bytes",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we just use something like this:

constexpr size_t kMaxPayloadSize = 2 * 1024 * 1024; // 2mb, gRPC allows 4mb.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... which means that there should be a separate test which uses a 4mb buffer, or so.

internal::IntrusivePtr<TsGrpcKeyValueStore> driver;
grpc::ClientContext context;
WriteRequest request;
WriteResponse response;

absl::Cord value;
size_t value_offset = 0;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#include <stddef.h>

void SetNextPart() {
auto next_part = value.Subcord(value_offset, driver->GetMaxSentPartBytes());
request.set_value_part(std::move(next_part));
value_offset = std::min(value.size(), value_offset + next_part.size());
Copy link
Collaborator

@laramiel laramiel Apr 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#include <algorithm>

@@ -91,6 +94,20 @@ TEST_F(KvStoreTest, Basic) {
tensorstore::internal::TestKeyValueReadWriteOps(store);
}

TEST_F(KvStoreTest, Multipart) {
auto context = tensorstore::Context::Default();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use something like this:

  // uses 2mb chunks; this will force at least two writes.
  tensorstore::internal::FlatCordBuilder cord_builder(16 + 2 * 1024 * 1024);
  memset(cord_builder.data(), 0x37, cord_builder.size());
  absl::Cord data = std::move(cord_builder).Build();
  data.Append("abcd");  // second chunk.

With these includes:

#include <cstring>
#include <utility>
#include "third_party/tensorstore/internal/flat_cord_builder.h"

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise in tsgrpc_test.cc

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the idea behind this change? Would you like to (implicitly) fix the fact that default value of max_sent_part_bytes (or max_message_bytes as you proposed to rename) equals to 2 mib? If yes, I can alternatively introduce a separate test that ensures that default part size is 2 mib without changing much the existing tests; to me it looks like they do their job and cover all "chunking" logic.

Also, IMHO using long strings in tsgrpc_test.cc would be less elegant, as the rest of tests constructs proto messages using ParseTextProtoOrDie, which makes understanding the communication quite easy.

But if you insist, I'll rewrite all unittests using 2 mib chunks.

Also, why do you suggest 16 + 2 * 1024 * 1024? The first chunk would in fact be the first 2 mib, and the second would consist of 16 times (char)0x37 and abcd in your example, and not of "abcd" solely as your comment suggests.

})
.value();
}

MockKvStoreService& mock() { return *mock_service_.service(); }

void ExpectWriteRequestsAre(grpc::ServerReader<WriteRequest>* req,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid using EXPECT in nested functions; instead consider adding a matcher.

https://google.github.io/googletest/reference/matchers.html#defining-matchers

if (!promise.result_needed()) return;
// Actual value set to the promise does not matter, as the promise is only used for tracking completion
// or cancellation.
promise.SetResult(absl::OkStatus());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this should be more like:

promise.SetResult(self->HandleResult(write_result.result()));

Or at least, the HandleResult() needs the status response handled correctly.

@@ -440,32 +507,37 @@ TENSORSTORE_DEFINE_JSON_DEFAULT_BINDER(
}),
jb::Member("bind_addresses",
jb::Projection<&KvStoreServer::Spec::bind_addresses>(
jb::DefaultInitializedValue()))));
jb::DefaultInitializedValue())),
jb::Member("max_sent_part_bytes",
Copy link
Collaborator

@laramiel laramiel Apr 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, if we want to support a parameter like this, then it needs to be validated to be within the range [1, 4mb - ~16), otherwise it will cause gRPC issues.

Also, I don't really like the name; "max_message_bytes" may be better.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually considered such naming but decided that it would be sort of misleading, as this parameter limits the size of the "value_part" field of the protobuf and not the total size of the message (which is typically slightly larger due to the rest of the fields).

Anyway, this should not be a big deal, so let me change the naming as you propose.

@laramiel
Copy link
Collaborator

laramiel commented May 9, 2024

If you sync this, then I can look at applying it.

@zlobober
Copy link
Author

zlobober commented May 9, 2024

Was on conference/sick leave, sorry for long time without response.
Actually I am working on fixing everything right now, will bring an update later today/tomorrow.
Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants