diff --git a/go/test/endtoend/tabletmanager/tablegc/tablegc_test.go b/go/test/endtoend/tabletmanager/tablegc/tablegc_test.go index 1b43ecf2d90..c6f7253c791 100644 --- a/go/test/endtoend/tabletmanager/tablegc/tablegc_test.go +++ b/go/test/endtoend/tabletmanager/tablegc/tablegc_test.go @@ -18,6 +18,7 @@ package tablegc import ( "context" "flag" + "fmt" "os" "testing" "time" @@ -414,3 +415,19 @@ func TestPurgeView(t *testing.T) { validateTableExists(t, "t1") validateAnyState(t, 1024, schema.EvacTableGCState, schema.DropTableGCState, schema.TableDroppedGCState) } + +func TestDropView(t *testing.T) { + viewName, err := schema.GenerateGCTableName(schema.DropTableGCState, time.Now().Add(tableTransitionExpiration)) // shortly in the future + require.NoError(t, err) + createStatement := fmt.Sprintf("create or replace view %s as select 1", viewName) + + _, err = primaryTablet.VttabletProcess.QueryTablet(createStatement, keyspaceName, true) + require.NoError(t, err) + + // view should be there, because the timestamp hint is still in the near future. + validateTableExists(t, viewName) + + time.Sleep(tableTransitionExpiration / 2) + // But by now, after the above sleep, the view's timestamp hint is in the past, and we expect TableGC to have dropped the view. + validateTableDoesNotExist(t, viewName) +} diff --git a/go/vt/vttablet/tabletserver/gc/tablegc.go b/go/vt/vttablet/tabletserver/gc/tablegc.go index d5c3ad82e74..cf3595c973c 100644 --- a/go/vt/vttablet/tabletserver/gc/tablegc.go +++ b/go/vt/vttablet/tabletserver/gc/tablegc.go @@ -72,9 +72,15 @@ var ( sqlPurgeTable = `delete from %a limit 50` sqlShowVtTables = `show full tables like '\_vt\_%'` sqlDropTable = "drop table if exists `%a`" + sqlDropView = "drop view if exists `%a`" purgeReentranceFlag int64 ) +type gcTable struct { + tableName string + isBaseTable bool +} + // transitionRequest encapsulates a request to transition a table to next state type transitionRequest struct { fromTableName string @@ -223,7 +229,7 @@ func (collector *TableGC) Close() { // operate is the main entry point for the table garbage collector operation and logic. func (collector *TableGC) operate(ctx context.Context) { - dropTablesChan := make(chan string) + dropTablesChan := make(chan *gcTable) purgeRequestsChan := make(chan bool) transitionRequestsChan := make(chan *transitionRequest) @@ -251,7 +257,11 @@ func (collector *TableGC) operate(ctx context.Context) { case <-tableCheckTicker.C: { log.Info("TableGC: tableCheckTicker") - _ = collector.checkTables(ctx, dropTablesChan, transitionRequestsChan) + if gcTables, err := collector.readTables(ctx); err != nil { + log.Errorf("TableGC: error while reading tables: %+v", err) + } else { + _ = collector.checkTables(ctx, gcTables, dropTablesChan, transitionRequestsChan) + } } case <-purgeReentranceTicker.C: { @@ -280,11 +290,11 @@ func (collector *TableGC) operate(ctx context.Context) { time.AfterFunc(time.Second, func() { purgeRequestsChan <- true }) }() } - case dropTableName := <-dropTablesChan: + case dropTable := <-dropTablesChan: { - log.Info("TableGC: dropTablesChan") - if err := collector.dropTable(ctx, dropTableName); err != nil { - log.Errorf("TableGC: error dropping table %s: %+v", dropTableName, err) + log.Infof("TableGC: found %v in dropTablesChan", dropTable.tableName) + if err := collector.dropTable(ctx, dropTable.tableName, dropTable.isBaseTable); err != nil { + log.Errorf("TableGC: error dropping table %s: %+v", dropTable.tableName, err) } } case transition := <-transitionRequestsChan: @@ -368,29 +378,39 @@ func (collector *TableGC) shouldTransitionTable(tableName string) (shouldTransit return true, state, uuid, nil } -// checkTables looks for potential GC tables in the MySQL server+schema. -// It lists _vt_% tables, then filters through those which are due-date. -// It then applies the necessary operation per table. -func (collector *TableGC) checkTables(ctx context.Context, dropTablesChan chan<- string, transitionRequestsChan chan<- *transitionRequest) error { +// readTables reads the list of _vt_% tables from the database +func (collector *TableGC) readTables(ctx context.Context) (gcTables []*gcTable, err error) { + log.Infof("TableGC: read tables") + conn, err := collector.pool.Get(ctx, nil) if err != nil { - return err + return nil, err } defer conn.Recycle() - log.Infof("TableGC: check tables") - res, err := conn.Exec(ctx, sqlShowVtTables, math.MaxInt32, true) if err != nil { - return err + return nil, err } for _, row := range res.Rows { tableName := row[0].ToString() tableType := row[1].ToString() isBaseTable := (tableType == "BASE TABLE") + gcTables = append(gcTables, &gcTable{tableName: tableName, isBaseTable: isBaseTable}) + } + return gcTables, nil +} + +// checkTables looks for potential GC tables in the MySQL server+schema. +// It lists _vt_% tables, then filters through those which are due-date. +// It then applies the necessary operation per table. +func (collector *TableGC) checkTables(ctx context.Context, gcTables []*gcTable, dropTablesChan chan<- *gcTable, transitionRequestsChan chan<- *transitionRequest) error { + log.Infof("TableGC: check tables") - shouldTransition, state, uuid, err := collector.shouldTransitionTable(tableName) + for i := range gcTables { + table := gcTables[i] // we capture as local variable as we will later use this in a goroutine + shouldTransition, state, uuid, err := collector.shouldTransitionTable(table.tableName) if err != nil { log.Errorf("TableGC: error while checking tables: %+v", err) @@ -401,30 +421,32 @@ func (collector *TableGC) checkTables(ctx context.Context, dropTablesChan chan<- continue } - log.Infof("TableGC: will operate on table %s", tableName) + log.Infof("TableGC: will operate on table %s", table.tableName) if state == schema.HoldTableGCState { // Hold period expired. Moving to next state - collector.submitTransitionRequest(ctx, transitionRequestsChan, state, tableName, isBaseTable, uuid) + collector.submitTransitionRequest(ctx, transitionRequestsChan, state, table.tableName, table.isBaseTable, uuid) } if state == schema.PurgeTableGCState { - if isBaseTable { + if table.isBaseTable { // This table needs to be purged. Make sure to enlist it (we may already have) - if !collector.addPurgingTable(tableName) { - collector.submitTransitionRequest(ctx, transitionRequestsChan, state, tableName, isBaseTable, uuid) + if !collector.addPurgingTable(table.tableName) { + collector.submitTransitionRequest(ctx, transitionRequestsChan, state, table.tableName, table.isBaseTable, uuid) } } else { // This is a view. We don't need to delete rows from views. Just transition into next phase - collector.submitTransitionRequest(ctx, transitionRequestsChan, state, tableName, isBaseTable, uuid) + collector.submitTransitionRequest(ctx, transitionRequestsChan, state, table.tableName, table.isBaseTable, uuid) } } if state == schema.EvacTableGCState { // This table was in EVAC state for the required period. It will transition into DROP state - collector.submitTransitionRequest(ctx, transitionRequestsChan, state, tableName, isBaseTable, uuid) + collector.submitTransitionRequest(ctx, transitionRequestsChan, state, table.tableName, table.isBaseTable, uuid) } if state == schema.DropTableGCState { // This table needs to be dropped immediately. - go func() { dropTablesChan <- tableName }() + go func() { + dropTablesChan <- table + }() } } @@ -520,21 +542,25 @@ func (collector *TableGC) purge(ctx context.Context) (tableName string, err erro // dropTable runs an actual DROP TABLE statement, and marks the end of the line for the // tables' GC lifecycle. -func (collector *TableGC) dropTable(ctx context.Context, tableName string) error { - conn, err := collector.pool.Get(ctx, nil) +func (collector *TableGC) dropTable(ctx context.Context, tableName string, isBaseTable bool) error { + conn, err := dbconnpool.NewDBConnection(ctx, collector.env.Config().DB.DbaWithDB()) if err != nil { return err } - defer conn.Recycle() + defer conn.Close() - parsed := sqlparser.BuildParsedQuery(sqlDropTable, tableName) + sqlDrop := sqlDropTable + if !isBaseTable { + sqlDrop = sqlDropView + } + parsed := sqlparser.BuildParsedQuery(sqlDrop, tableName) log.Infof("TableGC: dropping table: %s", tableName) - _, err = conn.Exec(ctx, parsed.Query, 1, true) + _, err = conn.ExecuteFetch(parsed.Query, 1, false) if err != nil { return err } - log.Infof("TableGC: dropped table: %s", tableName) + log.Infof("TableGC: dropped table: %s, isBaseTable: %v", tableName, isBaseTable) return nil } diff --git a/go/vt/vttablet/tabletserver/gc/tablegc_test.go b/go/vt/vttablet/tabletserver/gc/tablegc_test.go index d6b383a2ce8..446f6e6ff85 100644 --- a/go/vt/vttablet/tabletserver/gc/tablegc_test.go +++ b/go/vt/vttablet/tabletserver/gc/tablegc_test.go @@ -17,11 +17,14 @@ limitations under the License. package gc import ( + "context" "testing" + "time" "vitess.io/vitess/go/vt/schema" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestNextTableToPurge(t *testing.T) { @@ -250,3 +253,87 @@ func TestShouldTransitionTable(t *testing.T) { }) } } + +func TestCheckTables(t *testing.T) { + collector := &TableGC{ + isOpen: 0, + purgingTables: map[string]bool{}, + } + var err error + collector.lifecycleStates, err = schema.ParseGCLifecycle("hold,purge,evac,drop") + require.NoError(t, err) + + gcTables := []*gcTable{ + { + tableName: "_vt_something_that_isnt_a_gc_table", + isBaseTable: true, + }, + { + tableName: "_vt_HOLD_11111111111111111111111111111111_20990920093324", // 2099 is in the far future + isBaseTable: true, + }, + { + tableName: "_vt_HOLD_22222222222222222222222222222222_20200920093324", + isBaseTable: true, + }, + { + tableName: "_vt_DROP_33333333333333333333333333333333_20200919083451", + isBaseTable: true, + }, + { + tableName: "_vt_DROP_44444444444444444444444444444444_20200919083451", + isBaseTable: false, + }, + } + // one gcTable above is irrelevant, does not have a GC table name + // one will not transition: its date is 2099 + expectResponses := len(gcTables) - 2 + expectDropTables := []*gcTable{ + { + tableName: "_vt_DROP_33333333333333333333333333333333_20200919083451", + isBaseTable: true, + }, + { + tableName: "_vt_DROP_44444444444444444444444444444444_20200919083451", + isBaseTable: false, + }, + } + expectTransitionRequests := []*transitionRequest{ + { + fromTableName: "_vt_HOLD_22222222222222222222222222222222_20200920093324", + isBaseTable: true, + toGCState: schema.PurgeTableGCState, + uuid: "22222222222222222222222222222222", + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + dropTablesChan := make(chan *gcTable) + transitionRequestsChan := make(chan *transitionRequest) + + err = collector.checkTables(ctx, gcTables, dropTablesChan, transitionRequestsChan) + assert.NoError(t, err) + + var responses int + var foundDropTables []*gcTable + var foundTransitionRequests []*transitionRequest + for { + if responses == expectResponses { + break + } + select { + case <-ctx.Done(): + assert.FailNow(t, "timeout") + return + case gcTable := <-dropTablesChan: + responses++ + foundDropTables = append(foundDropTables, gcTable) + case request := <-transitionRequestsChan: + responses++ + foundTransitionRequests = append(foundTransitionRequests, request) + } + } + assert.ElementsMatch(t, expectDropTables, foundDropTables) + assert.ElementsMatch(t, expectTransitionRequests, foundTransitionRequests) +}