Skip to content

Commit

Permalink
Merge pull request #100697 from cucaroach/backport22.2-99876
Browse files Browse the repository at this point in the history
release-22.2: sql: deflake/unskip TenantStatementTimeoutAdmissionQueueCancelation
  • Loading branch information
cucaroach authored Apr 5, 2023
2 parents 737762a + 23ccefc commit a302371
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 12 deletions.
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,7 @@ go_test(
"@com_github_jackc_pgx_v4//:pgx",
"@com_github_lib_pq//:pq",
"@com_github_lib_pq//oid",
"@com_github_petermattis_goid//:goid",
"@com_github_pmezard_go_difflib//difflib",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
Expand Down
46 changes: 34 additions & 12 deletions pkg/sql/run_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"net/url"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -38,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/petermattis/goid"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -753,7 +755,6 @@ func getUserConn(t *testing.T, username string, server serverutils.TestServerInt
// main statement with a timeout is blocked.
func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 78494, "flaky test")
defer log.Scope(t).Close(t)

skip.UnderStress(t, "times out under stress")
Expand All @@ -762,8 +763,9 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tenantID := serverutils.TestTenantID()

var hitMainQuery uint64
numBlockers := 4
var matches int64

// We can't get the tableID programmatically here, checked below with assert.
const tableID = 104
Expand All @@ -782,8 +784,9 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) {
matchBatch := func(ctx context.Context, req *roachpb.BatchRequest) bool {
tid, ok := roachpb.TenantFromContext(ctx)
if ok && tid == tenantID && len(req.Requests) > 0 {
scan, ok := req.Requests[0].GetInner().(*roachpb.ScanRequest)
scan, ok := req.Requests[0].GetInner().(*roachpb.GetRequest)
if ok && tableSpan.ContainsKey(scan.Key) {
log.Infof(ctx, "matchBatch %d", goid.Get())
return true
}
}
Expand All @@ -802,18 +805,30 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) {
Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, req roachpb.BatchRequest) *roachpb.Error {
if matchBatch(ctx, &req) {
m := atomic.AddInt64(&matches, 1)
// If any of the blockers get retried just ignore.
if m > int64(numBlockers) {
log.Infof(ctx, "ignoring extra blocker %d", goid.Get())
return nil
}
// Notify we're blocking.
log.Infof(ctx, "blocking %d", goid.Get())
unblockClientCh <- struct{}{}
<-qBlockersCh
}
return nil
},
TestingResponseErrorEvent: func(ctx context.Context, req *roachpb.BatchRequest, err error) {
if matchBatch(ctx, req) {
tid, ok := roachpb.TenantFromContext(ctx)
if ok && tid == tenantID && len(req.Requests) > 0 {
scan, ok := req.Requests[0].GetInner().(*roachpb.ScanRequest)
if ok && tableSpan.ContainsKey(scan.Key) {
cancel()
wg.Done()
log.Infof(ctx, "%s %d", scan, goid.Get())
if ok {
if tableSpan.ContainsKey(scan.Key) && atomic.CompareAndSwapUint64(&hitMainQuery, 0, 1) {
log.Infof(ctx, "got scan request error %d", goid.Get())
cancel()
wg.Done()
}
}
}
},
Expand All @@ -828,8 +843,8 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) {
defer db.Close()

r1 := sqlutils.MakeSQLRunner(db)
r1.Exec(t, `CREATE TABLE foo (t int)`)

r1.Exec(t, `SET CLUSTER SETTING sql.stats.automatic_collection.enabled=false`)
r1.Exec(t, `CREATE TABLE foo (t int PRIMARY KEY)`)
row := r1.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'foo'`)
var id int64
row.Scan(&id)
Expand All @@ -848,18 +863,25 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) {
for _, r := range blockers {
go func(r *sqlutils.SQLRunner) {
defer wg.Done()
r.Exec(t, `SELECT * FROM foo`)
r.Exec(t, `SELECT * FROM foo WHERE t = 1234`)
}(r)
}
// Wait till all blockers are parked.
for i := 0; i < numBlockers; i++ {
<-unblockClientCh
}
client.ExpectErr(t, "timeout", `SELECT * FROM foo`)
// Unblock the blockers.
log.Infof(ctx, "blockers parked")
// Because we don't know when statement timeout will happen we have to repeat
// till we get one into the KV layer.
for atomic.LoadUint64(&hitMainQuery) == 0 {
_, err := client.DB.ExecContext(context.Background(), `SELECT * FROM foo`)
require.Error(t, err)
log.Infof(ctx, "main req finished: %v", err)
}
for i := 0; i < numBlockers; i++ {
qBlockersCh <- struct{}{}
}
log.Infof(ctx, "unblocked blockers")
wg.Wait()
require.ErrorIs(t, ctx.Err(), context.Canceled)
}

0 comments on commit a302371

Please sign in to comment.