Skip to content

Commit

Permalink
Benchmark framework + First memory fixes (#89)
Browse files Browse the repository at this point in the history
* feat(benchmarks): initial benchmark infrastructure

* fix(cidlink): mem allocations around link loading

* fix(deps): update to latest deps

use latest go-ipld-prime & go-ipld-prime-proto fixes

* fix(deps): remove unused badger code
  • Loading branch information
hannahhoward authored Aug 28, 2020
1 parent e98fd78 commit bd2d62f
Show file tree
Hide file tree
Showing 17 changed files with 1,055 additions and 39 deletions.
207 changes: 207 additions & 0 deletions benchmarks/benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package graphsync_test

import (
"bytes"
"context"
"crypto/rand"
"fmt"
"io/ioutil"
"os"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync/benchmarks/testinstance"
tn "github.com/ipfs/go-graphsync/benchmarks/testnet"
blockstore "github.com/ipfs/go-ipfs-blockstore"
chunker "github.com/ipfs/go-ipfs-chunker"
delay "github.com/ipfs/go-ipfs-delay"
offline "github.com/ipfs/go-ipfs-exchange-offline"
files "github.com/ipfs/go-ipfs-files"
ipldformat "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
"github.com/ipfs/go-unixfs/importer/balanced"
ihelper "github.com/ipfs/go-unixfs/importer/helpers"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
ipldselector "github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/stretchr/testify/require"
)

const stdBlockSize = 8000

type runStats struct {
Time time.Duration
Name string
}

var benchmarkLog []runStats

func BenchmarkRoundtripSuccess(b *testing.B) {
ctx := context.Background()
tdm, err := newTempDirMaker(b)
require.NoError(b, err)
b.Run("test-20-10000", func(b *testing.B) {
subtestDistributeAndFetch(ctx, b, 20, delay.Fixed(0), time.Duration(0), allFilesUniformSize(10000), tdm)
})
}

func subtestDistributeAndFetch(ctx context.Context, b *testing.B, numnodes int, d delay.D, bstoreLatency time.Duration, df distFunc, tdm *tempDirMaker) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
net := tn.VirtualNetwork(d)
ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm)
instances, err := ig.Instances(numnodes + b.N)
require.NoError(b, err)
destCids := df(ctx, b, instances[:numnodes])
// Set the blockstore latency on seed nodes
if bstoreLatency > 0 {
for _, i := range instances {
i.SetBlockstoreLatency(bstoreLatency)
}
}
ssb := builder.NewSelectorSpecBuilder(basicnode.Style.Any)

allSelector := ssb.ExploreRecursive(ipldselector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()

runtime.GC()
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
fetcher := instances[i+numnodes]
var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
require.NoError(b, err)
start := time.Now()
for j := 0; j < numnodes; j++ {
instance := instances[j]
_, errChan := fetcher.Exchange.Request(ctx, instance.Peer, cidlink.Link{Cid: destCids[j]}, allSelector)

wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case err, ok := <-errChan:
if !ok {
return
}
b.Fatalf("received error on request: %s", err.Error())
}
}
}()
}
wg.Wait()
result := runStats{
Time: time.Since(start),
Name: b.Name(),
}
benchmarkLog = append(benchmarkLog, result)
cancel()
fetcher.Close()
}
testinstance.Close(instances)
ig.Close()

}

type distFunc func(ctx context.Context, b *testing.B, provs []testinstance.Instance) []cid.Cid

const unixfsChunkSize uint64 = 1 << 10
const unixfsLinksPerLevel = 1024

func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Blockstore, size uint64) cid.Cid {

data := make([]byte, size)
_, err := rand.Read(data)
require.NoError(b, err)
buf := bytes.NewReader(data)
file := files.NewReaderFile(buf)

dagService := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))

// import to UnixFS
bufferedDS := ipldformat.NewBufferedDAG(ctx, dagService)

