Skip to content

Commit

Permalink
integrate CIDv0
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Jeromy <[email protected]>
  • Loading branch information
whyrusleeping committed Sep 5, 2016
1 parent 6fdfaaf commit 11c7561
Show file tree
Hide file tree
Showing 73 changed files with 889 additions and 1,009 deletions.
20 changes: 12 additions & 8 deletions assets/assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"fmt"
"path/filepath"

"github.com/ipfs/go-ipfs/blocks/key"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/core/coreunix"
uio "github.com/ipfs/go-ipfs/unixfs/io"
cid "gx/ipfs/QmfSc2xehWmWLnwwYR91Y8QF4xdASypTFVknutoKQS3GHp/go-cid"
)

// initDocPaths lists the paths for the docs we want to seed during --init
Expand All @@ -25,7 +25,7 @@ var initDocPaths = []string{
}

// SeedInitDocs adds the list of embedded init documentation to the passed node, pins it and returns the root key
func SeedInitDocs(nd *core.IpfsNode) (*key.Key, error) {
func SeedInitDocs(nd *core.IpfsNode) (*cid.Cid, error) {
return addAssetList(nd, initDocPaths)
}

Expand All @@ -34,11 +34,11 @@ var initDirIndex = []string{
filepath.Join("..", "vendor", "dir-index-html-v1.0.0", "dir-index.html"),
}

func SeedInitDirIndex(nd *core.IpfsNode) (*key.Key, error) {
func SeedInitDirIndex(nd *core.IpfsNode) (*cid.Cid, error) {
return addAssetList(nd, initDirIndex)
}

func addAssetList(nd *core.IpfsNode, l []string) (*key.Key, error) {
func addAssetList(nd *core.IpfsNode, l []string) (*cid.Cid, error) {
dirb := uio.NewDirectory(nd.DAG)

for _, p := range l {
Expand All @@ -53,14 +53,18 @@ func addAssetList(nd *core.IpfsNode, l []string) (*key.Key, error) {
}

fname := filepath.Base(p)
k := key.B58KeyDecode(s)
if err := dirb.AddChild(nd.Context(), fname, k); err != nil {
c, err := cid.Decode(s)
if err != nil {
return nil, err
}

if err := dirb.AddChild(nd.Context(), fname, c); err != nil {
return nil, fmt.Errorf("assets: could not add '%s' as a child: %s", fname, err)
}
}

dir := dirb.GetNode()
dkey, err := nd.DAG.Add(dir)
dcid, err := nd.DAG.Add(dir)
if err != nil {
return nil, fmt.Errorf("assets: DAG.Add(dir) failed: %s", err)
}
Expand All @@ -73,5 +77,5 @@ func addAssetList(nd *core.IpfsNode, l []string) (*key.Key, error) {
return nil, fmt.Errorf("assets: Pinning flush failed: %s", err)
}

return &dkey, nil
return dcid, nil
}
10 changes: 8 additions & 2 deletions blocks/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@ import (
"fmt"

key "github.com/ipfs/go-ipfs/blocks/key"

mh "gx/ipfs/QmYf7ng2hG5XBtJA3tN34DQ2GUN5HNksEw1rLDkmr6vGku/go-multihash"
u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
cid "gx/ipfs/QmfSc2xehWmWLnwwYR91Y8QF4xdASypTFVknutoKQS3GHp/go-cid"
)

var ErrWrongHash = errors.New("data did not match given hash!")

type Block interface {
Multihash() mh.Multihash
Data() []byte
RawData() []byte
Key() key.Key
String() string
Loggable() map[string]interface{}
Expand Down Expand Up @@ -49,10 +51,14 @@ func (b *BasicBlock) Multihash() mh.Multihash {
return b.multihash
}

func (b *BasicBlock) Data() []byte {
func (b *BasicBlock) RawData() []byte {
return b.data
}

func (b *BasicBlock) Cid() *cid.Cid {
return cid.NewCidV0(b.multihash)
}

// Key returns the block's Multihash as a Key value.
func (b *BasicBlock) Key() key.Key {
return key.Key(b.multihash)
Expand Down
2 changes: 1 addition & 1 deletion blocks/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestData(t *testing.T) {
data := []byte("some data")
block := NewBlock(data)

if !bytes.Equal(block.Data(), data) {
if !bytes.Equal(block.RawData(), data) {
t.Error("data is wrong")
}
}
Expand Down
8 changes: 5 additions & 3 deletions blocks/blockstore/arc_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,14 @@ func (b *arccache) PutMany(bs []blocks.Block) error {
good = append(good, block)
}
}
err := b.blockstore.PutMany(bs)

err := b.blockstore.PutMany(good)
if err != nil {
return err
}
for _, block := range bs {
b.arc.Add(block.Key(), true)

for _, blk := range good {
b.arc.Add(blk.Key(), true)
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions blocks/blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (bs *blockstore) Put(block blocks.Block) error {
if err == nil && exists {
return nil // already stored.
}
return bs.datastore.Put(k, block.Data())
return bs.datastore.Put(k, block.RawData())
}

func (bs *blockstore) PutMany(blocks []blocks.Block) error {
Expand All @@ -132,7 +132,7 @@ func (bs *blockstore) PutMany(blocks []blocks.Block) error {
continue
}

err = t.Put(k, b.Data())
err = t.Put(k, b.RawData())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion blocks/blockstore/blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestPutThenGetBlock(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(block.Data(), blockFromBlockstore.Data()) {
if !bytes.Equal(block.RawData(), blockFromBlockstore.RawData()) {
t.Fail()
}
}
Expand Down
7 changes: 4 additions & 3 deletions blocks/blockstore/bloom_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,11 @@ func (b *bloomcache) PutMany(bs []blocks.Block) error {
good = append(good, block)
}
}
err := b.blockstore.PutMany(bs)

err := b.blockstore.PutMany(good)
if err == nil {
for _, block := range bs {
b.bloom.AddTS([]byte(block.Key()))
for _, blk := range good {
b.bloom.AddTS([]byte(blk.Key()))
}
}
return err
Expand Down
87 changes: 58 additions & 29 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ package blockservice

import (
"errors"
"fmt"

blocks "github.com/ipfs/go-ipfs/blocks"
"github.com/ipfs/go-ipfs/blocks/blockstore"
key "github.com/ipfs/go-ipfs/blocks/key"
exchange "github.com/ipfs/go-ipfs/exchange"

logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
cid "gx/ipfs/QmfSc2xehWmWLnwwYR91Y8QF4xdASypTFVknutoKQS3GHp/go-cid"
)

var log = logging.Logger("blockservice")
Expand All @@ -27,6 +30,12 @@ type BlockService struct {
Exchange exchange.Interface
}

// an Object is simply a typed block
type Object interface {
Cid() *cid.Cid
blocks.Block
}

// NewBlockService creates a BlockService with given datastore instance.
func New(bs blockstore.Blockstore, rem exchange.Interface) *BlockService {
if rem == nil {
Expand All @@ -41,30 +50,41 @@ func New(bs blockstore.Blockstore, rem exchange.Interface) *BlockService {

// AddBlock adds a particular block to the service, Putting it into the datastore.
// TODO pass a context into this if the remote.HasBlock is going to remain here.
func (s *BlockService) AddBlock(b blocks.Block) (key.Key, error) {
k := b.Key()
has, err := s.Blockstore.Has(k)
func (s *BlockService) AddObject(o Object) (*cid.Cid, error) {
// TODO: while this is a great optimization, we should think about the
// possibility of streaming writes directly to disk. If we can pass this object
// all the way down to the datastore without having to 'buffer' its data,
// we could implement a `WriteTo` method on it that could do a streaming write
// of the content, saving us (probably) considerable memory.
c := o.Cid()
has, err := s.Blockstore.Has(key.Key(c.Hash()))
if err != nil {
return k, err
return nil, err
}

if has {
return k, nil
return c, nil
}

err = s.Blockstore.Put(b)
err = s.Blockstore.Put(o)
if err != nil {
return k, err
return nil, err
}
if err := s.Exchange.HasBlock(b); err != nil {
return "", errors.New("blockservice is closed")

if err := s.Exchange.HasBlock(o); err != nil {
return nil, errors.New("blockservice is closed")
}
return k, nil

return c, nil
}

func (s *BlockService) AddBlocks(bs []blocks.Block) ([]key.Key, error) {
func (s *BlockService) AddObjects(bs []Object) ([]*cid.Cid, error) {
var toput []blocks.Block
var toputcids []*cid.Cid
for _, b := range bs {
has, err := s.Blockstore.Has(b.Key())
c := b.Cid()

has, err := s.Blockstore.Has(key.Key(c.Hash()))
if err != nil {
return nil, err
}
Expand All @@ -74,33 +94,33 @@ func (s *BlockService) AddBlocks(bs []blocks.Block) ([]key.Key, error) {
}

toput = append(toput, b)
toputcids = append(toputcids, c)
}

err := s.Blockstore.PutMany(toput)
if err != nil {
return nil, err
}

var ks []key.Key
for _, b := range toput {
if err := s.Exchange.HasBlock(b); err != nil {
return nil, errors.New("blockservice is closed")
var ks []*cid.Cid
for _, o := range toput {
if err := s.Exchange.HasBlock(o); err != nil {
return nil, fmt.Errorf("blockservice is closed (%s)", err)
}
ks = append(ks, b.Key())

c := o.(Object).Cid() // cast is safe, we created these
ks = append(ks, c)
}
return ks, nil
}

// GetBlock retrieves a particular block from the service,
// Getting it from the datastore using the key (hash).
func (s *BlockService) GetBlock(ctx context.Context, k key.Key) (blocks.Block, error) {
if k == "" {
log.Debug("BlockService GetBlock: Nil Key")
return nil, ErrNotFound
}
func (s *BlockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
log.Debugf("BlockService GetBlock: '%s'", c)

log.Debugf("BlockService GetBlock: '%s'", k)
block, err := s.Blockstore.Get(k)
// TODO: blockstore shouldnt care about Cids, need an easier way to strip the abstraction
block, err := s.Blockstore.Get(key.Key(c.Hash()))
if err == nil {
return block, nil
}
Expand All @@ -109,7 +129,7 @@ func (s *BlockService) GetBlock(ctx context.Context, k key.Key) (blocks.Block, e
// TODO be careful checking ErrNotFound. If the underlying
// implementation changes, this will break.
log.Debug("Blockservice: Searching bitswap")
blk, err := s.Exchange.GetBlock(ctx, k)
blk, err := s.Exchange.GetBlock(ctx, key.Key(c.Hash()))
if err != nil {
if err == blockstore.ErrNotFound {
return nil, ErrNotFound
Expand All @@ -130,12 +150,13 @@ func (s *BlockService) GetBlock(ctx context.Context, k key.Key) (blocks.Block, e
// GetBlocks gets a list of blocks asynchronously and returns through
// the returned channel.
// NB: No guarantees are made about order.
func (s *BlockService) GetBlocks(ctx context.Context, ks []key.Key) <-chan blocks.Block {
func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
out := make(chan blocks.Block, 0)
go func() {
defer close(out)
var misses []key.Key
for _, k := range ks {
for _, c := range ks {
k := key.Key(c.Hash())
hit, err := s.Blockstore.Get(k)
if err != nil {
misses = append(misses, k)
Expand Down Expand Up @@ -171,11 +192,19 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []key.Key) <-chan block
}

// DeleteBlock deletes a block in the blockservice from the datastore
func (s *BlockService) DeleteBlock(k key.Key) error {
return s.Blockstore.DeleteBlock(k)
func (s *BlockService) DeleteObject(o Object) error {
return s.Blockstore.DeleteBlock(o.Key())
}

func (s *BlockService) Close() error {
log.Debug("blockservice is shutting down...")
return s.Exchange.Close()
}

type RawBlockObject struct {
blocks.Block
}

func (rob *RawBlockObject) Cid() *cid.Cid {
return cid.NewCidV0(rob.Block.Multihash())
}
Loading

0 comments on commit 11c7561

Please sign in to comment.