Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add leak checking for vtgate tests #13835

Merged
merged 17 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ require (
github.com/spf13/afero v1.9.3
github.com/spf13/jwalterweatherman v1.1.0
github.com/xlab/treeprint v1.2.0
go.uber.org/goleak v1.1.11
go.uber.org/goleak v1.2.1
golang.org/x/sync v0.1.0
modernc.org/sqlite v1.20.3
)
Expand Down
7 changes: 2 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -634,8 +634,8 @@ go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/mock v0.2.0 h1:TaP3xedm7JaAgScZO7tlvlKrqT0p7I6OsdGB5YNSMDU=
go.uber.org/mock v0.2.0/go.mod h1:J0y0rp9L3xiff1+ZBfKxlC1fz2+aO16tw0tsDOixfuM=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
Expand Down Expand Up @@ -692,8 +692,6 @@ golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRu
golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
Expand Down Expand Up @@ -936,7 +934,6 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.4.0/go.mod h1:UE5sM2OK9E/d67R0ANs2xJizIymRP5gJU295PvKXxjQ=
golang.org/x/tools v0.8.0 h1:vSDcovVPld282ceKgDimkRSC8kpaH1dgyc9UMzlt84Y=
Expand Down
3 changes: 3 additions & 0 deletions go/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ type Cache interface {
UsedCapacity() int64
MaxCapacity() int64
SetCapacity(int64)

// Close shuts down this cache and stops any background goroutines.
Close()
}

type cachedObject interface {
Expand Down
4 changes: 4 additions & 0 deletions go/cache/lru_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,7 @@ func (lru *LRUCache) checkCapacity() {
lru.evictions++
}
}

func (lru *LRUCache) Close() {
lru.Clear()
}
2 changes: 2 additions & 0 deletions go/cache/null.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,5 @@ func (n *nullCache) SetCapacity(_ int64) {}
func (n *nullCache) Evictions() int64 {
return 0
}