params := ihelper.DagBuilderParams{
Maxlinks: unixfsLinksPerLevel,
RawLeaves: true,
CidBuilder: nil,
Dagserv: bufferedDS,
}

db, err := params.New(chunker.NewSizeSplitter(file, int64(unixfsChunkSize)))
require.NoError(b, err, "unable to setup dag builder")

nd, err := balanced.Layout(db)
require.NoError(b, err, "unable to create unix fs node")

err = bufferedDS.Commit()
require.NoError(b, err, "unable to commit unix fs node")

return nd.Cid()
}

func allFilesUniformSize(size uint64) distFunc {
return func(ctx context.Context, b *testing.B, provs []testinstance.Instance) []cid.Cid {
cids := make([]cid.Cid, 0, len(provs))
for _, prov := range provs {
c := loadRandomUnixFxFile(ctx, b, prov.BlockStore, size)
cids = append(cids, c)
}
return cids
}
}

type tempDirMaker struct {
tdm string
tempDirSeq int32
b *testing.B
}

var tempDirReplacer struct {
sync.Once
r *strings.Replacer
}

// Cribbed from https://github.com/golang/go/blob/master/src/testing/testing.go#L890
// and modified as needed due to https://github.com/golang/go/issues/41062
func newTempDirMaker(b *testing.B) (*tempDirMaker, error) {
c := &tempDirMaker{}
// ioutil.TempDir doesn't like path separators in its pattern,
// so mangle the name to accommodate subtests.
tempDirReplacer.Do(func() {
tempDirReplacer.r = strings.NewReplacer("/", "_", "\\", "_", ":", "_")
})
pattern := tempDirReplacer.r.Replace(b.Name())

var err error
c.tdm, err = ioutil.TempDir("", pattern)
if err != nil {
return nil, err
}
b.Cleanup(func() {
if err := os.RemoveAll(c.tdm); err != nil {
b.Errorf("TempDir RemoveAll cleanup: %v", err)
}
})
return c, nil
}

func (tdm *tempDirMaker) TempDir() string {
seq := atomic.AddInt32(&tdm.tempDirSeq, 1)
dir := fmt.Sprintf("%s%c%03d", tdm.tdm, os.PathSeparator, seq)
if err := os.Mkdir(dir, 0777); err != nil {
tdm.b.Fatalf("TempDir: %v", err)
}
return dir
}
169 changes: 169 additions & 0 deletions benchmarks/testinstance/testinstance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package testinstance

import (
"context"
"time"

"github.com/ipfs/go-datastore"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/delayed"
ds_sync "github.com/ipfs/go-datastore/sync"
graphsync "github.com/ipfs/go-graphsync"
tn "github.com/ipfs/go-graphsync/benchmarks/testnet"
gsimpl "github.com/ipfs/go-graphsync/impl"
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/storeutil"
blockstore "github.com/ipfs/go-ipfs-blockstore"
delay "github.com/ipfs/go-ipfs-delay"
"github.com/ipld/go-ipld-prime"
peer "github.com/libp2p/go-libp2p-core/peer"
p2ptestutil "github.com/libp2p/go-libp2p-netutil"
tnet "github.com/libp2p/go-libp2p-testing/net"
)

// TempDirGenerator is any interface that can generate temporary directories
type TempDirGenerator interface {
TempDir() string
}

// NewTestInstanceGenerator generates a new InstanceGenerator for the given
// testnet
func NewTestInstanceGenerator(ctx context.Context, net tn.Network, gsOptions []gsimpl.Option, tempDirGenerator TempDirGenerator) InstanceGenerator {
ctx, cancel := context.WithCancel(ctx)
return InstanceGenerator{
net: net,
seq: 0,
ctx: ctx, // TODO take ctx as param to Next, Instances
cancel: cancel,
gsOptions: gsOptions,
tempDirGenerator: tempDirGenerator,
}
}

// InstanceGenerator generates new test instances of bitswap+dependencies
type InstanceGenerator struct {
seq int
net tn.Network
ctx context.Context
cancel context.CancelFunc
gsOptions []gsimpl.Option
tempDirGenerator TempDirGenerator
}

