diff --git a/blocks/blocksutil/block_generator.go b/blocks/blocksutil/block_generator.go index d70f794702a..438eac1a5da 100644 --- a/blocks/blocksutil/block_generator.go +++ b/blocks/blocksutil/block_generator.go @@ -10,13 +10,13 @@ type BlockGenerator struct { seq int } -func (bg *BlockGenerator) Next() blocks.Block { +func (bg *BlockGenerator) Next() *blocks.BasicBlock { bg.seq++ return blocks.NewBlock([]byte(string(bg.seq))) } -func (bg *BlockGenerator) Blocks(n int) []blocks.Block { - blocks := make([]blocks.Block, 0) +func (bg *BlockGenerator) Blocks(n int) []*blocks.BasicBlock { + blocks := make([]*blocks.BasicBlock, 0) for i := 0; i < n; i++ { b := bg.Next() blocks = append(blocks, b) diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 3fb47aa0ba9..be84bb98ec5 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -4,6 +4,7 @@ package blockservice import ( + "context" "errors" "fmt" @@ -11,7 +12,6 @@ import ( "github.com/ipfs/go-ipfs/blocks/blockstore" exchange "github.com/ipfs/go-ipfs/exchange" - context "context" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" ) @@ -23,77 +23,112 @@ var ErrNotFound = errors.New("blockservice: key not found") // BlockService is a hybrid block datastore. It stores data in a local // datastore and may retrieve data from a remote Exchange. // It uses an internal `datastore.Datastore` instance to store values. -type BlockService struct { - // TODO don't expose underlying impl details - Blockstore blockstore.Blockstore - Exchange exchange.Interface +type BlockService interface { + Blockstore() blockstore.Blockstore + Exchange() exchange.Interface + AddBlock(o blocks.Block) (*cid.Cid, error) + AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) + GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) + GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block + DeleteBlock(o blocks.Block) error + Close() error +} + +type blockService struct { + blockstore blockstore.Blockstore + exchange exchange.Interface + // If checkFirst is true then first check that a block doesn't + // already exist to avoid republishing the block on the exchange. + checkFirst bool } // NewBlockService creates a BlockService with given datastore instance. -func New(bs blockstore.Blockstore, rem exchange.Interface) *BlockService { +func New(bs blockstore.Blockstore, rem exchange.Interface) BlockService { + if rem == nil { + log.Warning("blockservice running in local (offline) mode.") + } + + return &blockService{ + blockstore: bs, + exchange: rem, + checkFirst: true, + } +} + +// NewWriteThrough ceates a BlockService that guarantees writes will go +// through to the blockstore and are not skipped by cache checks. +func NewWriteThrough(bs blockstore.Blockstore, rem exchange.Interface) BlockService { if rem == nil { log.Warning("blockservice running in local (offline) mode.") } - return &BlockService{ - Blockstore: bs, - Exchange: rem, + return &blockService{ + blockstore: bs, + exchange: rem, + checkFirst: false, } } +func (bs *blockService) Blockstore() blockstore.Blockstore { + return bs.blockstore +} + +func (bs *blockService) Exchange() exchange.Interface { + return bs.exchange +} + // AddBlock adds a particular block to the service, Putting it into the datastore. // TODO pass a context into this if the remote.HasBlock is going to remain here. -func (s *BlockService) AddBlock(o blocks.Block) (*cid.Cid, error) { - // TODO: while this is a great optimization, we should think about the - // possibility of streaming writes directly to disk. If we can pass this object - // all the way down to the datastore without having to 'buffer' its data, - // we could implement a `WriteTo` method on it that could do a streaming write - // of the content, saving us (probably) considerable memory. +func (s *blockService) AddBlock(o blocks.Block) (*cid.Cid, error) { c := o.Cid() - has, err := s.Blockstore.Has(c) - if err != nil { - return nil, err - } + if s.checkFirst { + has, err := s.blockstore.Has(c) + if err != nil { + return nil, err + } - if has { - return c, nil + if has { + return c, nil + } } - err = s.Blockstore.Put(o) + err := s.blockstore.Put(o) if err != nil { return nil, err } - if err := s.Exchange.HasBlock(o); err != nil { + if err := s.exchange.HasBlock(o); err != nil { return nil, errors.New("blockservice is closed") } return c, nil } -func (s *BlockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) { +func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) { var toput []blocks.Block - for _, b := range bs { - has, err := s.Blockstore.Has(b.Cid()) - if err != nil { - return nil, err - } - - if has { - continue + if s.checkFirst { + for _, b := range bs { + has, err := s.blockstore.Has(b.Cid()) + if err != nil { + return nil, err + } + if has { + continue + } + toput = append(toput, b) } - - toput = append(toput, b) + } else { + toput = bs } - err := s.Blockstore.PutMany(toput) + err := s.blockstore.PutMany(toput) if err != nil { return nil, err } var ks []*cid.Cid for _, o := range toput { - if err := s.Exchange.HasBlock(o); err != nil { + if err := s.exchange.HasBlock(o); err != nil { return nil, fmt.Errorf("blockservice is closed (%s)", err) } @@ -104,19 +139,19 @@ func (s *BlockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) { // GetBlock retrieves a particular block from the service, // Getting it from the datastore using the key (hash). -func (s *BlockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) { +func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) { log.Debugf("BlockService GetBlock: '%s'", c) - block, err := s.Blockstore.Get(c) + block, err := s.blockstore.Get(c) if err == nil { return block, nil } - if err == blockstore.ErrNotFound && s.Exchange != nil { + if err == blockstore.ErrNotFound && s.exchange != nil { // TODO be careful checking ErrNotFound. If the underlying // implementation changes, this will break. log.Debug("Blockservice: Searching bitswap") - blk, err := s.Exchange.GetBlock(ctx, c) + blk, err := s.exchange.GetBlock(ctx, c) if err != nil { if err == blockstore.ErrNotFound { return nil, ErrNotFound @@ -137,13 +172,13 @@ func (s *BlockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, // GetBlocks gets a list of blocks asynchronously and returns through // the returned channel. // NB: No guarantees are made about order. -func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block { +func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block { out := make(chan blocks.Block, 0) go func() { defer close(out) var misses []*cid.Cid for _, c := range ks { - hit, err := s.Blockstore.Get(c) + hit, err := s.blockstore.Get(c) if err != nil { misses = append(misses, c) continue @@ -160,7 +195,7 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc return } - rblocks, err := s.Exchange.GetBlocks(ctx, misses) + rblocks, err := s.exchange.GetBlocks(ctx, misses) if err != nil { log.Debugf("Error with GetBlocks: %s", err) return @@ -178,11 +213,11 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc } // DeleteBlock deletes a block in the blockservice from the datastore -func (s *BlockService) DeleteBlock(o blocks.Block) error { - return s.Blockstore.DeleteBlock(o.Cid()) +func (s *blockService) DeleteBlock(o blocks.Block) error { + return s.blockstore.DeleteBlock(o.Cid()) } -func (s *BlockService) Close() error { +func (s *blockService) Close() error { log.Debug("blockservice is shutting down...") - return s.Exchange.Close() + return s.exchange.Close() } diff --git a/blockservice/blockservice_test.go b/blockservice/blockservice_test.go new file mode 100644 index 00000000000..d87a383e566 --- /dev/null +++ b/blockservice/blockservice_test.go @@ -0,0 +1,49 @@ +package blockservice + +import ( + "testing" + + "github.com/ipfs/go-ipfs/blocks" + "github.com/ipfs/go-ipfs/blocks/blockstore" + butil "github.com/ipfs/go-ipfs/blocks/blocksutil" + offline "github.com/ipfs/go-ipfs/exchange/offline" + + ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore" + dssync "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync" +) + +func TestWriteThroughWorks(t *testing.T) { + bstore := &PutCountingBlockstore{ + blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), + 0, + } + bstore2 := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + exch := offline.Exchange(bstore2) + bserv := NewWriteThrough(bstore, exch) + bgen := butil.NewBlockGenerator() + + block := bgen.Next() + + t.Logf("PutCounter: %d", bstore.PutCounter) + bserv.AddBlock(block) + if bstore.PutCounter != 1 { + t.Fatalf("expected just one Put call, have: %d", bstore.PutCounter) + } + + bserv.AddBlock(block) + if bstore.PutCounter != 2 { + t.Fatal("Put should have called again, should be 2 is: %d", bstore.PutCounter) + } +} + +var _ blockstore.GCBlockstore = (*PutCountingBlockstore)(nil) + +type PutCountingBlockstore struct { + blockstore.GCBlockstore + PutCounter int +} + +func (bs *PutCountingBlockstore) Put(block blocks.Block) error { + bs.PutCounter++ + return bs.GCBlockstore.Put(block) +} diff --git a/blockservice/test/mock.go b/blockservice/test/mock.go index 28e3a4e9912..622d1c8d689 100644 --- a/blockservice/test/mock.go +++ b/blockservice/test/mock.go @@ -9,13 +9,13 @@ import ( ) // Mocks returns |n| connected mock Blockservices -func Mocks(n int) []*BlockService { +func Mocks(n int) []BlockService { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) sg := bitswap.NewTestSessionGenerator(net) instances := sg.Instances(n) - var servs []*BlockService + var servs []BlockService for _, i := range instances { servs = append(servs, New(i.Blockstore(), i.Exchange)) } diff --git a/cmd/ipfs/init.go b/cmd/ipfs/init.go index 3317acf9d4e..9d5acf9ce75 100644 --- a/cmd/ipfs/init.go +++ b/cmd/ipfs/init.go @@ -8,13 +8,13 @@ import ( "os" "path" + context "context" assets "github.com/ipfs/go-ipfs/assets" cmds "github.com/ipfs/go-ipfs/commands" core "github.com/ipfs/go-ipfs/core" namesys "github.com/ipfs/go-ipfs/namesys" config "github.com/ipfs/go-ipfs/repo/config" fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo" - context "context" ) const ( diff --git a/core/core.go b/core/core.go index b4ee113c93e..6c1507abd21 100644 --- a/core/core.go +++ b/core/core.go @@ -96,7 +96,7 @@ type IpfsNode struct { // Services Peerstore pstore.Peerstore // storage for other Peer instances Blockstore bstore.GCBlockstore // the block store (lower level) - Blocks *bserv.BlockService // the block service, get/add blocks. + Blocks bserv.BlockService // the block service, get/add blocks. DAG merkledag.DAGService // the merkle dag service, get/add objects. Resolver *path.Resolver // the path resolution system Reporter metrics.Reporter diff --git a/core/core_test.go b/core/core_test.go index 86e8466756e..8f2414a5f5a 100644 --- a/core/core_test.go +++ b/core/core_test.go @@ -3,10 +3,10 @@ package core import ( "testing" + context "context" "github.com/ipfs/go-ipfs/repo" config "github.com/ipfs/go-ipfs/repo/config" "github.com/ipfs/go-ipfs/thirdparty/testutil" - context "context" ) func TestInitialization(t *testing.T) { diff --git a/core/corerepo/stat.go b/core/corerepo/stat.go index 637195c65c6..26810344fa1 100644 --- a/core/corerepo/stat.go +++ b/core/corerepo/stat.go @@ -3,9 +3,9 @@ package corerepo import ( "fmt" + context "context" "github.com/ipfs/go-ipfs/core" fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo" - context "context" ) type Stat struct { diff --git a/core/coreunix/cat.go b/core/coreunix/cat.go index 7ca6de802ab..af3f43952c3 100644 --- a/core/coreunix/cat.go +++ b/core/coreunix/cat.go @@ -1,10 +1,10 @@ package coreunix import ( + context "context" core "github.com/ipfs/go-ipfs/core" path "github.com/ipfs/go-ipfs/path" uio "github.com/ipfs/go-ipfs/unixfs/io" - context "context" ) func Cat(ctx context.Context, n *core.IpfsNode, pstr string) (*uio.DagReader, error) { diff --git a/fuse/ipns/link_unix.go b/fuse/ipns/link_unix.go index 04da5a74f7d..0ba9157f757 100644 --- a/fuse/ipns/link_unix.go +++ b/fuse/ipns/link_unix.go @@ -5,9 +5,9 @@ package ipns import ( "os" + "context" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/bazil.org/fuse" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs" - "context" ) type Link struct { diff --git a/fuse/node/mount_test.go b/fuse/node/mount_test.go index 2651d460504..5ecc9076186 100644 --- a/fuse/node/mount_test.go +++ b/fuse/node/mount_test.go @@ -9,13 +9,13 @@ import ( "testing" "time" + context "context" core "github.com/ipfs/go-ipfs/core" ipns "github.com/ipfs/go-ipfs/fuse/ipns" mount "github.com/ipfs/go-ipfs/fuse/mount" namesys "github.com/ipfs/go-ipfs/namesys" offroute "github.com/ipfs/go-ipfs/routing/offline" ci "github.com/ipfs/go-ipfs/thirdparty/testutil/ci" - context "context" ) func maybeSkipFuseTests(t *testing.T) { diff --git a/importer/helpers/helpers.go b/importer/helpers/helpers.go index 0bec405a54a..bfde68214a6 100644 --- a/importer/helpers/helpers.go +++ b/importer/helpers/helpers.go @@ -3,10 +3,10 @@ package helpers import ( "fmt" + "context" chunk "github.com/ipfs/go-ipfs/importer/chunk" dag "github.com/ipfs/go-ipfs/merkledag" ft "github.com/ipfs/go-ipfs/unixfs" - "context" ) // BlockSizeLimit specifies the maximum size an imported block can have. diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index b9431522b07..5d2cd36dcfc 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -41,7 +41,7 @@ type LinkService interface { GetOfflineLinkService() LinkService } -func NewDAGService(bs *bserv.BlockService) *dagService { +func NewDAGService(bs bserv.BlockService) *dagService { return &dagService{Blocks: bs} } @@ -51,7 +51,7 @@ func NewDAGService(bs *bserv.BlockService) *dagService { // TODO: should cache Nodes that are in memory, and be // able to free some of them when vm pressure is high type dagService struct { - Blocks *bserv.BlockService + Blocks bserv.BlockService } // Add adds a node to the dagService, storing the block in the BlockService @@ -113,8 +113,8 @@ func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*Link, error) } func (n *dagService) GetOfflineLinkService() LinkService { - if n.Blocks.Exchange.IsOnline() { - bsrv := bserv.New(n.Blocks.Blockstore, offline.Exchange(n.Blocks.Blockstore)) + if n.Blocks.Exchange().IsOnline() { + bsrv := bserv.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore())) return NewDAGService(bsrv) } else { return n diff --git a/merkledag/merkledag_test.go b/merkledag/merkledag_test.go index 006c8b5ca07..bc909353278 100644 --- a/merkledag/merkledag_test.go +++ b/merkledag/merkledag_test.go @@ -237,7 +237,7 @@ func TestFetchGraph(t *testing.T) { } // create an offline dagstore and ensure all blocks were fetched - bs := bserv.New(bsis[1].Blockstore, offline.Exchange(bsis[1].Blockstore)) + bs := bserv.New(bsis[1].Blockstore(), offline.Exchange(bsis[1].Blockstore())) offline_ds := NewDAGService(bs)