Skip to content

Commit

Permalink
introduce restorer tests and adjust implementation accordingly
Browse files Browse the repository at this point in the history
  • Loading branch information
vroldanbet committed Dec 19, 2023
1 parent 37e193c commit 068d914
Show file tree
Hide file tree
Showing 4 changed files with 318 additions and 49 deletions.
12 changes: 6 additions & 6 deletions internal/cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func openRestoreFile(filename string) (*os.File, int64, error) {
}

func backupRestoreCmdFunc(cmd *cobra.Command, args []string) error {
decoder, closer, err := decoderFromArgs(cmd, args)
decoder, closer, err := decoderFromArgs(args...)
if err != nil {
return err
}
Expand Down Expand Up @@ -406,7 +406,7 @@ func backupRestoreCmdFunc(cmd *cobra.Command, args []string) error {
}

func backupParseSchemaCmdFunc(cmd *cobra.Command, out io.Writer, args []string) error {
decoder, closer, err := decoderFromArgs(cmd, args)
decoder, closer, err := decoderFromArgs(args...)
if err != nil {
return err
}
Expand Down Expand Up @@ -434,8 +434,8 @@ func backupParseSchemaCmdFunc(cmd *cobra.Command, out io.Writer, args []string)
return err
}

func backupParseRevisionCmdFunc(cmd *cobra.Command, out io.Writer, args []string) error {
decoder, closer, err := decoderFromArgs(cmd, args)
func backupParseRevisionCmdFunc(_ *cobra.Command, out io.Writer, args []string) error {
decoder, closer, err := decoderFromArgs(args...)
if err != nil {
return err
}
Expand All @@ -454,7 +454,7 @@ func backupParseRevisionCmdFunc(cmd *cobra.Command, out io.Writer, args []string

func backupParseRelsCmdFunc(cmd *cobra.Command, out io.Writer, args []string) error {
prefix := cobrautil.MustGetString(cmd, "prefix-filter")
decoder, closer, err := decoderFromArgs(cmd, args)
decoder, closer, err := decoderFromArgs(args...)
if err != nil {
return err
}
Expand All @@ -480,7 +480,7 @@ func backupParseRelsCmdFunc(cmd *cobra.Command, out io.Writer, args []string) er
return nil
}

func decoderFromArgs(_ *cobra.Command, args []string) (*backupformat.Decoder, io.Closer, error) {
func decoderFromArgs(args ...string) (*backupformat.Decoder, io.Closer, error) {
filename := "" // Default to stdin.
if len(args) > 0 {
filename = args[0]
Expand Down
2 changes: 1 addition & 1 deletion internal/cmd/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func TestBackupCreateCmdFunc(t *testing.T) {
err = backupCreateCmdFunc(cmd, []string{f})
require.NoError(t, err)

d, closer, err := decoderFromArgs(cmd, []string{f})
d, closer, err := decoderFromArgs(f)
require.NoError(t, err)
defer func() {
_ = d.Close()
Expand Down
111 changes: 69 additions & 42 deletions internal/cmd/restorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ import (
"github.com/authzed/zed/pkg/backupformat"
)

// FIXME temporary hack until a proper error is exposed from the API, specific to CRDB
var (
txConflictCodes = []string{"SQLSTATE 23505"}
retryableErrorCodes = []string{"retryable error"}
)

type restorer struct {
decoder *backupformat.Decoder
client client.Client
Expand All @@ -32,11 +38,14 @@ type restorer struct {
bar *progressbar.ProgressBar

// stats
relsWritten int64
batchesWritten int64
relsSkipped int64
duplicateRels int64
totalRetries int64
filteredOutRels int64
writtenRels int64
writtenBatches int64
skippedRels int64
skippedBatches int64
duplicateRels int64
duplicateBatches int64
totalRetries int64
}

func newRestorer(decoder *backupformat.Decoder, client client.Client, prefixFilter string, batchSize int,
Expand All @@ -59,7 +68,7 @@ func (r *restorer) restoreFromDecoder(ctx context.Context) error {
relationshipWriteStart := time.Now()
defer func() {
if err := r.bar.Finish(); err != nil {
log.Err(err).Msg("error finalizing progress bar")
log.Warn().Err(err).Msg("error finalizing progress bar")
}
}()

Expand All @@ -72,10 +81,12 @@ func (r *restorer) restoreFromDecoder(ctx context.Context) error {
batchesToBeCommitted := make([][]*v1.Relationship, 0, r.batchesPerTransaction)
for rel, err := r.decoder.Next(); rel != nil && err == nil; rel, err = r.decoder.Next() {
if err := ctx.Err(); err != nil {
r.bar.Describe("backup restore aborted")
return fmt.Errorf("aborted restore: %w", err)
}

if !hasRelPrefix(rel, r.prefixFilter) {
r.filteredOutRels++
continue
}

Expand All @@ -87,8 +98,6 @@ func (r *restorer) restoreFromDecoder(ctx context.Context) error {
Relationships: batch,
})
if err != nil {
r.totalRetries++

// It feels non-idiomatic to check for error and perform an operation, but in gRPC, when an element
// sent over the stream fails, we need to call recvAndClose() to get the error.
if err := r.commitStream(ctx, relationshipWriter, batchesToBeCommitted); err != nil {
Expand All @@ -106,12 +115,12 @@ func (r *restorer) restoreFromDecoder(ctx context.Context) error {
continue
}

// Reset the relationships in the batch. Do not reuse in case failure happens on subsequent batch in the tx
// The batch just sent is kept in batchesToBeCommitted, which is used for retries.
// Therefore, we cannot reuse the batch. Batches may fail on send, or on commit (CloseAndRecv).
batch = make([]*v1.Relationship, 0, r.batchSize)
r.batchesWritten++

// if we've sent the maximum number of batches per transaction, proceed to commit
if r.batchesWritten%r.batchesPerTransaction != 0 {
if int64(len(batchesToBeCommitted))%r.batchesPerTransaction != 0 {
continue
}

Expand All @@ -130,30 +139,32 @@ func (r *restorer) restoreFromDecoder(ctx context.Context) error {

// Write the last batch
if len(batch) > 0 {
if err := relationshipWriter.Send(&v1.BulkImportRelationshipsRequest{
Relationships: batch,
}); err != nil {
return fmt.Errorf("error sending last batch to server: %w", err)
}
// Since we are going to close the stream anyway after the last batch, and given the actual error
// is only returned on CloseAndRecv(), we have to ignore the error here in order to get the actual
// underlying error that caused Send() to fail. It also gives us the opportunity to retry it
// in case it failed.
batchesToBeCommitted = append(batchesToBeCommitted, batch)
_ = relationshipWriter.Send(&v1.BulkImportRelationshipsRequest{Relationships: batch})
}

if err := r.commitStream(ctx, relationshipWriter, batchesToBeCommitted); err != nil {
return fmt.Errorf("error committing last set of batches: %w", err)
}

r.bar.Describe("complected import")
r.bar.Describe("completed import")
if err := r.bar.Finish(); err != nil {
log.Err(err).Msg("error finalizing progress bar")
log.Warn().Err(err).Msg("error finalizing progress bar")
}

totalTime := time.Since(relationshipWriteStart)
log.Info().
Int64("batches", r.batchesWritten).
Int64("relationships_loaded", r.relsWritten).
Int64("relationships_skipped", r.relsSkipped).
Int64("batches", r.writtenBatches).
Int64("relationships_loaded", r.writtenRels).
Int64("relationships_skipped", r.skippedRels).
Int64("duplicate_relationships", r.duplicateRels).
Int64("relationships_filtered_out", r.filteredOutRels).
Int64("retried_errors", r.totalRetries).
Uint64("perSecond", perSec(uint64(r.relsWritten), totalTime)).
Uint64("perSecond", perSec(uint64(r.writtenRels+r.skippedRels), totalTime)).
Stringer("duration", totalTime).
Msg("finished restore")
return nil
Expand Down Expand Up @@ -183,54 +194,63 @@ func (r *restorer) commitStream(ctx context.Context, bulkImportClient v1.Experim
case retryable && r.disableRetryErrors:
return err
case conflict && r.skipOnConflicts:
r.relsSkipped += int64(expectedLoaded)
r.skippedRels += int64(expectedLoaded)
r.skippedBatches += int64(len(batchesToBeCommitted))
r.duplicateBatches += int64(len(batchesToBeCommitted))
r.duplicateRels += int64(expectedLoaded)
numLoaded = expectedLoaded
r.bar.Describe("skipping conflicting batch")
case conflict && r.touchOnConflicts:
r.bar.Describe("retrying conflicting batch")
r.duplicateRels += int64(expectedLoaded)
r.duplicateBatches += int64(len(batchesToBeCommitted))
r.totalRetries++
numLoaded, retries, err = r.writeBatchesWithRetry(ctx, batchesToBeCommitted)
if err != nil {
return fmt.Errorf("failed to write retried batch: %w", err)
}
case conflict && !r.touchOnConflicts:

retries++ // account for the initial attempt
r.writtenBatches += int64(len(batchesToBeCommitted))
r.writtenRels += int64(numLoaded)
case conflict && (!r.touchOnConflicts && !r.skipOnConflicts):
r.bar.Describe("conflict detected, aborting restore")
return fmt.Errorf("duplicate relationships found")
case retryable:
r.bar.Describe("retrying after error")
r.totalRetries++
numLoaded, retries, err = r.writeBatchesWithRetry(ctx, batchesToBeCommitted)
if err != nil {
return fmt.Errorf("failed to write retried batch: %w", err)
}

retries++ // account for the initial attempt
r.writtenBatches += int64(len(batchesToBeCommitted))
r.writtenRels += int64(numLoaded)
default:
r.bar.Describe("restoring from backup")
r.writtenBatches += int64(len(batchesToBeCommitted))
}

// it was a successful transaction commit without duplicates
if resp != nil {
numLoaded = resp.NumLoaded

var expected uint64
for _, b := range batchesToBeCommitted {
expected += uint64(len(b))
}

if expected != numLoaded {
log.Warn().Uint64("loaded", numLoaded).Uint64("expected", expected).Msg("unexpected number of relationships loaded")
r.writtenRels += int64(resp.NumLoaded)
if expectedLoaded != resp.NumLoaded {
log.Warn().Uint64("loaded", resp.NumLoaded).Uint64("expected", expectedLoaded).Msg("unexpected number of relationships loaded")
}
}

r.relsWritten += int64(numLoaded)
if err := r.bar.Set64(r.relsWritten); err != nil {
if err := r.bar.Set64(r.writtenRels + r.skippedRels); err != nil {
return fmt.Errorf("error incrementing progress bar: %w", err)
}

if !isatty.IsTerminal(os.Stderr.Fd()) {
log.Trace().
Int64("batches_written", r.batchesWritten).
Int64("relationships_written", r.relsWritten).
Int64("batches_written", r.writtenBatches).
Int64("relationships_written", r.writtenRels).
Int64("duplicate_batches", r.duplicateBatches).
Int64("duplicate_relationships", r.duplicateRels).
Int64("skipped_batches", r.skippedBatches).
Int64("skipped_relationships", r.skippedRels).
Uint64("retries", retries).
Msg("restore progress")
}
Expand Down Expand Up @@ -292,17 +312,24 @@ func isAlreadyExistsError(err error) bool {
}
}

// FIXME temporary hack until a proper error is exposed from the API, specific to CRDB
return strings.Contains(err.Error(), "SQLSTATE 23505")
for _, code := range txConflictCodes {
if strings.Contains(err.Error(), code) {
return true
}
}

return false
}

func isRetryableError(err error) bool {
if err == nil {
return false
}

if strings.Contains(err.Error(), "RETRY_SERIALIZABLE") { // FIXME hack until SpiceDB exposes proper typed err
return true
for _, code := range retryableErrorCodes {
if strings.Contains(err.Error(), code) {
return true
}
}

return false
Expand Down
Loading

0 comments on commit 068d914

Please sign in to comment.