// Close closes the clobal context, shutting down all test instances
func (g *InstanceGenerator) Close() error {
g.cancel()
return nil // for Closer interface
}

// Next generates a new instance of graphsync + dependencies
func (g *InstanceGenerator) Next() (Instance, error) {
g.seq++
p, err := p2ptestutil.RandTestBogusIdentity()
if err != nil {
return Instance{}, err
}
return NewInstance(g.ctx, g.net, p, g.gsOptions, g.tempDirGenerator.TempDir())
}

// Instances creates N test instances of bitswap + dependencies and connects
// them to each other
func (g *InstanceGenerator) Instances(n int) ([]Instance, error) {
var instances []Instance
for j := 0; j < n; j++ {
inst, err := g.Next()
if err != nil {
return nil, err
}
instances = append(instances, inst)
}
ConnectInstances(instances)
return instances, nil
}

// ConnectInstances connects the given instances to each other
func ConnectInstances(instances []Instance) {
for i, inst := range instances {
for j := i + 1; j < len(instances); j++ {
oinst := instances[j]
err := inst.Adapter.ConnectTo(context.Background(), oinst.Peer)
if err != nil {
panic(err.Error())
}
}
}
}

// Close closes multiple instances at once
func Close(instances []Instance) error {
for _, i := range instances {
if err := i.Close(); err != nil {
return err
}
}
return nil
}

// Instance is a test instance of bitswap + dependencies for integration testing
type Instance struct {
Peer peer.ID
Loader ipld.Loader
Storer ipld.Storer
Exchange graphsync.GraphExchange
BlockStore blockstore.Blockstore
Adapter gsnet.GraphSyncNetwork
blockstoreDelay delay.D
ds datastore.Batching
}

// Close closes the associated datastore
func (i *Instance) Close() error {
return i.ds.Close()
}

// Blockstore returns the block store for this test instance
func (i *Instance) Blockstore() blockstore.Blockstore {
return i.BlockStore
}

// SetBlockstoreLatency customizes the artificial delay on receiving blocks
// from a blockstore test instance.
func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
return i.blockstoreDelay.Set(t)
}

// NewInstance creates a test bitswap instance.
//
// NB: It's easy make mistakes by providing the same peer ID to two different
// instances. To safeguard, use the InstanceGenerator to generate instances. It's
// just a much better idea.
func NewInstance(ctx context.Context, net tn.Network, p tnet.Identity, gsOptions []gsimpl.Option, tempDir string) (Instance, error) {
bsdelay := delay.Fixed(0)

adapter := net.Adapter(p)
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore, err := blockstore.CachedBlockstore(ctx,
blockstore.NewBlockstore(dstore),
blockstore.DefaultCacheOpts())
if err != nil {
return Instance{}, err
}

loader := storeutil.LoaderForBlockstore(bstore)
storer := storeutil.StorerForBlockstore(bstore)
gs := gsimpl.New(ctx, adapter, loader, storer, gsOptions...)
gs.RegisterIncomingRequestHook(func(p peer.ID, request graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
})

return Instance{
Adapter: adapter,
Peer: p.ID(),
Exchange: gs,
Loader: loader,
Storer: storer,
BlockStore: bstore,
blockstoreDelay: bsdelay,
ds: dstore,
}, nil
}
16 changes: 16 additions & 0 deletions benchmarks/testnet/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package testnet

import (
gsnet "github.com/ipfs/go-graphsync/network"

"github.com/libp2p/go-libp2p-core/peer"
tnet "github.com/libp2p/go-libp2p-testing/net"
)

// Network is an interface for generating graphsync network interfaces
// based on a test network.
type Network interface {
Adapter(tnet.Identity) gsnet.GraphSyncNetwork
HasPeer(peer.ID) bool
}

Loading

0 comments on commit bd2d62f

Please sign in to comment.