Skip to content

Commit

Permalink
fix(restore): update the schema and type from 2103 (#7838)
Browse files Browse the repository at this point in the history
With #7810 change, we changed the format of the predicate. We missed updating the schema and predicate. This PR fixes it.
  • Loading branch information
NamanJain8 authored and Harshil Goel committed Jan 31, 2023
1 parent 7014ebc commit ab48a1f
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 5 deletions.
56 changes: 51 additions & 5 deletions worker/restore_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type loadBackupInput struct {
restoreTs uint64
preds predicateSet
dropNs map[uint64]struct{}
isOld bool
version int
keepSchema bool
compression string
}
Expand Down Expand Up @@ -388,11 +388,57 @@ func (m *mapper) processReqCh(ctx context.Context) error {
kv.Value, err = update.Marshal()
return err
}
if in.isOld && parsedKey.IsType() {
if err := appendNamespace(); err != nil {
glog.Errorf("Unable to (un)marshal type: %+v. Err=%v\n", parsedKey, err)
changeFormat := func() error {
// In the backup taken on 2103, we have the schemaUpdate.Predicate in format
// <namespace 8 bytes>|<attribute>. That had issues with JSON marshalling.
// So, we switched over to the format <namespace hex string>-<attribute>.
var err error
if parsedKey.IsSchema() {
var update pb.SchemaUpdate
if err := update.Unmarshal(kv.Value); err != nil {
return err
}
if update.Predicate, err = x.AttrFrom2103(update.Predicate); err != nil {
return err
}
kv.Value, err = update.Marshal()
return err
}
if parsedKey.IsType() {
var update pb.TypeUpdate
if err := update.Unmarshal(kv.Value); err != nil {
return err
}
if update.TypeName, err = x.AttrFrom2103(update.TypeName); err != nil {
return err
}
for _, sch := range update.Fields {
if sch.Predicate, err = x.AttrFrom2103(sch.Predicate); err != nil {
return err
}
}
kv.Value, err = update.Marshal()
return err
}
return nil
}
// We changed the format of predicate in 2103 and 2105. SchemaUpdate and TypeUpdate have
// predicate stored within them, so they also need to be updated accordingly.
switch in.version {
case 0:
if parsedKey.IsType() {
if err := appendNamespace(); err != nil {
glog.Errorf("Unable to (un)marshal type: %+v. Err=%v\n", parsedKey, err)
return nil
}
}
case 2103:
if err := changeFormat(); err != nil {
glog.Errorf("Unable to change format for: %+v Err=%+v", parsedKey, err)
return nil
}
default:
// for manifest versions >= 2015, do nothing.
}
// Reset the StreamId to prevent ordering issues while writing to stream writer.
kv.StreamId = 0
Expand Down Expand Up @@ -661,7 +707,7 @@ func RunMapper(req *pb.RestoreRequest, mapDir string) (*mapResult, error) {
in := &loadBackupInput{
preds: predSet,
dropNs: localDropNs,
isOld: manifest.Version == 0,
version: manifest.Version,
restoreTs: req.RestoreTs,
// Only map the schema keys corresponding to the latest backup.
keepSchema: i == 0,
Expand Down
13 changes: 13 additions & 0 deletions x/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,19 @@ const (
NsSeparator = "-"
)

// Invalid bytes are replaced with the Unicode replacement rune.
// See https://golang.org/pkg/encoding/json/#Marshal
const replacementRune = rune('\ufffd')

func AttrFrom2103(attr string) (string, error) {
if strings.ContainsRune(attr, replacementRune) {
return "", errors.Errorf("replacement rune found while parsing attr: %s (%+v)",
attr, []byte(attr))
}
ns, pred := binary.BigEndian.Uint64([]byte(attr[:8])), attr[8:]
return NamespaceAttr(ns, pred), nil
}

func NamespaceToBytes(ns uint64) []byte {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, ns)
Expand Down

0 comments on commit ab48a1f

Please sign in to comment.