swarm/storage/netstore: add fetcher cancellation on shutdown #19049

Merged (1 commit) on Feb 14, 2019
229 changes: 116 additions & 113 deletions swarm/network/stream/delivery_test.go
@@ -453,133 +453,136 @@ func TestDeliveryFromNodes(t *testing.T) {
 }
 
 func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) {
-	sim := simulation.New(map[string]simulation.ServiceFunc{
-		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-			addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
-			if err != nil {
-				return nil, nil, err
-			}
+	t.Helper()
+	t.Run(fmt.Sprintf("testDeliveryFromNodes_%d_%d_skipCheck_%t", nodes, chunkCount, skipCheck), func(t *testing.T) {
+		sim := simulation.New(map[string]simulation.ServiceFunc{
+			"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+				addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
+				if err != nil {
+					return nil, nil, err
+				}
 
-			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
-				SkipCheck: skipCheck,
-				Syncing:   SyncingDisabled,
-				Retrieval: RetrievalEnabled,
-			}, nil)
-			bucket.Store(bucketKeyRegistry, r)
+				r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+					SkipCheck: skipCheck,
+					Syncing:   SyncingDisabled,
+					Retrieval: RetrievalEnabled,
+				}, nil)
+				bucket.Store(bucketKeyRegistry, r)
 
-			cleanup = func() {
-				r.Close()
-				clean()
-			}
+				cleanup = func() {
+					r.Close()
+					clean()
+				}
 
-			return r, cleanup, nil
-		},
-	})
-	defer sim.Close()
+				return r, cleanup, nil
+			},
+		})
+		defer sim.Close()
 
-	log.Info("Adding nodes to simulation")
-	_, err := sim.AddNodesAndConnectChain(nodes)
-	if err != nil {
-		t.Fatal(err)
-	}
+		log.Info("Adding nodes to simulation")
+		_, err := sim.AddNodesAndConnectChain(nodes)
+		if err != nil {
+			t.Fatal(err)
+		}
 
-	log.Info("Starting simulation")
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
-		nodeIDs := sim.UpNodeIDs()
-		//determine the pivot node to be the first node of the simulation
-		pivot := nodeIDs[0]
+		log.Info("Starting simulation")
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
+			nodeIDs := sim.UpNodeIDs()
+			//determine the pivot node to be the first node of the simulation
+			pivot := nodeIDs[0]
 
-		//distribute chunks of a random file into Stores of nodes 1 to nodes
-		//we will do this by creating a file store with an underlying round-robin store:
-		//the file store will create a hash for the uploaded file, but every chunk will be
-		//distributed to different nodes via round-robin scheduling
-		log.Debug("Writing file to round-robin file store")
-		//to do this, we create an array for chunkstores (length minus one, the pivot node)
-		stores := make([]storage.ChunkStore, len(nodeIDs)-1)
-		//we then need to get all stores from the sim....
-		lStores := sim.NodesItems(bucketKeyStore)
-		i := 0
-		//...iterate the buckets...
-		for id, bucketVal := range lStores {
-			//...and remove the one which is the pivot node
-			if id == pivot {
-				continue
-			}
-			//the other ones are added to the array...
-			stores[i] = bucketVal.(storage.ChunkStore)
-			i++
-		}
-		//...which then gets passed to the round-robin file store
-		roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams())
-		//now we can actually upload a (random) file to the round-robin store
-		size := chunkCount * chunkSize
-		log.Debug("Storing data to file store")
-		fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
-		// wait until all chunks stored
-		if err != nil {
-			return err
-		}
-		err = wait(ctx)
-		if err != nil {
-			return err
-		}
+			//distribute chunks of a random file into Stores of nodes 1 to nodes
+			//we will do this by creating a file store with an underlying round-robin store:
+			//the file store will create a hash for the uploaded file, but every chunk will be
+			//distributed to different nodes via round-robin scheduling
+			log.Debug("Writing file to round-robin file store")
+			//to do this, we create an array for chunkstores (length minus one, the pivot node)
+			stores := make([]storage.ChunkStore, len(nodeIDs)-1)
+			//we then need to get all stores from the sim....
+			lStores := sim.NodesItems(bucketKeyStore)
+			i := 0
+			//...iterate the buckets...
+			for id, bucketVal := range lStores {
+				//...and remove the one which is the pivot node
+				if id == pivot {
+					continue
+				}
+				//the other ones are added to the array...
+				stores[i] = bucketVal.(storage.ChunkStore)
+				i++
+			}
+			//...which then gets passed to the round-robin file store
+			roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams())
+			//now we can actually upload a (random) file to the round-robin store
+			size := chunkCount * chunkSize
+			log.Debug("Storing data to file store")
+			fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
+			// wait until all chunks stored
+			if err != nil {
+				return err
+			}
+			err = wait(ctx)
+			if err != nil {
+				return err
+			}
 
-		log.Debug("Waiting for kademlia")
-		// TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias
-		if _, err := sim.WaitTillHealthy(ctx); err != nil {
-			return err
-		}
+			log.Debug("Waiting for kademlia")
+			// TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias
+			if _, err := sim.WaitTillHealthy(ctx); err != nil {
+				return err
+			}
 
-		//get the pivot node's filestore
-		item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
-		if !ok {
-			return fmt.Errorf("No filestore")
-		}
-		pivotFileStore := item.(*storage.FileStore)
-		log.Debug("Starting retrieval routine")
-		retErrC := make(chan error)
-		go func() {
-			// start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
-			// we must wait for the peer connections to have started before requesting
-			n, err := readAll(pivotFileStore, fileHash)
-			log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
-			retErrC <- err
-		}()
+			//get the pivot node's filestore
+			item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
+			if !ok {
+				return fmt.Errorf("No filestore")
+			}
+			pivotFileStore := item.(*storage.FileStore)
+			log.Debug("Starting retrieval routine")
+			retErrC := make(chan error)
+			go func() {
+				// start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
+				// we must wait for the peer connections to have started before requesting
+				n, err := readAll(pivotFileStore, fileHash)
+				log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
+				retErrC <- err
+			}()
 
-		disconnected := watchDisconnections(ctx, sim)
-		defer func() {
-			if err != nil && disconnected.bool() {
-				err = errors.New("disconnect events received")
-			}
-		}()
+			disconnected := watchDisconnections(ctx, sim)
+			defer func() {
+				if err != nil && disconnected.bool() {
+					err = errors.New("disconnect events received")
+				}
+			}()
 
-		//finally check that the pivot node gets all chunks via the root hash
-		log.Debug("Check retrieval")
-		success := true
-		var total int64
-		total, err = readAll(pivotFileStore, fileHash)
-		if err != nil {
-			return err
-		}
-		log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
-		if err != nil || total != int64(size) {
-			success = false
-		}
+			//finally check that the pivot node gets all chunks via the root hash
+			log.Debug("Check retrieval")
+			success := true
+			var total int64
+			total, err = readAll(pivotFileStore, fileHash)
+			if err != nil {
+				return err
+			}
+			log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
+			if err != nil || total != int64(size) {
+				success = false
+			}
 
-		if !success {
-			return fmt.Errorf("Test failed, chunks not available on all nodes")
-		}
-		if err := <-retErrC; err != nil {
-			return fmt.Errorf("requesting chunks: %v", err)
-		}
-		log.Debug("Test terminated successfully")
-		return nil
-	})
-	if result.Error != nil {
-		t.Fatal(result.Error)
-	}
+			if !success {
+				return fmt.Errorf("Test failed, chunks not available on all nodes")
+			}
+			if err := <-retErrC; err != nil {
+				return fmt.Errorf("requesting chunks: %v", err)
+			}
+			log.Debug("Test terminated successfully")
+			return nil
+		})
+		if result.Error != nil {
+			t.Fatal(result.Error)
+		}
+	})
 }
 
 func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) {
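The refactor above applies Go's standard subtest pattern: the helper's whole body moves inside t.Run with a name derived from the parameters, so every nodes/chunkCount/skipCheck combination is reported, filtered, and re-run as its own subtest, while t.Helper() keeps failure locations attributed to the caller. A minimal self-contained sketch of the same shape — testScaling and its parameters are hypothetical, not part of this PR:

package stream

import (
	"fmt"
	"testing"
)

// testScaling mirrors the shape of testDeliveryFromNodes: a parameterized
// helper that wraps its body in t.Run so each (nodes, chunks) pair becomes
// a named subtest.
func testScaling(t *testing.T, nodes, chunks int) {
	t.Helper()
	t.Run(fmt.Sprintf("testScaling_%d_%d", nodes, chunks), func(t *testing.T) {
		if nodes <= 0 || chunks <= 0 {
			t.Fatal("nodes and chunks must be positive")
		}
		// ... exercise the system under test with these parameters ...
	})
}

func TestScaling(t *testing.T) {
	for _, n := range []int{2, 4, 8} {
		for _, c := range []int{8, 32} {
			testScaling(t, n, c)
		}
	}
}

Because subtest names compose with a slash, one combination can be targeted directly, e.g. go test -run 'TestScaling/testScaling_4_32'.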
15 changes: 9 additions & 6 deletions swarm/network/stream/snapshot_retrieval_test.go
@@ -74,7 +74,7 @@ func TestRetrieval(t *testing.T) {
 	//if nodes/chunks have been provided via commandline,
 	//run the tests with these values
 	if *nodes != 0 && *chunks != 0 {
-		err := runRetrievalTest(*chunks, *nodes)
+		err := runRetrievalTest(t, *chunks, *nodes)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -93,10 +93,12 @@ func TestRetrieval(t *testing.T) {
 	}
 	for _, n := range nodeCnt {
 		for _, c := range chnkCnt {
-			err := runRetrievalTest(c, n)
-			if err != nil {
-				t.Fatal(err)
-			}
+			t.Run(fmt.Sprintf("TestRetrieval_%d_%d", n, c), func(t *testing.T) {
+				err := runRetrievalTest(t, c, n)
+				if err != nil {
+					t.Fatal(err)
+				}
+			})
 		}
 	}
 }
@@ -225,7 +227,8 @@ simulation's `action` function.

 The snapshot should have 'streamer' in its service list.
 */
-func runRetrievalTest(chunkCount int, nodeCount int) error {
+func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error {
+	t.Helper()
 	sim := simulation.New(retrievalSimServiceMap)
 	defer sim.Close()
 
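Threading *testing.T into runRetrievalTest also lets it call t.Helper(), which makes the test runner attribute failures raised inside the helper to the caller's line rather than to the helper itself. A tiny illustration of that effect, with a hypothetical requireEqual helper that is not from this codebase:

package stream

import "testing"

// requireEqual fails the test when got != want. Because it calls t.Helper(),
// the failure is reported at the caller's line, not at the Fatalf below.
func requireEqual(t *testing.T, got, want int) {
	t.Helper()
	if got != want {
		t.Fatalf("got %d, want %d", got, want)
	}
}

func TestSum(t *testing.T) {
	requireEqual(t, 2+2, 4) // a failure would point here
}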
20 changes: 19 additions & 1 deletion swarm/storage/netstore.go
@@ -128,7 +128,25 @@ func (n *NetStore) FetchFunc(ctx context.Context, ref Address) func(context.Con
 func (n *NetStore) Close() {
 	close(n.closeC)
 	n.store.Close()
-	// TODO: loop through fetchers to cancel them
+
+	wg := sync.WaitGroup{}
+	for _, key := range n.fetchers.Keys() {
+		if f, ok := n.fetchers.Get(key); ok {
+			if fetch, ok := f.(*fetcher); ok {
+				wg.Add(1)
+				go func(fetch *fetcher) {
+					defer wg.Done()
+					fetch.cancel()
+
+					select {
+					case <-fetch.deliveredC:
+					case <-fetch.cancelledC:
+					}
+				}(fetch)
+			}
+		}
+	}
+	wg.Wait()
 }

 // get attempts at retrieving the chunk from LocalStore
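The new Close body is an instance of a common Go shutdown idiom: concurrently signal cancellation to every in-flight worker, then block until each one acknowledges by closing one of its completion channels. A standalone sketch of the idiom under assumed types — worker, doneC, and cancelledC are illustrative stand-ins for the fetcher and its deliveredC/cancelledC channels, not the actual Swarm types:

package storage

import "sync"

// worker is a hypothetical stand-in for a fetcher: cancel asks it to stop,
// and it closes exactly one of doneC or cancelledC when it finishes or aborts.
type worker struct {
	cancel     func()
	doneC      chan struct{}
	cancelledC chan struct{}
}

// shutdownAll cancels every in-flight worker and waits until each one has
// either delivered its result or confirmed cancellation, so the caller
// returns only when nothing is left running.
func shutdownAll(workers []*worker) {
	var wg sync.WaitGroup
	for _, w := range workers {
		wg.Add(1)
		go func(w *worker) {
			defer wg.Done()
			w.cancel()
			select {
			case <-w.doneC:
			case <-w.cancelledC:
			}
		}(w)
	}
	wg.Wait()
}

Fanning the waits out into goroutines matters: cancelling and waiting sequentially would make shutdown latency the sum of all teardown times instead of roughly the slowest one.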