Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

Commit

Permalink
fixup! fix(arc): striped locking on last byte of CID
Browse files Browse the repository at this point in the history
  • Loading branch information
frrist committed May 4, 2021
1 parent 72ade3e commit b34f205
Showing 1 changed file with 37 additions and 15 deletions.
52 changes: 37 additions & 15 deletions arc_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type cacheSize int
// to short-cut many searches without querying the underlying datastore.
type arccache struct {
cache *lru.TwoQueueCache
lks [256]sync.Mutex
lks [256]sync.RWMutex

blockstore Blockstore
viewer Viewer
Expand All @@ -36,7 +36,7 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache,
if err != nil {
return nil, err
}
c := &arccache{cache: cache, lks: [256]sync.Mutex{}, blockstore: bs}
c := &arccache{cache: cache, blockstore: bs}
c.hits = metrics.NewCtx(ctx, "arc.hits_total", "Number of ARC cache hits").Counter()
c.total = metrics.NewCtx(ctx, "arc_total", "Total number of ARC cache requests").Counter()
if v, ok := bs.(Viewer); ok {
Expand All @@ -45,13 +45,27 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache,
return c, nil
}

func (b *arccache) rLock(k cid.Cid) func() {
b.lks[k.KeyString()[len(k.KeyString())-1]].RLock()
return func() {
b.lks[k.KeyString()[len(k.KeyString())-1]].RUnlock()
}
}

func (b *arccache) wLock(k cid.Cid) func() {
b.lks[k.KeyString()[len(k.KeyString())-1]].Lock()
return func() {
b.lks[k.KeyString()[len(k.KeyString())-1]].Unlock()
}
}

func (b *arccache) DeleteBlock(k cid.Cid) error {
if has, _, ok := b.queryCache(k); ok && !has {
return nil
}

b.lks[k.Bytes()[len(k.Bytes())-1]].Lock()
defer b.lks[k.Bytes()[len(k.Bytes())-1]].Unlock()
unlock := b.wLock(k)
defer unlock()

b.cache.Remove(k) // Invalidate cache before deleting.
err := b.blockstore.DeleteBlock(k)
Expand All @@ -66,6 +80,9 @@ func (b *arccache) Has(k cid.Cid) (bool, error) {
return has, nil
}

unlock := b.rLock(k)
defer unlock()

has, err := b.blockstore.Has(k)
if err != nil {
return false, err
Expand All @@ -87,8 +104,8 @@ func (b *arccache) GetSize(k cid.Cid) (int, error) {
// we have it but don't know the size, ask the datastore.
}

b.lks[k.Bytes()[len(k.Bytes())-1]].Lock()
defer b.lks[k.Bytes()[len(k.Bytes())-1]].Unlock()
unlock := b.rLock(k)
defer unlock()

blockSize, err := b.blockstore.GetSize(k)
if err == ErrNotFound {
Expand Down Expand Up @@ -121,6 +138,9 @@ func (b *arccache) View(k cid.Cid, callback func([]byte) error) error {
return ErrNotFound
}

unlock := b.rLock(k)
defer unlock()

return b.viewer.View(k, callback)
}

Expand All @@ -134,8 +154,8 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) {
return nil, ErrNotFound
}

b.lks[k.Bytes()[len(k.Bytes())-1]].Lock()
defer b.lks[k.Bytes()[len(k.Bytes())-1]].Unlock()
unlock := b.rLock(k)
defer unlock()

bl, err := b.blockstore.Get(k)
if bl == nil && err == ErrNotFound {
Expand All @@ -151,8 +171,8 @@ func (b *arccache) Put(bl blocks.Block) error {
return nil
}

b.lks[bl.Cid().Bytes()[len(bl.Cid().Bytes())-1]].Lock()
defer b.lks[bl.Cid().Bytes()[len(bl.Cid().Bytes())-1]].Unlock()
unlock := b.wLock(bl.Cid())
defer unlock()

err := b.blockstore.Put(bl)
if err == nil {
Expand All @@ -162,28 +182,30 @@ func (b *arccache) Put(bl blocks.Block) error {
}

func (b *arccache) PutMany(bs []blocks.Block) error {
mxs := [256]*sync.Mutex{}
var good []blocks.Block
for _, block := range bs {
// call put on block if result is inconclusive or we are sure that
// the block isn't in storage
if has, _, ok := b.queryCache(block.Cid()); !ok || (ok && !has) {
good = append(good, block)
b.lks[block.Cid().Bytes()[len(block.Cid().Bytes())-1]].Lock()
mxs[block.Cid().KeyString()[len(block.Cid().KeyString())-1]] = &sync.Mutex{}
}
}

defer func() {
for _, block := range good {
b.lks[block.Cid().Bytes()[len(block.Cid().Bytes())-1]].Unlock()
for _, mx := range mxs {
if mx != nil {
mx.Lock()
}
}()
}

err := b.blockstore.PutMany(good)
if err != nil {
return err
}
for _, block := range good {
b.cacheSize(block.Cid(), len(block.RawData()))
mxs[block.Cid().KeyString()[len(block.Cid().KeyString())-1]].Unlock()
}
return nil
}
Expand Down

0 comments on commit b34f205

Please sign in to comment.