Skip to content

Commit

Permalink
Merge pull request #3348 from ipfs/kevina/gclocker
Browse files Browse the repository at this point in the history
Separate out the G.C. Locking from the Blockstore interface.
  • Loading branch information
whyrusleeping committed Nov 3, 2016
2 parents 9d132e7 + ffe9d7d commit 1ca2d42
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 15 deletions.
2 changes: 1 addition & 1 deletion blocks/blockstore/arc_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

var exampleBlock = blocks.NewBlock([]byte("foo"))

func testArcCached(bs GCBlockstore, ctx context.Context) (*arccache, error) {
func testArcCached(bs Blockstore, ctx context.Context) (*arccache, error) {
if ctx == nil {
ctx = context.TODO()
}
Expand Down
34 changes: 28 additions & 6 deletions blocks/blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ type Blockstore interface {
AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error)
}

type GCBlockstore interface {
Blockstore

type GCLocker interface {
// GCLock locks the blockstore for garbage collection. No operations
// that expect to finish with a pin should ocurr simultaneously.
// Reading during GC is safe, and requires no lock.
Expand All @@ -58,6 +56,20 @@ type GCBlockstore interface {
GCRequested() bool
}

type GCBlockstore interface {
Blockstore
GCLocker
}

func NewGCBlockstore(bs Blockstore, gcl GCLocker) GCBlockstore {
return gcBlockstore{bs, gcl}
}

type gcBlockstore struct {
Blockstore
GCLocker
}

func NewBlockstore(d ds.Batching) *blockstore {
var dsb ds.Batching
dd := dsns.Wrap(d, BlockPrefix)
Expand Down Expand Up @@ -223,6 +235,16 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error)
return output, nil
}

func NewGCLocker() *gclocker {
return &gclocker{}
}

type gclocker struct {
lk sync.RWMutex
gcreq int32
gcreqlk sync.Mutex
}

type Unlocker interface {
Unlock()
}
Expand All @@ -236,18 +258,18 @@ func (u *unlocker) Unlock() {
u.unlock = nil // ensure its not called twice
}

func (bs *blockstore) GCLock() Unlocker {
func (bs *gclocker) GCLock() Unlocker {
atomic.AddInt32(&bs.gcreq, 1)
bs.lk.Lock()
atomic.AddInt32(&bs.gcreq, -1)
return &unlocker{bs.lk.Unlock}
}

func (bs *blockstore) PinLock() Unlocker {
func (bs *gclocker) PinLock() Unlocker {
bs.lk.RLock()
return &unlocker{bs.lk.RUnlock}
}

func (bs *blockstore) GCRequested() bool {
func (bs *gclocker) GCRequested() bool {
return atomic.LoadInt32(&bs.gcreq) > 0
}
2 changes: 1 addition & 1 deletion blocks/blockstore/bloom_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
syncds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync"
)

func testBloomCached(bs GCBlockstore, ctx context.Context) (*bloomcache, error) {
func testBloomCached(bs Blockstore, ctx context.Context) (*bloomcache, error) {
if ctx == nil {
ctx = context.TODO()
}
Expand Down
4 changes: 2 additions & 2 deletions blocks/blockstore/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ func DefaultCacheOpts() CacheOpts {
}
}

func CachedBlockstore(bs GCBlockstore,
ctx context.Context, opts CacheOpts) (cbs GCBlockstore, err error) {
func CachedBlockstore(bs Blockstore,
ctx context.Context, opts CacheOpts) (cbs Blockstore, err error) {
cbs = bs

if opts.HasBloomFilterSize < 0 || opts.HasBloomFilterHashes < 0 ||
Expand Down
6 changes: 3 additions & 3 deletions blockservice/blockservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ func TestWriteThroughWorks(t *testing.T) {
}
}

var _ blockstore.GCBlockstore = (*PutCountingBlockstore)(nil)
var _ blockstore.Blockstore = (*PutCountingBlockstore)(nil)

type PutCountingBlockstore struct {
blockstore.GCBlockstore
blockstore.Blockstore
PutCounter int
}

func (bs *PutCountingBlockstore) Put(block blocks.Block) error {
bs.PutCounter++
return bs.GCBlockstore.Put(block)
return bs.Blockstore.Put(block)
}
4 changes: 3 additions & 1 deletion core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,13 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
opts.HasBloomFilterSize = 0
}

n.Blockstore, err = bstore.CachedBlockstore(bs, ctx, opts)
cbs, err := bstore.CachedBlockstore(bs, ctx, opts)
if err != nil {
return err
}

n.Blockstore = bstore.NewGCBlockstore(cbs, bstore.NewGCLocker())

rcfg, err := n.Repo.Config()
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion unixfs/mod/dagmodifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync"
)

func getMockDagServAndBstore(t testing.TB) (mdag.DAGService, blockstore.GCBlockstore) {
func getMockDagServAndBstore(t testing.TB) (mdag.DAGService, blockstore.Blockstore) {
dstore := ds.NewMapDatastore()
tsds := sync.MutexWrap(dstore)
bstore := blockstore.NewBlockstore(tsds)
Expand Down

0 comments on commit 1ca2d42

Please sign in to comment.