Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: make view descriptors depend on table/columns by IDs #15388

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions pkg/ccl/sqlccl/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,9 @@ func allocateTableRewrites(
// rewriteTableDescs mutates tables to match the ID and privilege specified in
// tableRewrites, as well as adjusting cross-table references to use the new
// IDs.
func rewriteTableDescs(tables []*sqlbase.TableDescriptor, tableRewrites tableRewriteMap) error {
func rewriteTableDescs(
ctx context.Context, tables []*sqlbase.TableDescriptor, tableRewrites tableRewriteMap,
) error {
for _, table := range tables {
tableRewrite, ok := tableRewrites[table.ID]
if !ok {
Expand Down Expand Up @@ -292,15 +294,34 @@ func rewriteTableDescs(tables []*sqlbase.TableDescriptor, tableRewrites tableRew
return err
}

log.VEventf(ctx, 2,
"checking dependencies of [%d] (%q): %v",
table.ID, parser.ErrString(parser.Name(table.Name)), table.DependsOn)
hasRestoredDep := false
for i, dest := range table.DependsOn {
if depRewrite, ok := tableRewrites[dest]; ok {
hasRestoredDep = true
table.DependsOn[i] = depRewrite.TableID
} else {
return errors.Errorf(
"cannot restore %q without restoring referenced table %d in same operation",
table.Name, dest)
}
}
if hasRestoredDep {
idMap := make(map[sqlbase.ID]sqlbase.ID)
for id, r := range tableRewrites {
idMap[id] = r.TableID
}
log.VEventf(ctx, 2,
"renaming table dependencies in view query for [%d] (%q): %q: %v",
table.ID, parser.ErrString(parser.Name(table.Name)), table.ViewQuery, idMap)
if err := table.RewriteViewQueryForTableSubstitution(idMap); err != nil {
return err
}
log.VEventf(ctx, 2, "rewrote view query: %q", table.ViewQuery)
}

origRefs := table.DependedOnBy
table.DependedOnBy = nil
for _, ref := range origRefs {
Expand Down Expand Up @@ -599,7 +620,7 @@ func restore(

// Assign new IDs and privileges to the tables, and update all references to
// use the new IDs.
if err := rewriteTableDescs(tables, tableRewrites); err != nil {
if err := rewriteTableDescs(restoreCtx, tables, tableRewrites); err != nil {
return failed, err
}

Expand Down
55 changes: 18 additions & 37 deletions pkg/migration/sqlmigrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ var backwardCompatibleMigrations = []migrationDescriptor{
workFn: optInToDiagnosticsStatReporting,
},
{
name: "establish conservative dependencies for views #17280 #17269",
name: "establish conservative dependencies for views #17280 #17269 #17306",
workFn: repopulateViewDeps,
},
{
Expand Down Expand Up @@ -345,6 +345,7 @@ func eventlogUniqueIDDefault(ctx context.Context, r runner) error {
for retry := retry.Start(retry.Options{MaxRetries: 5}); retry.Next(); {
res := r.sqlExecutor.ExecuteStatements(session, alterStmt, nil)
err = checkQueryResults(res.ResultList, 1)
res.Close(ctx)
if err == nil {
break
}
Expand Down Expand Up @@ -410,47 +411,27 @@ func optInToDiagnosticsStatReporting(ctx context.Context, r runner) error {
if err == nil {
break
}
log.Warningf(ctx, "failed attempt to update setting: %s", err)
log.Warningf(ctx, "failed attempt to update setting: %v", err)
}
return err
}

// repopulateViewDeps ensures that each view V that depends on a table
// T depends on all columns in T. (#17269)
// T depends on all columns in T. (#17269 #17306)
func repopulateViewDeps(ctx context.Context, r runner) error {
descKey := sqlbase.MakeAllDescsMetadataKey()

return r.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
// Retrieve all the descriptors.
kvs, err := txn.Scan(ctx, descKey, descKey.PrefixEnd(), 0)
if err != nil {
return err
}

// For each descriptor, reset the depended-on list.
b := txn.NewBatch()
for _, kv := range kvs {
desc := &sqlbase.Descriptor{}
if err := kv.ValueProto(desc); err != nil {
return err
}
switch t := desc.Union.(type) {
case *sqlbase.Descriptor_Table:
tdesc := t.Table
columns := make([]sqlbase.ColumnID, len(tdesc.Columns))
for i, col := range tdesc.Columns {
columns[i] = col.ID
}
for i := range tdesc.DependedOnBy {
// Make all columns a dependency.
tdesc.DependedOnBy[i].ColumnIDs = columns
}
b.Put(sqlbase.MakeDescMetadataKey(desc.GetID()), desc)
}
}
if err := txn.SetSystemConfigTrigger(); err != nil {
return err
// System tables can only be modified by a privileged internal user.
// Retry a limited number of times because returning an error and letting
// the node kill itself is better than holding the migration lease for an
// arbitrarily long time.
var err error
for retry := retry.Start(retry.Options{MaxRetries: 5}); retry.Next(); {
err = r.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
return sql.RecomputeViewDependencies(ctx, txn, r.sqlExecutor)
})
if err == nil {
break
}
return txn.Run(ctx, b)
})
log.Warningf(ctx, "failed attempt to recompute view dependencies: %v", err)
}
return err
}
195 changes: 195 additions & 0 deletions pkg/migration/sqlmigrations/migrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"fmt"
"os"
"strings"
"testing"
"time"

Expand All @@ -27,6 +28,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -502,3 +505,195 @@ func TestCreateSystemTable(t *testing.T) {
t.Fatal(err)
}
}

func TestUpdateViewDependenciesMigration(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

// remove the view update migration so we can test its effects.
newMigrations := make([]migrationDescriptor, 0, len(backwardCompatibleMigrations))
for _, m := range backwardCompatibleMigrations {
if strings.HasPrefix(m.name, "establish conservative dependencies for views") {
continue
}
newMigrations = append(newMigrations, m)
}

// We also hijack the migration process to capture the SQL memory
// metric object, needed below.
var memMetrics *sql.MemoryMetrics
backwardCompatibleMigrations = append(newMigrations,
migrationDescriptor{
name: "capture mem metrics",
workFn: func(ctx context.Context, r runner) error {
memMetrics = r.memMetrics
return nil
},
})

t.Log("starting server")

s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

createSchema := func() {
t.Log("create test tables")
const createStmts = `
CREATE DATABASE test;
CREATE DATABASE test2;
SET DATABASE=test;

CREATE TABLE t(x INT, y INT);
CREATE VIEW v1 AS SELECT x FROM t WHERE false;
CREATE VIEW v2 AS SELECT x FROM v1;

CREATE TABLE u(x INT, y INT);
CREATE VIEW v3 AS SELECT x FROM (SELECT x, y FROM u);

CREATE VIEW v4 AS SELECT id from system.descriptor;

CREATE TABLE w(x INT);
CREATE VIEW test2.v5 AS SELECT x FROM w;

CREATE TABLE x(x INT);
CREATE INDEX y ON x(x);
CREATE VIEW v6 AS SELECT x FROM x@y;
`
if _, err := sqlDB.Exec(createStmts); err != nil {
t.Fatal(err)
}
}
createSchema()

testDesc := []struct {
dbName parser.Name
tname parser.Name
desc *sqlbase.TableDescriptor
}{
{"test", "t", nil},
{"test", "v1", nil},
{"test", "u", nil},
{"test", "w", nil},
{"test", "x", nil},
{"system", "descriptor", nil},
}

e := s.Executor().(*sql.Executor)
vt := e.GetVirtualTabler()

getDescsAndBreakViews := func() {
t.Log("fetch descriptors")
if err := kvDB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
for i := range testDesc {
desc, err := sql.MustGetTableOrViewDesc(ctx, txn, vt,
&parser.TableName{DatabaseName: testDesc[i].dbName, TableName: testDesc[i].tname}, true)
if err != nil {
return err
}
testDesc[i].desc = desc
}
return nil
}); err != nil {
t.Fatal(err)
}

// Now, corrupt the descriptors by breaking their dependency information.
t.Log("break descriptors")
if err := kvDB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
if err := txn.SetSystemConfigTrigger(); err != nil {
return err
}
for _, t := range testDesc {
t.desc.UpVersion = true
t.desc.DependedOnBy = nil
t.desc.DependedOnBy = nil

descKey := sqlbase.MakeDescMetadataKey(t.desc.GetID())
descVal := sqlbase.WrapDescriptor(t.desc)
if err := txn.Put(ctx, descKey, descVal); err != nil {
return err
}
}

return nil
}); err != nil {
t.Fatal(err)
}
}
getDescsAndBreakViews()

// Break further by deleting the referenced tables. This has become possible
// because the dependency links have been broken above.
t.Log("delete tables")

if _, err := sqlDB.Exec(`
DROP TABLE test.t;
DROP VIEW test.v1;
DROP TABLE test.u;
DROP TABLE test.w;
DROP INDEX test.x@y;
`); err != nil {
t.Fatal(err)
}

// Check the views are effectively broken.
t.Log("check views are broken")

for _, vname := range []string{"test.v2", "test.v3", "test2.v5", "test.v6"} {
_, err := sqlDB.Exec(fmt.Sprintf(`TABLE %s`, vname))
if !testutils.IsError(err,
`relation ".*" does not exist|index ".*" not found|table is being dropped`) {
t.Fatalf("%s: unexpected error: %v", vname, err)
}
}

// Restore missing dependencies for the rest of the test.
t.Log("clear the schema")
if _, err := sqlDB.Exec(`DROP DATABASE test; DROP DATABASE test2;`); err != nil {
t.Fatal(err)
}
createSchema()

// Break the descriptors again, but do not delete the tables.
getDescsAndBreakViews()

// Run the migration outside the context of a migration manager
// such that its work gets done but the key indicating it's been completed
// doesn't get written.
t.Log("fix view deps manually")
if err := repopulateViewDeps(ctx, runner{db: kvDB, sqlExecutor: e}); err != nil {
t.Fatal(err)
}

// Check that the views are fixed now.
t.Log("check views working")

// Check the views can be queried.
for _, vname := range []string{"test.v1", "test.v2", "test.v3", "test.v4", "test2.v5", "test.v6"} {
if _, err := sqlDB.Exec(fmt.Sprintf("TABLE %s", vname)); err != nil {
t.Fatal(err)
}
}

// Check that the tables cannot be dropped any more.
for _, tn := range []string{"TABLE test.t", "TABLE test.u", "TABLE test.w", "INDEX test.x@y"} {
_, err := sqlDB.Exec(fmt.Sprintf(`DROP %s`, tn))
if !testutils.IsError(err,
`cannot drop (relation|index) .* because view .* depends on it`) {
t.Fatalf("unexpected error: %v", err)
}
}

// Finally, try running the migration and make sure it still succeeds.
// This verifies the idempotency of the migration.
t.Log("run migration")

mgr := NewManager(s.Stopper(), kvDB, e, s.Clock(), memMetrics, "clientID")
backwardCompatibleMigrations = append(backwardCompatibleMigrations, migrationDescriptor{
name: "repopulate view dependencies",
workFn: repopulateViewDeps,
})
if err := mgr.EnsureMigrations(ctx); err != nil {
t.Fatal(err)
}
}
5 changes: 5 additions & 0 deletions pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,11 @@ func (ts *TestServer) LeaseManager() interface{} {
return ts.leaseMgr
}

// Executor is part of TestServerInterface.
func (ts *TestServer) Executor() interface{} {
return ts.sqlExecutor
}

// GetNode exposes the Server's Node.
func (ts *TestServer) GetNode() *Node {
return ts.node
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ CREATE TABLE crdb_internal.create_statements (
deps := make(map[parser.TableName]struct{})
if table.IsView() {
descType = &typeView
stmt, err = p.showCreateView(ctx, parser.Name(table.Name), table)
stmt, err = p.showCreateView(ctx, parser.Name(table.Name), prefix, table)
for _, id := range table.DependsOn {
depTable, err := p.getTableDescByID(ctx, id)
if err != nil {
Expand Down
Loading