Skip to content

Commit

Permalink
add a ready channel to Snapshotter.Restore()
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Aug 31, 2020
1 parent 3906352 commit 659fcf5
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 11 deletions.
4 changes: 3 additions & 1 deletion server/mock/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ func (ms multiStore) Snapshot(height uint64, format uint32) (<-chan io.ReadClose
panic("not implemented")
}

func (ms multiStore) Restore(height uint64, format uint32, chunks <-chan io.ReadCloser) error {
func (ms multiStore) Restore(
height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{},
) error {
panic("not implemented")
}

Expand Down
11 changes: 9 additions & 2 deletions snapshots/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,18 @@ type mockSnapshotter struct {
chunks [][]byte
}

func (m *mockSnapshotter) Restore(height uint64, format uint32, chunks <-chan io.ReadCloser) error {
func (m *mockSnapshotter) Restore(
height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{},
) error {
if format == 0 {
return types.ErrUnknownFormat
}
if m.chunks != nil {
return errors.New("already has contents")
}
if ready != nil {
close(ready)
}

m.chunks = [][]byte{}
for reader := range chunks {
Expand Down Expand Up @@ -140,6 +145,8 @@ func (m *hungSnapshotter) Snapshot(height uint64, format uint32) (<-chan io.Read
return ch, nil
}

func (m *hungSnapshotter) Restore(height uint64, format uint32, chunks <-chan io.ReadCloser) error {
func (m *hungSnapshotter) Restore(
height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{},
) error {
panic("not implemented")
}
6 changes: 3 additions & 3 deletions snapshots/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"io"
"io/ioutil"
"sync"
"time"

"github.com/cosmos/cosmos-sdk/snapshots/types"
)
Expand Down Expand Up @@ -169,9 +168,10 @@ func (m *Manager) Restore(snapshot types.Snapshot) error {

// Start an asynchronous snapshot restoration, passing chunks and completion status via channels.
chChunks := make(chan io.ReadCloser, chunkBufferSize)
chReady := make(chan struct{}, 1)
chDone := make(chan restoreDone, 1)
go func() {
err := m.target.Restore(snapshot.Height, snapshot.Format, chChunks)
err := m.target.Restore(snapshot.Height, snapshot.Format, chChunks, chReady)
chDone <- restoreDone{
complete: err == nil,
err: err,
Expand All @@ -187,7 +187,7 @@ func (m *Manager) Restore(snapshot types.Snapshot) error {
return done.err
}
return errors.New("restore ended unexpectedly")
case <-time.After(20 * time.Millisecond):
case <-chReady:
}

m.chRestore = chChunks
Expand Down
4 changes: 3 additions & 1 deletion snapshots/types/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,7 @@ type Snapshotter interface {
Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error)

// Restore restores a state snapshot, taking snapshot chunk readers as input.
Restore(height uint64, format uint32, chunks <-chan io.ReadCloser) error
// If the ready channel is non-nil, it returns a ready signal (by being closed) once the
// restorer is ready to accept chunks.
Restore(height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{}) error
}
10 changes: 9 additions & 1 deletion store/rootmulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,9 @@ func (rs *Store) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, e
}

// Restore implements snapshottypes.Snapshotter.
func (rs *Store) Restore(height uint64, format uint32, chunks <-chan io.ReadCloser) error {
func (rs *Store) Restore(
height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{},
) error {
if format != snapshottypes.CurrentFormat {
return fmt.Errorf("%w %v", snapshottypes.ErrUnknownFormat, format)
}
Expand All @@ -670,6 +672,12 @@ func (rs *Store) Restore(height uint64, format uint32, chunks <-chan io.ReadClos
height, math.MaxInt64)
}

// Signal readiness. Must be done before the readers below are set up, since the zlib
// reader reads from the stream on initialization, potentially causing deadlocks.
if ready != nil {
close(ready)
}

// Set up a restore stream pipeline
// chan io.ReadCloser -> chunkReader -> zlib -> delimited Protobuf -> ExportNode
chunkReader := snapshots.NewChunkReader(chunks)
Expand Down
8 changes: 5 additions & 3 deletions store/rootmulti/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ func TestMultistoreRestore_Errors(t *testing.T) {
for name, tc := range testcases {
tc := tc
t.Run(name, func(t *testing.T) {
err := store.Restore(tc.height, tc.format, nil)
err := store.Restore(tc.height, tc.format, nil, nil)
require.Error(t, err)
if tc.expectType != nil {
assert.True(t, errors.Is(err, tc.expectType))
Expand All @@ -610,8 +610,10 @@ func TestMultistoreSnapshotRestore(t *testing.T) {

chunks, err := source.Snapshot(version, snapshottypes.CurrentFormat)
require.NoError(t, err)
err = target.Restore(version, snapshottypes.CurrentFormat, chunks)
ready := make(chan struct{})
err = target.Restore(version, snapshottypes.CurrentFormat, chunks, ready)
require.NoError(t, err)
assert.EqualValues(t, struct{}{}, <-ready)

assert.Equal(t, source.LastCommitID(), target.LastCommitID())
for key, sourceStore := range source.stores {
Expand Down Expand Up @@ -687,7 +689,7 @@ func benchmarkMultistoreSnapshotRestore(b *testing.B, stores uint8, storeKeys ui

chunks, err := source.Snapshot(version, snapshottypes.CurrentFormat)
require.NoError(b, err)
err = target.Restore(version, snapshottypes.CurrentFormat, chunks)
err = target.Restore(version, snapshottypes.CurrentFormat, chunks, nil)
require.NoError(b, err)
require.Equal(b, source.LastCommitID(), target.LastCommitID())
}
Expand Down

0 comments on commit 659fcf5

Please sign in to comment.