Skip to content

Commit

Permalink
Merge #65352 #65746
Browse files Browse the repository at this point in the history
65352: sql/catalog/typedesc: validate OID range before converting it to ID r=sajjadrizvi a=sajjadrizvi

Previously the code to convert an OID to a `descpb.ID` assumed that the OID is
larger than a predefined constant. There were no checks to validate the given
OID during conversion. As a result, an out-of-range OID could be
converted to an invalid descriptor ID without an error.

The changes in this PR validate the range of given user-defined OID before
conversion, which encourages the user to check the conversion for
potential problems.

Release note: None

Fixes #58414

65746: changefeedccl: Use span frontier when emitting resolved spans.  r=stevendanna a=miretskiy

Use span frontier to emit change aggregator resolved spans instead
of storing additional list of spans to flush.

Using span frontier reduces the amount of memory we use (removes
duplicate span storage since frontier keeps track of spans anyway).
Furthermore, because span frontier merges adjacent spans,
we reduce the number of resolved span messages sent to change frontier
aggregator, thus making job checkpointing more efficient.

Release Notes: None

Co-authored-by: Sajjad Rizvi <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
  • Loading branch information
3 people committed May 27, 2021
3 parents 942e8f8 + fff814f + 9870bc9 commit 9cf809c
Show file tree
Hide file tree
Showing 24 changed files with 717 additions and 212 deletions.
49 changes: 39 additions & 10 deletions pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,23 @@ func rewriteTypesInExpr(expr string, rewrites DescRewriteMap) (string, error) {
if err != nil {
return "", err
}

ctx := tree.NewFmtCtx(tree.FmtSerializable)
ctx.SetIndexedTypeFormat(func(ctx *tree.FmtCtx, ref *tree.OIDTypeReference) {
newRef := ref
if rw, ok := rewrites[typedesc.UserDefinedTypeOIDToID(ref.OID)]; ok {
var id descpb.ID
id, err = typedesc.UserDefinedTypeOIDToID(ref.OID)
if err != nil {
return
}
if rw, ok := rewrites[id]; ok {
newRef = &tree.OIDTypeReference{OID: typedesc.TypeIDToOID(rw.ID)}
}
ctx.WriteString(newRef.SQLString())
})
if err != nil {
return "", err
}
ctx.FormatNode(parsed)
return ctx.CloseAndGetString(), nil
}
Expand Down Expand Up @@ -348,11 +357,15 @@ func allocateDescriptorRewrites(
// Ensure that all referenced types are present.
if col.Type.UserDefined() {
// TODO (rohany): This can be turned into an option later.
if _, ok := typesByID[typedesc.GetTypeDescID(col.Type)]; !ok {
id, err := typedesc.GetUserDefinedTypeDescID(col.Type)
if err != nil {
return nil, err
}
if _, ok := typesByID[id]; !ok {
return nil, errors.Errorf(
"cannot restore table %q without referenced type %d",
table.Name,
typedesc.GetTypeDescID(col.Type),
id,
)
}
}
Expand Down Expand Up @@ -1025,25 +1038,37 @@ func rewriteDatabaseDescs(databases []*dbdesc.Mutable, descriptorRewrites DescRe

// rewriteIDsInTypesT rewrites all ID's in the input types.T using the input
// ID rewrite mapping.
func rewriteIDsInTypesT(typ *types.T, descriptorRewrites DescRewriteMap) {
func rewriteIDsInTypesT(typ *types.T, descriptorRewrites DescRewriteMap) error {
if !typ.UserDefined() {
return
return nil
}
tid, err := typedesc.GetUserDefinedTypeDescID(typ)
if err != nil {
return err
}
// Collect potential new OID values.
var newOID, newArrayOID oid.Oid
if rw, ok := descriptorRewrites[typedesc.GetTypeDescID(typ)]; ok {
if rw, ok := descriptorRewrites[tid]; ok {
newOID = typedesc.TypeIDToOID(rw.ID)
}
if typ.Family() != types.ArrayFamily {
if rw, ok := descriptorRewrites[typedesc.GetArrayTypeDescID(typ)]; ok {
tid, err = typedesc.GetUserDefinedArrayTypeDescID(typ)
if err != nil {
return err
}
if rw, ok := descriptorRewrites[tid]; ok {
newArrayOID = typedesc.TypeIDToOID(rw.ID)
}
}
types.RemapUserDefinedTypeOIDs(typ, newOID, newArrayOID)
// If the type is an array, then we need to rewrite the element type as well.
if typ.Family() == types.ArrayFamily {
rewriteIDsInTypesT(typ.ArrayContents(), descriptorRewrites)
if err := rewriteIDsInTypesT(typ.ArrayContents(), descriptorRewrites); err != nil {
return err
}
}

return nil
}

// rewriteTypeDescs rewrites all ID's in the input slice of TypeDescriptors
Expand Down Expand Up @@ -1075,7 +1100,9 @@ func rewriteTypeDescs(types []*typedesc.Mutable, descriptorRewrites DescRewriteM
}
case descpb.TypeDescriptor_ALIAS:
// We need to rewrite any ID's present in the aliased types.T.
rewriteIDsInTypesT(typ.Alias, descriptorRewrites)
if err := rewriteIDsInTypesT(typ.Alias, descriptorRewrites); err != nil {
return err
}
default:
return errors.AssertionFailedf("unknown type kind %s", t.String())
}
Expand Down Expand Up @@ -1285,7 +1312,9 @@ func RewriteTableDescs(
// rewriteCol is a closure that performs the ID rewrite logic on a column.
rewriteCol := func(col *descpb.ColumnDescriptor) error {
// Rewrite the types.T's IDs present in the column.
rewriteIDsInTypesT(col.Type, descriptorRewrites)
if err := rewriteIDsInTypesT(col.Type, descriptorRewrites); err != nil {
return err
}
var newUsedSeqRefs []descpb.ID
for _, seqID := range col.UsesSequenceIds {
if rewrite, ok := descriptorRewrites[seqID]; ok {
Expand Down
Loading

0 comments on commit 9cf809c

Please sign in to comment.