func (n *nullCache) Close() {}
20 changes: 12 additions & 8 deletions go/cache/ristretto/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type Cache struct {
// stop is used to stop the processItems goroutine.
stop chan struct{}
// indicates whether cache is closed.
isClosed bool
isClosed atomic.Bool
// cost calculates cost from a value.
cost func(value any) int64
// ignoreInternalCost dictates whether to ignore the cost of internally storing
Expand Down Expand Up @@ -211,7 +211,7 @@ func NewCache(config *Config) (*Cache, error) {

// Wait blocks until all the current cache operations have been processed in the background
func (c *Cache) Wait() {
if c == nil || c.isClosed {
if c == nil || c.isClosed.Load() {
return
}
wg := &sync.WaitGroup{}
Expand All @@ -224,7 +224,7 @@ func (c *Cache) Wait() {
// value was found or not. The value can be nil and the boolean can be true at
// the same time.
func (c *Cache) Get(key string) (any, bool) {
if c == nil || c.isClosed {
if c == nil || c.isClosed.Load() {
return nil, false
}
keyHash, conflictHash := c.keyToHash(key)
Expand Down Expand Up @@ -253,7 +253,7 @@ func (c *Cache) Set(key string, value any) bool {
// cost. The built-in Cost function will not be called to evaluate the object's cost
// and instead the given value will be used.
func (c *Cache) SetWithCost(key string, value any, cost int64) bool {
if c == nil || c.isClosed {
if c == nil || c.isClosed.Load() {
return false
}

Expand Down Expand Up @@ -289,7 +289,7 @@ func (c *Cache) SetWithCost(key string, value any, cost int64) bool {

// Delete deletes the key-value item from the cache if it exists.
func (c *Cache) Delete(key string) {
if c == nil || c.isClosed {
if c == nil || c.isClosed.Load() {
return
}
keyHash, conflictHash := c.keyToHash(key)
Expand All @@ -309,7 +309,11 @@ func (c *Cache) Delete(key string) {

// Close stops all goroutines and closes all channels.
func (c *Cache) Close() {
if c == nil || c.isClosed {
if c == nil {
return
}
wasClosed := c.isClosed.Swap(true)
if wasClosed {
return
}
c.Clear()
Expand All @@ -319,14 +323,14 @@ func (c *Cache) Close() {
close(c.stop)
close(c.setBuf)
c.policy.Close()
c.isClosed = true
c.isClosed.Store(true)
}

// Clear empties the hashmap and zeroes all policy counters. Note that this is
// not an atomic operation (but that shouldn't be a problem as it's assumed that
// Set/Get calls won't be occurring until after this).
func (c *Cache) Clear() {
if c == nil || c.isClosed {
if c == nil || c.isClosed.Load() {
return
}
// Block until processItems goroutine is returned.
Expand Down
4 changes: 2 additions & 2 deletions go/cmd/vtcombo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func main() {
ts = topo.Open()
} else {
// Create topo server. We use a 'memorytopo' implementation.
ts = memorytopo.NewServer(tpb.Cells...)
ts = memorytopo.NewServer(context.Background(), tpb.Cells...)
}

// attempt to load any routing rules specified by tpb
Expand Down Expand Up @@ -279,7 +279,7 @@ func main() {
}

// vtgate configuration and init
resilientServer = srvtopo.NewResilientServer(ts, "ResilientSrvTopoServer")
resilientServer = srvtopo.NewResilientServer(context.Background(), ts, "ResilientSrvTopoServer")
tabletTypesToWait := []topodatapb.TabletType{
topodatapb.TabletType_PRIMARY,
topodatapb.TabletType_REPLICA,
Expand Down
3 changes: 2 additions & 1 deletion go/cmd/vtexplain/vtexplain.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"fmt"
"os"

Expand Down Expand Up @@ -147,7 +148,7 @@ func parseAndRun() error {
Target: dbName,
}

vte, err := vtexplain.Init(vschema, schema, ksShardMap, opts)
vte, err := vtexplain.Init(context.Background(), vschema, schema, ksShardMap, opts)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion go/cmd/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func main() {
ts := topo.Open()
defer ts.Close()

resilientServer = srvtopo.NewResilientServer(ts, "ResilientSrvTopoServer")
resilientServer = srvtopo.NewResilientServer(context.Background(), ts, "ResilientSrvTopoServer")

tabletTypes := make([]topodatapb.TabletType, 0, 1)
if len(tabletTypesToWait) != 0 {
Expand Down
6 changes: 3 additions & 3 deletions go/cmd/vttablet/vttablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func main() {
config, mycnf := initConfig(tabletAlias)

ts := topo.Open()
qsc := createTabletServer(config, ts, tabletAlias)
qsc := createTabletServer(context.Background(), config, ts, tabletAlias)

mysqld := mysqlctl.NewMysqld(config.DB)
servenv.OnClose(mysqld.Close)
Expand Down Expand Up @@ -204,15 +204,15 @@ func extractOnlineDDL() error {
return nil
}

func createTabletServer(config *tabletenv.TabletConfig, ts *topo.Server, tabletAlias *topodatapb.TabletAlias) *tabletserver.TabletServer {
func createTabletServer(ctx context.Context, config *tabletenv.TabletConfig, ts *topo.Server, tabletAlias *topodatapb.TabletAlias) *tabletserver.TabletServer {
if tableACLConfig != "" {
// To override default simpleacl, other ACL plugins must set themselves to be default ACL factory
tableacl.Register("simpleacl", &simpleacl.Factory{})
} else if enforceTableACLConfig {
log.Exit("table acl config has to be specified with table-acl-config flag because enforce-tableacl-config is set.")
}
// creates and registers the query service
qsc := tabletserver.NewTabletServer("", config, ts, tabletAlias)
qsc := tabletserver.NewTabletServer(ctx, "", config, ts, tabletAlias)
servenv.OnRun(func() {
qsc.Register()
addStatusParts(qsc)
Expand Down
3 changes: 3 additions & 0 deletions go/stats/rates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func TestRates(t *testing.T) {
clearStats()
c := NewCountersWithSingleLabel("rcounter1", "rcounter help", "label")
r := NewRates("rates1", c, 3, -1*time.Second)
defer r.Stop()
r.snapshot()
now = now.Add(epsilon)
c.Add("tag1", 0)
Expand Down Expand Up @@ -92,6 +93,7 @@ func TestRatesConsistency(t *testing.T) {
clearStats()
c := NewCountersWithSingleLabel("rcounter4", "rcounter4 help", "label")
r := NewRates("rates4", c, 100, -1*time.Second)
defer r.Stop()
r.snapshot()

now = now.Add(epsilon)
Expand Down Expand Up @@ -133,6 +135,7 @@ func TestRatesHook(t *testing.T) {
})

v := NewRates("rates2", c, 2, 10*time.Second)
defer v.Stop()
if gotname != "rates2" {
t.Errorf("want rates2, got %s", gotname)
}
Expand Down
2 changes: 2 additions & 0 deletions go/test/endtoend/vtgate/queries/misc/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,12 @@ func TestHighNumberOfParams(t *testing.T) {
// connect to the vitess cluster
db, err := sql.Open("mysql", fmt.Sprintf("@tcp(%s:%v)/%s", vtParams.Host, vtParams.Port, vtParams.DbName))
require.NoError(t, err)
defer db.Close()

// run the query
r, err := db.Query(fmt.Sprintf("SELECT /*vt+ QUERY_TIMEOUT_MS=10000 */ id1 FROM t1 WHERE id1 in (%s) ORDER BY id1 ASC", strings.Join(params, ", ")), vals...)
require.NoError(t, err)
defer r.Close()

// check the results we got, we should get 5 rows with each: 0, 1, 2, 3, 4
// count is the row number we are currently visiting, also correspond to the
Expand Down
1 change: 1 addition & 0 deletions go/test/fuzzing/tablet_manager_fuzzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func FuzzTabletManagerExecuteFetchAsDba(data []byte) int {
ctx := context.Background()
cp := mysql.ConnParams{}
db := fakesqldb.New(t)
defer db.Close()
db.AddQueryPattern(".*", &sqltypes.Result{})
daemon := mysqlctl.NewFakeMysqlDaemon(db)

Expand Down
2 changes: 1 addition & 1 deletion go/test/fuzzing/vtctl_fuzzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,6 @@ func Fuzz(data []byte) int {
}

func createTopo(ctx context.Context) (*topo.Server, error) {
ts := memorytopo.NewServer("zone1", "zone2", "zone3")
ts := memorytopo.NewServer(ctx, "zone1", "zone2", "zone3")
return ts, nil
}
120 changes: 120 additions & 0 deletions go/test/utils/noleak.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
Copyright 2023 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils
dbussink marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"os"
"os/exec"
"strconv"
"strings"
"testing"
"time"

"go.uber.org/goleak"

"vitess.io/vitess/go/vt/log"
)

// LeakCheckContext returns a Context that will be automatically cancelled at the end
// of this test. If the test has finished successfully, it will be checked for goroutine
// leaks after context cancellation.
func LeakCheckContext(t testing.TB) context.Context {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(func() {
cancel()
EnsureNoLeaks(t)
})
return ctx
}

// LeakCheckContextTimeout behaves like LeakCheckContext but the returned Context will
// be cancelled after `timeout`, or after the test finishes, whichever happens first.
func LeakCheckContextTimeout(t testing.TB, timeout time.Duration) context.Context {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
t.Cleanup(func() {
cancel()
EnsureNoLeaks(t)
})
return ctx
}

// EnsureNoLeaks checks for goroutine and socket leaks and fails the test if any are found.
func EnsureNoLeaks(t testing.TB) {
if t.Failed() {
return
}
if err := ensureNoLeaks(); err != nil {
t.Fatal(err)
}
}

// GetLeaks checks for goroutine and socket leaks and returns an error if any are found.
// One use case is in TestMain()s to ensure that all tests are cleaned up.
func GetLeaks() error {
return ensureNoLeaks()
}

func ensureNoLeaks() error {
if err := ensureNoGoroutines(); err != nil {
return err
}
if err := ensureNoOpenSockets(); err != nil {
return err
}
return nil
}

func ensureNoGoroutines() error {
var ignored = []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/dbconfigs.init.0.func1"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.resetAggregators"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.processQueryInfo"),
goleak.IgnoreTopFunction("github.com/patrickmn/go-cache.(*janitor).Run"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/logutil.(*ThrottledLogger).log.func1"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vttablet/tabletserver/throttle.initThrottleTicker.func1.1"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vttablet/tabletserver/throttle.NewBackgroundClient.initThrottleTicker.func1.1"),
goleak.IgnoreTopFunction("testing.tRunner.func1"),
}

var err error
for i := 0; i < 5; i++ {
err = goleak.Find(ignored...)
if err == nil {
return nil
}
time.Sleep(100 * time.Millisecond)
}
return err
}

func ensureNoOpenSockets() error {
cmd := exec.Command("lsof", "-a", "-p", strconv.Itoa(os.Getpid()), "-i", "-P", "-V")
cmd.Stderr = nil
lsof, err := cmd.Output()
if err == nil {
log.Errorf("found open sockets:\n%s", lsof)
} else {
if strings.Contains(string(lsof), "no Internet files located") {
return nil
}
log.Errorf("failed to run `lsof`: %v (%q)", err, lsof)
}
return err
}
Loading
Loading