Skip to content

Commit

Permalink
TableGC: support DROP VIEW (#14020)
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Sep 20, 2023
1 parent c341b32 commit adac810
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 29 deletions.
17 changes: 17 additions & 0 deletions go/test/endtoend/tabletmanager/tablegc/tablegc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package tablegc
import (
"context"
"flag"
"fmt"
"os"
"testing"
"time"
Expand Down Expand Up @@ -414,3 +415,19 @@ func TestPurgeView(t *testing.T) {
validateTableExists(t, "t1")
validateAnyState(t, 1024, schema.EvacTableGCState, schema.DropTableGCState, schema.TableDroppedGCState)
}

func TestDropView(t *testing.T) {
viewName, err := schema.GenerateGCTableName(schema.DropTableGCState, time.Now().Add(tableTransitionExpiration)) // shortly in the future
require.NoError(t, err)
createStatement := fmt.Sprintf("create or replace view %s as select 1", viewName)

_, err = primaryTablet.VttabletProcess.QueryTablet(createStatement, keyspaceName, true)
require.NoError(t, err)

// view should be there, because the timestamp hint is still in the near future.
validateTableExists(t, viewName)

time.Sleep(tableTransitionExpiration / 2)
// But by now, after the above sleep, the view's timestamp hint is in the past, and we expect TableGC to have dropped the view.
validateTableDoesNotExist(t, viewName)
}
84 changes: 55 additions & 29 deletions go/vt/vttablet/tabletserver/gc/tablegc.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,15 @@ var (
sqlPurgeTable = `delete from %a limit 50`
sqlShowVtTables = `show full tables like '\_vt\_%'`
sqlDropTable = "drop table if exists `%a`"
sqlDropView = "drop view if exists `%a`"
purgeReentranceFlag int64
)

type gcTable struct {
tableName string
isBaseTable bool
}

// transitionRequest encapsulates a request to transition a table to next state
type transitionRequest struct {
fromTableName string
Expand Down Expand Up @@ -223,7 +229,7 @@ func (collector *TableGC) Close() {
// operate is the main entry point for the table garbage collector operation and logic.
func (collector *TableGC) operate(ctx context.Context) {

dropTablesChan := make(chan string)
dropTablesChan := make(chan *gcTable)
purgeRequestsChan := make(chan bool)
transitionRequestsChan := make(chan *transitionRequest)

Expand Down Expand Up @@ -251,7 +257,11 @@ func (collector *TableGC) operate(ctx context.Context) {
case <-tableCheckTicker.C:
{
log.Info("TableGC: tableCheckTicker")
_ = collector.checkTables(ctx, dropTablesChan, transitionRequestsChan)
if gcTables, err := collector.readTables(ctx); err != nil {
log.Errorf("TableGC: error while reading tables: %+v", err)
} else {
_ = collector.checkTables(ctx, gcTables, dropTablesChan, transitionRequestsChan)
}
}
case <-purgeReentranceTicker.C:
{
Expand Down Expand Up @@ -280,11 +290,11 @@ func (collector *TableGC) operate(ctx context.Context) {
time.AfterFunc(time.Second, func() { purgeRequestsChan <- true })
}()
}
case dropTableName := <-dropTablesChan:
case dropTable := <-dropTablesChan:
{
log.Info("TableGC: dropTablesChan")
if err := collector.dropTable(ctx, dropTableName); err != nil {
log.Errorf("TableGC: error dropping table %s: %+v", dropTableName, err)
log.Infof("TableGC: found %v in dropTablesChan", dropTable.tableName)
if err := collector.dropTable(ctx, dropTable.tableName, dropTable.isBaseTable); err != nil {
log.Errorf("TableGC: error dropping table %s: %+v", dropTable.tableName, err)
}
}
case transition := <-transitionRequestsChan:
Expand Down Expand Up @@ -368,29 +378,39 @@ func (collector *TableGC) shouldTransitionTable(tableName string) (shouldTransit
return true, state, uuid, nil
}

// checkTables looks for potential GC tables in the MySQL server+schema.
// It lists _vt_% tables, then filters through those which are due-date.
// It then applies the necessary operation per table.
func (collector *TableGC) checkTables(ctx context.Context, dropTablesChan chan<- string, transitionRequestsChan chan<- *transitionRequest) error {
// readTables reads the list of _vt_% tables from the database
func (collector *TableGC) readTables(ctx context.Context) (gcTables []*gcTable, err error) {
log.Infof("TableGC: read tables")

conn, err := collector.pool.Get(ctx, nil)
if err != nil {
return err
return nil, err
}
defer conn.Recycle()

log.Infof("TableGC: check tables")

res, err := conn.Exec(ctx, sqlShowVtTables, math.MaxInt32, true)
if err != nil {
return err
return nil, err
}

for _, row := range res.Rows {
tableName := row[0].ToString()
tableType := row[1].ToString()
isBaseTable := (tableType == "BASE TABLE")
gcTables = append(gcTables, &gcTable{tableName: tableName, isBaseTable: isBaseTable})
}
return gcTables, nil
}

// checkTables looks for potential GC tables in the MySQL server+schema.
// It lists _vt_% tables, then filters through those which are due-date.
// It then applies the necessary operation per table.
func (collector *TableGC) checkTables(ctx context.Context, gcTables []*gcTable, dropTablesChan chan<- *gcTable, transitionRequestsChan chan<- *transitionRequest) error {
log.Infof("TableGC: check tables")

shouldTransition, state, uuid, err := collector.shouldTransitionTable(tableName)
for i := range gcTables {
table := gcTables[i] // we capture as local variable as we will later use this in a goroutine
shouldTransition, state, uuid, err := collector.shouldTransitionTable(table.tableName)

if err != nil {
log.Errorf("TableGC: error while checking tables: %+v", err)
Expand All @@ -401,30 +421,32 @@ func (collector *TableGC) checkTables(ctx context.Context, dropTablesChan chan<-
continue
}

log.Infof("TableGC: will operate on table %s", tableName)
log.Infof("TableGC: will operate on table %s", table.tableName)

if state == schema.HoldTableGCState {
// Hold period expired. Moving to next state
collector.submitTransitionRequest(ctx, transitionRequestsChan, state, tableName, isBaseTable, uuid)
collector.submitTransitionRequest(ctx, transitionRequestsChan, state, table.tableName, table.isBaseTable, uuid)
}
if state == schema.PurgeTableGCState {
if isBaseTable {
if table.isBaseTable {
// This table needs to be purged. Make sure to enlist it (we may already have)
if !collector.addPurgingTable(tableName) {
collector.submitTransitionRequest(ctx, transitionRequestsChan, state, tableName, isBaseTable, uuid)
if !collector.addPurgingTable(table.tableName) {
collector.submitTransitionRequest(ctx, transitionRequestsChan, state, table.tableName, table.isBaseTable, uuid)
}
} else {
// This is a view. We don't need to delete rows from views. Just transition into next phase
collector.submitTransitionRequest(ctx, transitionRequestsChan, state, tableName, isBaseTable, uuid)
collector.submitTransitionRequest(ctx, transitionRequestsChan, state, table.tableName, table.isBaseTable, uuid)
}
}
if state == schema.EvacTableGCState {
// This table was in EVAC state for the required period. It will transition into DROP state
collector.submitTransitionRequest(ctx, transitionRequestsChan, state, tableName, isBaseTable, uuid)
collector.submitTransitionRequest(ctx, transitionRequestsChan, state, table.tableName, table.isBaseTable, uuid)
}
if state == schema.DropTableGCState {
// This table needs to be dropped immediately.
go func() { dropTablesChan <- tableName }()
go func() {
dropTablesChan <- table
}()
}
}

Expand Down Expand Up @@ -520,21 +542,25 @@ func (collector *TableGC) purge(ctx context.Context) (tableName string, err erro

// dropTable runs an actual DROP TABLE statement, and marks the end of the line for the
// tables' GC lifecycle.
func (collector *TableGC) dropTable(ctx context.Context, tableName string) error {
conn, err := collector.pool.Get(ctx, nil)
func (collector *TableGC) dropTable(ctx context.Context, tableName string, isBaseTable bool) error {
conn, err := dbconnpool.NewDBConnection(ctx, collector.env.Config().DB.DbaWithDB())
if err != nil {
return err
}
defer conn.Recycle()
defer conn.Close()

parsed := sqlparser.BuildParsedQuery(sqlDropTable, tableName)
sqlDrop := sqlDropTable
if !isBaseTable {
sqlDrop = sqlDropView
}
parsed := sqlparser.BuildParsedQuery(sqlDrop, tableName)

log.Infof("TableGC: dropping table: %s", tableName)
_, err = conn.Exec(ctx, parsed.Query, 1, true)
_, err = conn.ExecuteFetch(parsed.Query, 1, false)
if err != nil {
return err
}
log.Infof("TableGC: dropped table: %s", tableName)
log.Infof("TableGC: dropped table: %s, isBaseTable: %v", tableName, isBaseTable)
return nil
}

Expand Down
87 changes: 87 additions & 0 deletions go/vt/vttablet/tabletserver/gc/tablegc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ limitations under the License.
package gc

import (
"context"
"testing"
"time"

"vitess.io/vitess/go/vt/schema"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNextTableToPurge(t *testing.T) {
Expand Down Expand Up @@ -250,3 +253,87 @@ func TestShouldTransitionTable(t *testing.T) {
})
}
}

func TestCheckTables(t *testing.T) {
collector := &TableGC{
isOpen: 0,
purgingTables: map[string]bool{},
}
var err error
collector.lifecycleStates, err = schema.ParseGCLifecycle("hold,purge,evac,drop")
require.NoError(t, err)

gcTables := []*gcTable{
{
tableName: "_vt_something_that_isnt_a_gc_table",
isBaseTable: true,
},
{
tableName: "_vt_HOLD_11111111111111111111111111111111_20990920093324", // 2099 is in the far future
isBaseTable: true,
},
{
tableName: "_vt_HOLD_22222222222222222222222222222222_20200920093324",
isBaseTable: true,
},
{
tableName: "_vt_DROP_33333333333333333333333333333333_20200919083451",
isBaseTable: true,
},
{
tableName: "_vt_DROP_44444444444444444444444444444444_20200919083451",
isBaseTable: false,
},
}
// one gcTable above is irrelevant, does not have a GC table name
// one will not transition: its date is 2099
expectResponses := len(gcTables) - 2
expectDropTables := []*gcTable{
{
tableName: "_vt_DROP_33333333333333333333333333333333_20200919083451",
isBaseTable: true,
},
{
tableName: "_vt_DROP_44444444444444444444444444444444_20200919083451",
isBaseTable: false,
},
}
expectTransitionRequests := []*transitionRequest{
{
fromTableName: "_vt_HOLD_22222222222222222222222222222222_20200920093324",
isBaseTable: true,
toGCState: schema.PurgeTableGCState,
uuid: "22222222222222222222222222222222",
},
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
dropTablesChan := make(chan *gcTable)
transitionRequestsChan := make(chan *transitionRequest)

err = collector.checkTables(ctx, gcTables, dropTablesChan, transitionRequestsChan)
assert.NoError(t, err)

var responses int
var foundDropTables []*gcTable
var foundTransitionRequests []*transitionRequest
for {
if responses == expectResponses {
break
}
select {
case <-ctx.Done():
assert.FailNow(t, "timeout")
return
case gcTable := <-dropTablesChan:
responses++
foundDropTables = append(foundDropTables, gcTable)
case request := <-transitionRequestsChan:
responses++
foundTransitionRequests = append(foundTransitionRequests, request)
}
}
assert.ElementsMatch(t, expectDropTables, foundDropTables)
assert.ElementsMatch(t, expectTransitionRequests, foundTransitionRequests)
}

0 comments on commit adac810

Please sign in to comment.