Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a write-through block service #3294

Merged
merged 5 commits into from
Oct 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions blocks/blocksutil/block_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ type BlockGenerator struct {
seq int
}

func (bg *BlockGenerator) Next() blocks.Block {
func (bg *BlockGenerator) Next() *blocks.BasicBlock {
bg.seq++
return blocks.NewBlock([]byte(string(bg.seq)))
}

func (bg *BlockGenerator) Blocks(n int) []blocks.Block {
blocks := make([]blocks.Block, 0)
func (bg *BlockGenerator) Blocks(n int) []*blocks.BasicBlock {
blocks := make([]*blocks.BasicBlock, 0)
for i := 0; i < n; i++ {
b := bg.Next()
blocks = append(blocks, b)
Expand Down
129 changes: 82 additions & 47 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
package blockservice

import (
"context"
"errors"
"fmt"

blocks "github.com/ipfs/go-ipfs/blocks"
"github.com/ipfs/go-ipfs/blocks/blockstore"
exchange "github.com/ipfs/go-ipfs/exchange"

context "context"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
)
Expand All @@ -23,77 +23,112 @@ var ErrNotFound = errors.New("blockservice: key not found")
// BlockService is a hybrid block datastore. It stores data in a local
// datastore and may retrieve data from a remote Exchange.
// It uses an internal `datastore.Datastore` instance to store values.
type BlockService struct {
// TODO don't expose underlying impl details
Blockstore blockstore.Blockstore
Exchange exchange.Interface
type BlockService interface {
Blockstore() blockstore.Blockstore
Exchange() exchange.Interface
AddBlock(o blocks.Block) (*cid.Cid, error)
AddBlocks(bs []blocks.Block) ([]*cid.Cid, error)
GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error)
GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block
DeleteBlock(o blocks.Block) error
Close() error
}

type blockService struct {
blockstore blockstore.Blockstore
exchange exchange.Interface
// If checkFirst is true then first check that a block doesn't
// already exist to avoid republishing the block on the exchange.
checkFirst bool
}

// NewBlockService creates a BlockService with given datastore instance.
func New(bs blockstore.Blockstore, rem exchange.Interface) *BlockService {
func New(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
if rem == nil {
log.Warning("blockservice running in local (offline) mode.")
}

return &blockService{
blockstore: bs,
exchange: rem,
checkFirst: true,
}
}

// NewWriteThrough ceates a BlockService that guarantees writes will go
// through to the blockstore and are not skipped by cache checks.
func NewWriteThrough(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
if rem == nil {
log.Warning("blockservice running in local (offline) mode.")
}

return &BlockService{
Blockstore: bs,
Exchange: rem,
return &blockService{
blockstore: bs,
exchange: rem,
checkFirst: false,
}
}

func (bs *blockService) Blockstore() blockstore.Blockstore {
return bs.blockstore
}

func (bs *blockService) Exchange() exchange.Interface {
return bs.exchange
}

// AddBlock adds a particular block to the service, Putting it into the datastore.
// TODO pass a context into this if the remote.HasBlock is going to remain here.
func (s *BlockService) AddBlock(o blocks.Block) (*cid.Cid, error) {
// TODO: while this is a great optimization, we should think about the
// possibility of streaming writes directly to disk. If we can pass this object
// all the way down to the datastore without having to 'buffer' its data,
// we could implement a `WriteTo` method on it that could do a streaming write
// of the content, saving us (probably) considerable memory.
func (s *blockService) AddBlock(o blocks.Block) (*cid.Cid, error) {
c := o.Cid()
has, err := s.Blockstore.Has(c)
if err != nil {
return nil, err
}
if s.checkFirst {
has, err := s.blockstore.Has(c)
if err != nil {
return nil, err
}

if has {
return c, nil
if has {
return c, nil
}
}

err = s.Blockstore.Put(o)
err := s.blockstore.Put(o)
if err != nil {
return nil, err
}

if err := s.Exchange.HasBlock(o); err != nil {
if err := s.exchange.HasBlock(o); err != nil {
return nil, errors.New("blockservice is closed")
}

return c, nil
}

func (s *BlockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
var toput []blocks.Block
for _, b := range bs {
has, err := s.Blockstore.Has(b.Cid())
if err != nil {
return nil, err
}

if has {
continue
if s.checkFirst {
for _, b := range bs {
has, err := s.blockstore.Has(b.Cid())
if err != nil {
return nil, err
}
if has {
continue
}
toput = append(toput, b)
}

toput = append(toput, b)
} else {
toput = bs
}

err := s.Blockstore.PutMany(toput)
err := s.blockstore.PutMany(toput)
if err != nil {
return nil, err
}

var ks []*cid.Cid
for _, o := range toput {
if err := s.Exchange.HasBlock(o); err != nil {
if err := s.exchange.HasBlock(o); err != nil {
return nil, fmt.Errorf("blockservice is closed (%s)", err)
}

Expand All @@ -104,19 +139,19 @@ func (s *BlockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {

// GetBlock retrieves a particular block from the service,
// Getting it from the datastore using the key (hash).
func (s *BlockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
log.Debugf("BlockService GetBlock: '%s'", c)

block, err := s.Blockstore.Get(c)
block, err := s.blockstore.Get(c)
if err == nil {
return block, nil
}

if err == blockstore.ErrNotFound && s.Exchange != nil {
if err == blockstore.ErrNotFound && s.exchange != nil {
// TODO be careful checking ErrNotFound. If the underlying
// implementation changes, this will break.
log.Debug("Blockservice: Searching bitswap")
blk, err := s.Exchange.GetBlock(ctx, c)
blk, err := s.exchange.GetBlock(ctx, c)
if err != nil {
if err == blockstore.ErrNotFound {
return nil, ErrNotFound
Expand All @@ -137,13 +172,13 @@ func (s *BlockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block,
// GetBlocks gets a list of blocks asynchronously and returns through
// the returned channel.
// NB: No guarantees are made about order.
func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
out := make(chan blocks.Block, 0)
go func() {
defer close(out)
var misses []*cid.Cid
for _, c := range ks {
hit, err := s.Blockstore.Get(c)
hit, err := s.blockstore.Get(c)
if err != nil {
misses = append(misses, c)
continue
Expand All @@ -160,7 +195,7 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc
return
}

rblocks, err := s.Exchange.GetBlocks(ctx, misses)
rblocks, err := s.exchange.GetBlocks(ctx, misses)
if err != nil {
log.Debugf("Error with GetBlocks: %s", err)
return
Expand All @@ -178,11 +213,11 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc
}

// DeleteBlock deletes a block in the blockservice from the datastore
func (s *BlockService) DeleteBlock(o blocks.Block) error {
return s.Blockstore.DeleteBlock(o.Cid())
func (s *blockService) DeleteBlock(o blocks.Block) error {
return s.blockstore.DeleteBlock(o.Cid())
}

func (s *BlockService) Close() error {
func (s *blockService) Close() error {
log.Debug("blockservice is shutting down...")
return s.Exchange.Close()
return s.exchange.Close()
}
49 changes: 49 additions & 0 deletions blockservice/blockservice_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package blockservice

import (
"testing"

"github.com/ipfs/go-ipfs/blocks"
"github.com/ipfs/go-ipfs/blocks/blockstore"
butil "github.com/ipfs/go-ipfs/blocks/blocksutil"
offline "github.com/ipfs/go-ipfs/exchange/offline"

ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
dssync "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync"
)

func TestWriteThroughWorks(t *testing.T) {
bstore := &PutCountingBlockstore{
blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())),
0,
}
bstore2 := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
exch := offline.Exchange(bstore2)
bserv := NewWriteThrough(bstore, exch)
bgen := butil.NewBlockGenerator()

block := bgen.Next()

t.Logf("PutCounter: %d", bstore.PutCounter)
bserv.AddBlock(block)
if bstore.PutCounter != 1 {
t.Fatalf("expected just one Put call, have: %d", bstore.PutCounter)
}

bserv.AddBlock(block)
if bstore.PutCounter != 2 {
t.Fatal("Put should have called again, should be 2 is: %d", bstore.PutCounter)
}
}

var _ blockstore.GCBlockstore = (*PutCountingBlockstore)(nil)

type PutCountingBlockstore struct {
blockstore.GCBlockstore
PutCounter int
}

func (bs *PutCountingBlockstore) Put(block blocks.Block) error {
bs.PutCounter++
return bs.GCBlockstore.Put(block)
}
4 changes: 2 additions & 2 deletions blockservice/test/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
)

// Mocks returns |n| connected mock Blockservices
func Mocks(n int) []*BlockService {
func Mocks(n int) []BlockService {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
sg := bitswap.NewTestSessionGenerator(net)

instances := sg.Instances(n)

var servs []*BlockService
var servs []BlockService
for _, i := range instances {
servs = append(servs, New(i.Blockstore(), i.Exchange))
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/ipfs/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
"os"
"path"

context "context"
assets "github.com/ipfs/go-ipfs/assets"
cmds "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core"
namesys "github.com/ipfs/go-ipfs/namesys"
config "github.com/ipfs/go-ipfs/repo/config"
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
context "context"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type IpfsNode struct {
// Services
Peerstore pstore.Peerstore // storage for other Peer instances
Blockstore bstore.GCBlockstore // the block store (lower level)
Blocks *bserv.BlockService // the block service, get/add blocks.
Blocks bserv.BlockService // the block service, get/add blocks.
DAG merkledag.DAGService // the merkle dag service, get/add objects.
Resolver *path.Resolver // the path resolution system
Reporter metrics.Reporter
Expand Down
2 changes: 1 addition & 1 deletion core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package core
import (
"testing"

context "context"
"github.com/ipfs/go-ipfs/repo"
config "github.com/ipfs/go-ipfs/repo/config"
"github.com/ipfs/go-ipfs/thirdparty/testutil"
context "context"
)

func TestInitialization(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion core/corerepo/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package corerepo
import (
"fmt"

context "context"
"github.com/ipfs/go-ipfs/core"
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
context "context"
)

type Stat struct {
Expand Down
2 changes: 1 addition & 1 deletion core/coreunix/cat.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package coreunix

import (
context "context"
core "github.com/ipfs/go-ipfs/core"
path "github.com/ipfs/go-ipfs/path"
uio "github.com/ipfs/go-ipfs/unixfs/io"
context "context"
)

func Cat(ctx context.Context, n *core.IpfsNode, pstr string) (*uio.DagReader, error) {
Expand Down
2 changes: 1 addition & 1 deletion fuse/ipns/link_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ package ipns
import (
"os"

"context"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/bazil.org/fuse"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs"
"context"
)

type Link struct {
Expand Down
2 changes: 1 addition & 1 deletion fuse/node/mount_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
"testing"
"time"

context "context"
core "github.com/ipfs/go-ipfs/core"
ipns "github.com/ipfs/go-ipfs/fuse/ipns"
mount "github.com/ipfs/go-ipfs/fuse/mount"
namesys "github.com/ipfs/go-ipfs/namesys"
offroute "github.com/ipfs/go-ipfs/routing/offline"
ci "github.com/ipfs/go-ipfs/thirdparty/testutil/ci"
context "context"
)

func maybeSkipFuseTests(t *testing.T) {
Expand Down
Loading