Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add session flag for stream execute grpc api #14046

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions changelog/18.0/18.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- [VTOrc flag `--allow-emergency-reparent`](#new-flag-toggle-ers)
- [VTOrc flag `--change-tablets-with-errant-gtid-to-drained`](#new-flag-errant-gtid-convert)
- [ERS sub flag `--wait-for-all-tablets`](#new-ers-subflag)
- [VTGate flag `--grpc-send-session-in-streaming`](#new-vtgate-streaming-sesion)
- **[VTAdmin](#vtadmin)**
- [Updated to node v18.16.0](#update-node)
- **[Deprecations and Deletions](#deprecations-and-deletions)**
Expand Down Expand Up @@ -63,6 +64,14 @@ for a response from all the tablets. Originally `EmergencyReparentShard` was mea
We have realized now that there are cases when the replication is broken but all the tablets are reachable. In these cases, it is advisable to
call `EmergencyReparentShard` with `--wait-for-all-tablets` so that it doesn't ignore one of the tablets.

#### <a id="new-vtgate-streaming-sesion"/>VTGate GRPC stream execute session flag `--grpc-send-session-in-streaming`

This flag enables transaction support on `StreamExecute` api.
One enabled, VTGate `StreamExecute` grpc api will send session as the last packet in the response.
The client should enable it only when they have made the required changes to expect such a packet.

It is disabled by default.

### <a id="vtadmin"/>VTAdmin

#### <a id="updated-node"/>vtadmin-web updated to node v18.16.0 (LTS)
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ Flags:
--foreign_key_mode string This is to provide how to handle foreign key constraint in create/alter table. Valid values are: allow, disallow (default "allow")
--gate_query_cache_memory int gate server query cache size in bytes, maximum amount of memory to be cached. vtgate analyzes every incoming query and generate a query plan, these plans are being cached in a lru cache. This config controls the capacity of the lru cache. (default 33554432)
--gateway_initial_tablet_timeout duration At startup, the tabletGateway will wait up to this duration to get at least one tablet per keyspace/shard/tablet type (default 30s)
--grpc-send-session-in-streaming If set, will send the session as last packet in streaming api to support transactions in streaming
--grpc-use-effective-groups If set, and SSL is not used, will set the immediate caller's security groups from the effective caller id's groups.
--grpc-use-static-authentication-callerid If set, will set the immediate caller id to the username authenticated by the static auth plugin.
--grpc_auth_mode string Which auth plugin implementation to use (eg: static)
Expand Down
1 change: 1 addition & 0 deletions go/test/endtoend/vtgate/grpc_api/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func TestMain(m *testing.M) {
"--grpc_auth_static_password_file", grpcServerAuthStaticPath,
"--grpc_use_effective_callerid",
"--grpc-use-static-authentication-callerid",
"--grpc-send-session-in-streaming",
}

// Configure vttablet to use table ACL
Expand Down
22 changes: 14 additions & 8 deletions go/vt/vtgate/grpcvtgateservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,15 @@ var (
useEffective bool
useEffectiveGroups bool
useStaticAuthenticationIdentity bool

sendSessionInStreaming bool
)

func registerFlags(fs *pflag.FlagSet) {
fs.BoolVar(&useEffective, "grpc_use_effective_callerid", false, "If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal.")
fs.BoolVar(&useEffectiveGroups, "grpc-use-effective-groups", false, "If set, and SSL is not used, will set the immediate caller's security groups from the effective caller id's groups.")
fs.BoolVar(&useStaticAuthenticationIdentity, "grpc-use-static-authentication-callerid", false, "If set, will set the immediate caller id to the username authenticated by the static auth plugin.")
fs.BoolVar(&sendSessionInStreaming, "grpc-send-session-in-streaming", false, "If set, will send the session as last packet in streaming api to support transactions in streaming")
}

func init() {
Expand Down Expand Up @@ -192,19 +195,22 @@ func (vtg *VTGate) StreamExecute(request *vtgatepb.StreamExecuteRequest, stream
})
})

// even if there is an error, session could have been modified.
// So, this needs to be sent back to the client. Session is sent in the last stream response.
lastErr := stream.Send(&vtgatepb.StreamExecuteResponse{
Session: session,
})

var errs []error
if vtgErr != nil {
errs = append(errs, vtgErr)
}
if lastErr != nil {
errs = append(errs, lastErr)

if sendSessionInStreaming {
// even if there is an error, session could have been modified.
// So, this needs to be sent back to the client. Session is sent in the last stream response.
lastErr := stream.Send(&vtgatepb.StreamExecuteResponse{
Session: session,
})
if lastErr != nil {
errs = append(errs, lastErr)
}
Comment on lines +204 to +211
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes were previously not behind a flag?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is already there in a released version of Vitess, then maybe we should keep the flag to default false in this release too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel the breaking change was in v17 so, if you are already on v17 then the users would have already fixed it.
I do not see a point in keeping it to false in v18.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but if someone upgrades from an existing version of v17 which is already released, its going to be a breaking change for them. We do not ask users to do major upgrades only with the latest patch versions.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The grpc users are very limited and none except one has reported it, so for others, it should be a no-op.
It covers the functionality provided in v17 only i.e. transactions in streaming for grpc api.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it is still going to be a breaking change from v17.0.0 to v18.0.0. We should avoid it because it is easily avoidable by just postponing when we deprecate and delete the flag.

}

return vterrors.ToGRPC(vterrors.Aggregate(errs))
}

Expand Down
62 changes: 62 additions & 0 deletions go/vt/vttablet/tabletserver/exclude_race_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//go:build !race

package tabletserver

import (
"context"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)

// TestHandlePanicAndSendLogStatsMessageTruncation tests that when an error truncation
// length is set and a panic occurs, the code in handlePanicAndSendLogStats will
// truncate the error text in logs, but will not truncate the error text in the
// error value.
func TestHandlePanicAndSendLogStatsMessageTruncation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tl := newTestLogger()
defer tl.Close()
logStats := tabletenv.NewLogStats(ctx, "TestHandlePanicAndSendLogStatsMessageTruncation")
db, tsv := setupTabletServerTest(t, ctx, "")
defer tsv.StopService()
defer db.Close()

longSql := "select * from test_table_loooooooooooooooooooooooooooooooooooong"
longBv := map[string]*querypb.BindVariable{
"bv1": sqltypes.Int64BindVariable(1111111111),
"bv2": sqltypes.Int64BindVariable(2222222222),
"bv3": sqltypes.Int64BindVariable(3333333333),
"bv4": sqltypes.Int64BindVariable(4444444444),
}
origTruncateErrLen := sqlparser.GetTruncateErrLen()
sqlparser.SetTruncateErrLen(32)
defer sqlparser.SetTruncateErrLen(origTruncateErrLen)

defer func() {
err := logStats.Error
want := "Uncaught panic for Sql: \"select * from test_table_loooooooooooooooooooooooooooooooooooong\", BindVars: {bv1: \"type:INT64 value:\\\"1111111111\\\"\"bv2: \"type:INT64 value:\\\"2222222222\\\"\"bv3: \"type:INT64 value:\\\"3333333333\\\"\"bv4: \"type:INT64 value:\\\"4444444444\\\"\"}"
require.Error(t, err)
assert.Contains(t, err.Error(), want)
want = "Uncaught panic for Sql: \"select * from test_t [TRUNCATED]\", BindVars: {bv1: \"typ [TRUNCATED]"
gotWhatWeWant := false
for _, log := range tl.getLogs() {
if strings.HasPrefix(log, want) {
gotWhatWeWant = true
break
}
}
assert.True(t, gotWhatWeWant)
}()

defer tsv.handlePanicAndSendLogStats(longSql, longBv, logStats)
panic("panic from TestHandlePanicAndSendLogStatsMessageTruncation")
}
45 changes: 0 additions & 45 deletions go/vt/vttablet/tabletserver/tabletserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1492,51 +1492,6 @@ func TestHandleExecUnknownError(t *testing.T) {
panic("unknown exec error")
}

// TestHandlePanicAndSendLogStatsMessageTruncation tests that when an error truncation
// length is set and a panic occurs, the code in handlePanicAndSendLogStats will
// truncate the error text in logs, but will not truncate the error text in the
// error value.
func TestHandlePanicAndSendLogStatsMessageTruncation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tl := newTestLogger()
defer tl.Close()
logStats := tabletenv.NewLogStats(ctx, "TestHandlePanicAndSendLogStatsMessageTruncation")
db, tsv := setupTabletServerTest(t, ctx, "")
defer tsv.StopService()
defer db.Close()

longSql := "select * from test_table_loooooooooooooooooooooooooooooooooooong"
longBv := map[string]*querypb.BindVariable{
"bv1": sqltypes.Int64BindVariable(1111111111),
"bv2": sqltypes.Int64BindVariable(2222222222),
"bv3": sqltypes.Int64BindVariable(3333333333),
"bv4": sqltypes.Int64BindVariable(4444444444),
}
origTruncateErrLen := sqlparser.GetTruncateErrLen()
sqlparser.SetTruncateErrLen(32)
defer sqlparser.SetTruncateErrLen(origTruncateErrLen)

defer func() {
err := logStats.Error
want := "Uncaught panic for Sql: \"select * from test_table_loooooooooooooooooooooooooooooooooooong\", BindVars: {bv1: \"type:INT64 value:\\\"1111111111\\\"\"bv2: \"type:INT64 value:\\\"2222222222\\\"\"bv3: \"type:INT64 value:\\\"3333333333\\\"\"bv4: \"type:INT64 value:\\\"4444444444\\\"\"}"
require.Error(t, err)
assert.Contains(t, err.Error(), want)
want = "Uncaught panic for Sql: \"select * from test_t [TRUNCATED]\", BindVars: {bv1: \"typ [TRUNCATED]"
gotWhatWeWant := false
for _, log := range tl.getLogs() {
if strings.HasPrefix(log, want) {
gotWhatWeWant = true
break
}
}
assert.True(t, gotWhatWeWant)
}()

defer tsv.handlePanicAndSendLogStats(longSql, longBv, logStats)
panic("panic from TestHandlePanicAndSendLogStatsMessageTruncation")
}

func TestQueryAsString(t *testing.T) {
longSql := "select * from test_table_loooooooooooooooooooooooooooooooooooong"
longBv := map[string]*querypb.BindVariable{
Expand Down
Loading