From c37f9aec3590d29ddadc8dfd6fbfd3cb7c0f9365 Mon Sep 17 00:00:00 2001
From: shollyman
Date: Mon, 30 Jan 2023 19:30:45 -0800
Subject: [PATCH] refactor(bigquery/storage/managedwriter): introduce send optimizers (#7323)

* refactor(bigquery/storage/managedwriter): introduce send optimizers

This PR introduces a new optimizer abstraction for connection objects, but
doesn't wire it in.  The purpose of the optimizers is to leverage awareness
of previous requests to reduce transferred bytes.
---
 .../storage/managedwriter/send_optimizer.go   | 135 +++++++++
 .../managedwriter/send_optimizer_test.go      | 262 ++++++++++++++++++
 2 files changed, 397 insertions(+)
 create mode 100644 bigquery/storage/managedwriter/send_optimizer.go
 create mode 100644 bigquery/storage/managedwriter/send_optimizer_test.go

diff --git a/bigquery/storage/managedwriter/send_optimizer.go b/bigquery/storage/managedwriter/send_optimizer.go
new file mode 100644
index 000000000000..71f7ee8e4829
--- /dev/null
+++ b/bigquery/storage/managedwriter/send_optimizer.go
@@ -0,0 +1,135 @@
+// Copyright 2023 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package managedwriter
+
+import (
+	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
+	"google.golang.org/protobuf/proto"
+)
+
+// optimizeAndSend handles the general task of optimizing AppendRowsRequest messages sent to the backend.
+//
+// The basic premise is that by maintaining awareness of previous sends, individual messages can be made
+// more efficient (smaller) by redacting redundant information.
+type sendOptimizer interface {
+	// signalReset is used to signal to the optimizer that the connection is freshly (re)opened.
+	signalReset()
+
+	// optimizeSend handles redactions for a given stream.
+	optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, req *storagepb.AppendRowsRequest) error
+}
+
+// passthroughOptimizer is an optimizer that doesn't modify requests.
+type passthroughOptimizer struct {
+}
+
+func (po *passthroughOptimizer) signalReset() {
+	// we don't care, just here to satisfy the interface.
+}
+
+func (po *passthroughOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, req *storagepb.AppendRowsRequest) error {
+	return arc.Send(req)
+}
+
+// simplexOptimizer is used for connections where there's only a single stream's data being transmitted.
+//
+// The optimizations here are straightforward: the first request on a stream is unmodified, all
+// subsequent requests can redact WriteStream, WriterSchema, and TraceID.
+//
+// TODO: this optimizer doesn't do schema evolution checks, but relies on existing behavior that triggers reconnect
+// on schema change.  Revisit this, as it may not be necessary once b/266946486 is resolved.
+type simplexOptimizer struct {
+	haveSent bool
+}
+
+func (eo *simplexOptimizer) signalReset() {
+	eo.haveSent = false
+}
+
+func (eo *simplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, req *storagepb.AppendRowsRequest) error {
+	var resp error
+	if eo.haveSent {
+		// subsequent send, clone and redact.
+		cp := proto.Clone(req).(*storagepb.AppendRowsRequest)
+		cp.WriteStream = ""
+		cp.GetProtoRows().WriterSchema = nil
+		cp.TraceId = ""
+		resp = arc.Send(cp)
+	} else {
+		// first request, send unmodified.
+		resp = arc.Send(req)
+	}
+	eo.haveSent = resp == nil
+	return resp
+}
+
+// multiplexOptimizer is used for connections where requests for multiple streams are sent on a common connection.
+//
+// In this case, the optimizations are as follows:
+// * We **must** send the WriteStream on all requests.
+// * For sequential requests to the same stream, schema can be redacted after the first request.
+// * Trace ID can be redacted from all requests after the first.
+type multiplexOptimizer struct {
+	prev *storagepb.AppendRowsRequest
+}
+
+func (mo *multiplexOptimizer) signalReset() {
+	mo.prev = nil
+}
+
+func (mo *multiplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, req *storagepb.AppendRowsRequest) error {
+	var resp error
+	// we'll need a copy
+	cp := proto.Clone(req).(*storagepb.AppendRowsRequest)
+	if mo.prev != nil {
+		var swapOnSuccess bool
+		// Clear trace ID.  We use the _presence_ of a previous request for reasoning about TraceID, we don't compare
+		// its value.
+		cp.TraceId = ""
+		// we have a previous send.
+		if cp.GetWriteStream() != mo.prev.GetWriteStream() {
+			// different stream, no further optimization.
+			swapOnSuccess = true
+		} else {
+			// same stream
+			if !proto.Equal(mo.prev.GetProtoRows().GetWriterSchema().GetProtoDescriptor(), cp.GetProtoRows().GetWriterSchema().GetProtoDescriptor()) {
+				swapOnSuccess = true
+			} else {
+				// the redaction case, where we won't swap.
+				cp.GetProtoRows().WriterSchema = nil
+			}
+		}
+		resp = arc.Send(cp)
+		if resp == nil && swapOnSuccess {
+			cp.GetProtoRows().Rows = nil
+			cp.MissingValueInterpretations = nil
+			mo.prev = cp
+		}
+		if resp != nil {
+			mo.prev = nil
+		}
+		return resp
+	}
+
+	// no previous trace case.
+	resp = arc.Send(req)
+	if resp == nil {
+		// copy the send as the previous.
+ cp.GetProtoRows().Rows = nil + cp.MissingValueInterpretations = nil + mo.prev = cp + } + return resp +} diff --git a/bigquery/storage/managedwriter/send_optimizer_test.go b/bigquery/storage/managedwriter/send_optimizer_test.go new file mode 100644 index 000000000000..05737483e35c --- /dev/null +++ b/bigquery/storage/managedwriter/send_optimizer_test.go @@ -0,0 +1,262 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package managedwriter + +import ( + "io" + "testing" + + "cloud.google.com/go/bigquery/storage/apiv1/storagepb" + "cloud.google.com/go/bigquery/storage/managedwriter/testdata" + "github.com/google/go-cmp/cmp" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protodesc" + "google.golang.org/protobuf/testing/protocmp" +) + +func TestSendOptimizer(t *testing.T) { + + exampleReq := &storagepb.AppendRowsRequest{ + WriteStream: "foo", + Rows: &storagepb.AppendRowsRequest_ProtoRows{ + ProtoRows: &storagepb.AppendRowsRequest_ProtoData{ + Rows: &storagepb.ProtoRows{ + SerializedRows: [][]byte{[]byte("row_data")}, + }, + WriterSchema: &storagepb.ProtoSchema{ + ProtoDescriptor: protodesc.ToDescriptorProto((&testdata.SimpleMessageProto2{}).ProtoReflect().Descriptor()), + }, + }, + }, + TraceId: "trace_id", + } + + var testCases = []struct { + description string + optimizer sendOptimizer + reqs []*storagepb.AppendRowsRequest + sendResults []error + wantReqs []*storagepb.AppendRowsRequest + }{ + { 
+			description: "passthrough-optimizer",
+			optimizer:   &passthroughOptimizer{},
+			reqs: []*storagepb.AppendRowsRequest{
+				proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
+				proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
+				proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
+			},
+			sendResults: []error{
+				nil,
+				io.EOF,
+				io.EOF,
+			},
+			wantReqs: []*storagepb.AppendRowsRequest{
+				proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
+				proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
+				proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
+			},
+		},
+		{
+			description: "simplex no errors",
+			optimizer:   &simplexOptimizer{},
+			reqs: []*storagepb.AppendRowsRequest{
+				proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
+				proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
+				proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
+			},
+			sendResults: []error{
+				nil,
+				nil,
+				nil,
+			},
+			wantReqs: func() []*storagepb.AppendRowsRequest {
+				want := make([]*storagepb.AppendRowsRequest, 3)
+				// first has no redactions.
+				want[0] = proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
+				req := proto.Clone(want[0]).(*storagepb.AppendRowsRequest)
+				req.GetProtoRows().WriterSchema = nil
+				req.TraceId = ""
+				req.WriteStream = ""
+				want[1] = req
+				// previous sends succeeded, so the redacted request is expected again.
+ want[2] = req + return want + }(), + }, + { + description: "simplex w/partial errors", + optimizer: &simplexOptimizer{}, + reqs: []*storagepb.AppendRowsRequest{ + proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), + proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), + proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), + }, + sendResults: []error{ + nil, + io.EOF, + nil, + }, + wantReqs: func() []*storagepb.AppendRowsRequest { + want := make([]*storagepb.AppendRowsRequest, 3) + want[0] = proto.Clone(exampleReq).(*storagepb.AppendRowsRequest) + req := proto.Clone(want[0]).(*storagepb.AppendRowsRequest) + req.GetProtoRows().WriterSchema = nil + req.TraceId = "" + req.WriteStream = "" + want[1] = req + want[2] = want[0] + return want + }(), + }, + { + description: "multiplex single all errors", + optimizer: &multiplexOptimizer{}, + reqs: []*storagepb.AppendRowsRequest{ + proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), + proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), + proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), + }, + sendResults: []error{ + io.EOF, + io.EOF, + io.EOF, + }, + wantReqs: []*storagepb.AppendRowsRequest{ + proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), + proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), + proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), + }, + }, + { + description: "multiplex single no errors", + optimizer: &multiplexOptimizer{}, + reqs: []*storagepb.AppendRowsRequest{ + proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), + proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), + proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), + }, + sendResults: []error{ + nil, + nil, + nil, + }, + wantReqs: func() []*storagepb.AppendRowsRequest { + want := make([]*storagepb.AppendRowsRequest, 3) + want[0] = proto.Clone(exampleReq).(*storagepb.AppendRowsRequest) + req := proto.Clone(want[0]).(*storagepb.AppendRowsRequest) + req.GetProtoRows().WriterSchema = nil + req.TraceId 
= "" + want[1] = req + want[2] = req + return want + }(), + }, + { + description: "multiplex interleave", + optimizer: &multiplexOptimizer{}, + reqs: func() []*storagepb.AppendRowsRequest { + reqs := make([]*storagepb.AppendRowsRequest, 10) + reqA := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest) + reqA.WriteStream = "alpha" + + reqB := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest) + reqB.WriteStream = "beta" + reqB.GetProtoRows().GetWriterSchema().ProtoDescriptor = protodesc.ToDescriptorProto((&testdata.AllSupportedTypes{}).ProtoReflect().Descriptor()) + reqs[0] = reqA + reqs[1] = reqA + reqs[2] = reqB + reqs[3] = reqA + reqs[4] = reqB + reqs[5] = reqB + reqs[6] = reqB + reqs[7] = reqB + reqs[8] = reqA + reqs[9] = reqA + + return reqs + }(), + sendResults: []error{ + nil, + nil, + nil, + nil, + nil, + io.EOF, + nil, + nil, + nil, + io.EOF, + }, + wantReqs: func() []*storagepb.AppendRowsRequest { + want := make([]*storagepb.AppendRowsRequest, 10) + + wantReqAFull := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest) + wantReqAFull.WriteStream = "alpha" + + wantReqANoTrace := proto.Clone(wantReqAFull).(*storagepb.AppendRowsRequest) + wantReqANoTrace.TraceId = "" + + wantReqAOpt := proto.Clone(wantReqAFull).(*storagepb.AppendRowsRequest) + wantReqAOpt.GetProtoRows().WriterSchema = nil + wantReqAOpt.TraceId = "" + + wantReqBFull := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest) + wantReqBFull.WriteStream = "beta" + wantReqBFull.GetProtoRows().GetWriterSchema().ProtoDescriptor = protodesc.ToDescriptorProto((&testdata.AllSupportedTypes{}).ProtoReflect().Descriptor()) + + wantReqBNoTrace := proto.Clone(wantReqBFull).(*storagepb.AppendRowsRequest) + wantReqBNoTrace.TraceId = "" + + wantReqBOpt := proto.Clone(wantReqBFull).(*storagepb.AppendRowsRequest) + wantReqBOpt.GetProtoRows().WriterSchema = nil + wantReqBOpt.TraceId = "" + + want[0] = wantReqAFull + want[1] = wantReqAOpt + want[2] = wantReqBNoTrace + want[3] = wantReqANoTrace + want[4] = 
wantReqBNoTrace + want[5] = wantReqBOpt + want[6] = wantReqBFull + want[7] = wantReqBOpt + want[8] = wantReqANoTrace + want[9] = wantReqAOpt + + return want + }(), + }, + } + + for _, tc := range testCases { + testARC := &testAppendRowsClient{} + testARC.sendF = func(req *storagepb.AppendRowsRequest) error { + testARC.requests = append(testARC.requests, proto.Clone(req).(*storagepb.AppendRowsRequest)) + respErr := tc.sendResults[0] + tc.sendResults = tc.sendResults[1:] + return respErr + } + + for _, req := range tc.reqs { + tc.optimizer.optimizeSend(testARC, req) + } + // now, compare. + for k, wr := range tc.wantReqs { + if diff := cmp.Diff(testARC.requests[k], wr, protocmp.Transform()); diff != "" { + t.Errorf("%s (req %d) mismatch: -got, +want:\n%s", tc.description, k, diff) + } + } + } +}