diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index df237ab7a99a..581f5c3a1d3f 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -799,6 +799,7 @@ go_test( "@com_github_jackc_pgx_v4//:pgx", "@com_github_lib_pq//:pq", "@com_github_lib_pq//oid", + "@com_github_petermattis_goid//:goid", "@com_github_pmezard_go_difflib//difflib", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/pkg/sql/run_control_test.go b/pkg/sql/run_control_test.go index 9162ecbaa531..71056634e29c 100644 --- a/pkg/sql/run_control_test.go +++ b/pkg/sql/run_control_test.go @@ -18,6 +18,7 @@ import ( "net/url" "strings" "sync" + "sync/atomic" "testing" "time" @@ -38,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" + "github.com/petermattis/goid" "github.com/stretchr/testify/require" ) @@ -753,7 +755,6 @@ func getUserConn(t *testing.T, username string, server serverutils.TestServerInt // main statement with a timeout is blocked. func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) { defer leaktest.AfterTest(t)() - skip.WithIssue(t, 78494, "flaky test") defer log.Scope(t).Close(t) skip.UnderStress(t, "times out under stress") @@ -762,8 +763,9 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() tenantID := serverutils.TestTenantID() - + var hitMainQuery uint64 numBlockers := 4 + var matches int64 // We can't get the tableID programmatically here, checked below with assert. const tableID = 104 @@ -782,8 +784,9 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) { matchBatch := func(ctx context.Context, req *roachpb.BatchRequest) bool { tid, ok := roachpb.TenantFromContext(ctx) if ok && tid == tenantID && len(req.Requests) > 0 { - scan, ok := req.Requests[0].GetInner().(*roachpb.ScanRequest) + scan, ok := req.Requests[0].GetInner().(*roachpb.GetRequest) if ok && tableSpan.ContainsKey(scan.Key) { + log.Infof(ctx, "matchBatch %d", goid.Get()) return true } } @@ -802,18 +805,30 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) { Store: &kvserver.StoreTestingKnobs{ TestingRequestFilter: func(ctx context.Context, req roachpb.BatchRequest) *roachpb.Error { if matchBatch(ctx, &req) { + m := atomic.AddInt64(&matches, 1) + // If any of the blockers get retried just ignore. + if m > int64(numBlockers) { + log.Infof(ctx, "ignoring extra blocker %d", goid.Get()) + return nil + } // Notify we're blocking. + log.Infof(ctx, "blocking %d", goid.Get()) unblockClientCh <- struct{}{} <-qBlockersCh } return nil }, TestingResponseErrorEvent: func(ctx context.Context, req *roachpb.BatchRequest, err error) { - if matchBatch(ctx, req) { + tid, ok := roachpb.TenantFromContext(ctx) + if ok && tid == tenantID && len(req.Requests) > 0 { scan, ok := req.Requests[0].GetInner().(*roachpb.ScanRequest) - if ok && tableSpan.ContainsKey(scan.Key) { - cancel() - wg.Done() + log.Infof(ctx, "%s %d", scan, goid.Get()) + if ok { + if tableSpan.ContainsKey(scan.Key) && atomic.CompareAndSwapUint64(&hitMainQuery, 0, 1) { + log.Infof(ctx, "got scan request error %d", goid.Get()) + cancel() + wg.Done() + } } } }, @@ -828,8 +843,8 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) { defer db.Close() r1 := sqlutils.MakeSQLRunner(db) - r1.Exec(t, `CREATE TABLE foo (t int)`) - + r1.Exec(t, `SET CLUSTER SETTING sql.stats.automatic_collection.enabled=false`) + r1.Exec(t, `CREATE TABLE foo (t int PRIMARY KEY)`) row := r1.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'foo'`) var id int64 row.Scan(&id) @@ -848,18 +863,25 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) { for _, r := range blockers { go func(r *sqlutils.SQLRunner) { defer wg.Done() - r.Exec(t, `SELECT * FROM foo`) + r.Exec(t, `SELECT * FROM foo WHERE t = 1234`) }(r) } // Wait till all blockers are parked. for i := 0; i < numBlockers; i++ { <-unblockClientCh } - client.ExpectErr(t, "timeout", `SELECT * FROM foo`) - // Unblock the blockers. + log.Infof(ctx, "blockers parked") + // Because we don't know when statement timeout will happen we have to repeat + // till we get one into the KV layer. + for atomic.LoadUint64(&hitMainQuery) == 0 { + _, err := client.DB.ExecContext(context.Background(), `SELECT * FROM foo`) + require.Error(t, err) + log.Infof(ctx, "main req finished: %v", err) + } for i := 0; i < numBlockers; i++ { qBlockersCh <- struct{}{} } + log.Infof(ctx, "unblocked blockers") wg.Wait() require.ErrorIs(t, ctx.Err(), context.Canceled) }