Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

write blocks retrieved from the exchange to the blockstore #92

Merged
merged 9 commits into from
Jul 28, 2022
60 changes: 47 additions & 13 deletions blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ import (
"go.opentelemetry.io/otel/trace"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice/internal"
cid "github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-verifcid"

"github.com/ipfs/go-blockservice/internal"
)

var logger = logging.Logger("blockservice")
Expand Down Expand Up @@ -84,7 +85,7 @@ func New(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
}
}

// NewWriteThrough ceates a BlockService that guarantees writes will go
// NewWriteThrough creates a BlockService that guarantees writes will go
// through to the blockstore and are not skipped by cache checks.
func NewWriteThrough(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
if rem == nil {
Expand Down Expand Up @@ -131,7 +132,6 @@ func NewSession(ctx context.Context, bs BlockService) *Session {
}

// AddBlock adds a particular block to the service, Putting it into the datastore.
// TODO pass a context into this if the remote.HasBlock is going to remain here.
func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error {
ctx, span := internal.StartSpan(ctx, "blockService.AddBlock")
defer span.End()
Expand All @@ -155,8 +155,8 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error {
logger.Debugf("BlockService.BlockAdded %s", c)

if s.exchange != nil {
if err := s.exchange.HasBlock(ctx, o); err != nil {
logger.Errorf("HasBlock: %s", err.Error())
if err := s.exchange.NotifyNewBlocks(ctx, o); err != nil {
logger.Errorf("NotifyNewBlocks: %s", err.Error())
}
}

Expand Down Expand Up @@ -200,11 +200,9 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error {
}

if s.exchange != nil {
for _, o := range toput {
logger.Debugf("BlockService.BlockAdded %s", o.Cid())
if err := s.exchange.HasBlock(ctx, o); err != nil {
logger.Errorf("HasBlock: %s", err.Error())
}
logger.Debugf("BlockService.BlockAdded %d blocks", len(toput))
if err := s.exchange.NotifyNewBlocks(ctx, toput...); err != nil {
logger.Errorf("NotifyNewBlocks: %s", err.Error())
}
}
return nil
Expand Down Expand Up @@ -249,6 +247,11 @@ func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget fun
if err != nil {
return nil, err
}
// also write in the blockstore for caching
err = bs.Put(ctx, blk)
Jorropo marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
logger.Debugf("BlockService.BlockFetched %s", c)
return blk, nil
}
Expand Down Expand Up @@ -325,12 +328,43 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget
}

for b := range rblocks {
// batch available blocks together
batch := make([]blocks.Block, 0, 8)
batch = append(batch, b)
logger.Debugf("BlockService.BlockFetched %s", b.Cid())
select {
case out <- b:
case <-ctx.Done():

batchLoop:
for {
select {
case moreBlock, ok := <-rblocks:
if !ok {
// rblock has been closed, we set it to nil to avoid pulling zero values
rblocks = nil
} else {
logger.Debugf("BlockService.BlockFetched %s", moreBlock.Cid())
batch = append(batch, moreBlock)
}
case <-ctx.Done():
return
default:
break batchLoop
}
}

// also write in the blockstore for caching
err = bs.PutMany(ctx, batch)
if err != nil {
logger.Errorf("could not write blocks from the network to the blockstore: %s", err)
return
}

for _, b = range batch {
select {
case out <- b:
case <-ctx.Done():
return
}
}
}
}()
return out
Expand Down
71 changes: 66 additions & 5 deletions blockservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
blockstore "github.com/ipfs/go-ipfs-blockstore"
Expand All @@ -19,8 +20,8 @@ func TestWriteThroughWorks(t *testing.T) {
blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())),
0,
}
bstore2 := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
exch := offline.Exchange(bstore2)
exchbstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
exch := offline.Exchange(exchbstore)
bserv := NewWriteThrough(bstore, exch)
bgen := butil.NewBlockGenerator()

Expand All @@ -44,6 +45,57 @@ func TestWriteThroughWorks(t *testing.T) {
}
}

func TestExchangeWrite(t *testing.T) {
bstore := &PutCountingBlockstore{
blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())),
0,
}
exchbstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
exch := offline.Exchange(exchbstore)
bserv := NewWriteThrough(bstore, exch)
bgen := butil.NewBlockGenerator()

// GetBlock
block := bgen.Next()
err := exchbstore.Put(context.Background(), block)
if err != nil {
t.Fatal(err)
}
got, err := bserv.GetBlock(context.Background(), block.Cid())
if err != nil {
t.Fatal(err)
}
if got.Cid() != block.Cid() {
t.Fatalf("GetBlock returned unexpected block")
}
if bstore.PutCounter != 1 {
t.Fatalf("expected one Put call, have: %d", bstore.PutCounter)
}

// GetBlocks
b1 := bgen.Next()
err = exchbstore.Put(context.Background(), b1)
if err != nil {
t.Fatal(err)
}
b2 := bgen.Next()
err = exchbstore.Put(context.Background(), b2)
if err != nil {
t.Fatal(err)
}
bchan := bserv.GetBlocks(context.Background(), []cid.Cid{b1.Cid(), b2.Cid()})
var gotBlocks []blocks.Block
for b := range bchan {
gotBlocks = append(gotBlocks, b)
}
if len(gotBlocks) != 2 {
t.Fatalf("expected to retrieve 2 blocks, got %d", len(gotBlocks))
}
if bstore.PutCounter != 3 {
t.Fatalf("expected 3 Put call, have: %d", bstore.PutCounter)
}
}

func TestLazySessionInitialization(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
Expand All @@ -53,8 +105,8 @@ func TestLazySessionInitialization(t *testing.T) {
bstore2 := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
bstore3 := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
session := offline.Exchange(bstore2)
exchange := offline.Exchange(bstore3)
sessionExch := &fakeSessionExchange{Interface: exchange, session: session}
exch := offline.Exchange(bstore3)
sessionExch := &fakeSessionExchange{Interface: exch, session: session}
bservSessEx := NewWriteThrough(bstore, sessionExch)
bgen := butil.NewBlockGenerator()

Expand All @@ -64,7 +116,11 @@ func TestLazySessionInitialization(t *testing.T) {
t.Fatal(err)
}
block2 := bgen.Next()
err = session.HasBlock(ctx, block2)
err = bstore2.Put(ctx, block2)
if err != nil {
t.Fatal(err)
}
err = session.NotifyNewBlocks(ctx, block2)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -107,6 +163,11 @@ func (bs *PutCountingBlockstore) Put(ctx context.Context, block blocks.Block) er
return bs.Blockstore.Put(ctx, block)
}

func (bs *PutCountingBlockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
bs.PutCounter += len(blocks)
return bs.Blockstore.PutMany(ctx, blocks)
}

var _ exchange.SessionExchange = (*fakeSessionExchange)(nil)

type fakeSessionExchange struct {
Expand Down
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ require (
go.opentelemetry.io/otel/trace v1.7.0
)

replace github.com/ipfs/go-ipfs-exchange-interface => github.com/MichaelMure/go-ipfs-exchange-interface v0.0.2-0.20220713142804-1181846dc171

replace github.com/ipfs/go-ipfs-exchange-offline => github.com/MichaelMure/go-ipfs-exchange-offline v0.0.2-0.20220714102739-4b7a20c758a9

replace github.com/ipfs/go-bitswap => github.com/MichaelMure/go-bitswap v0.2.20-0.20220714225615-2c2a46194c4e

require (
github.com/benbjohnson/clock v1.1.0 // indirect
github.com/btcsuite/btcd v0.21.0-beta // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/MichaelMure/go-bitswap v0.2.20-0.20220714225615-2c2a46194c4e h1:LBNoZYGAIHUWfyolHrs9PH4M6kd3ze9MenhHdZXyHYc=
github.com/MichaelMure/go-bitswap v0.2.20-0.20220714225615-2c2a46194c4e/go.mod h1:EeOjh6Xi0IWQfdTj5LqScnXnxQtg9k4lshWjYCGSghc=
github.com/MichaelMure/go-ipfs-exchange-interface v0.0.2-0.20220713142804-1181846dc171 h1:J6IkkSKshHms3yQEOrNK/7B2YcCJ6ZbyDDmaXHwOj4Y=
github.com/MichaelMure/go-ipfs-exchange-interface v0.0.2-0.20220713142804-1181846dc171/go.mod h1:z6+RhJuDQbqKguVyslSOuVDhqF9JtTrO3eptSAiW2/Y=
github.com/MichaelMure/go-ipfs-exchange-offline v0.0.2-0.20220714102739-4b7a20c758a9 h1:R4PGpFAUOd3tJjFAJLu8dPyGOLufuICel3mZ9uwtYGo=
github.com/MichaelMure/go-ipfs-exchange-offline v0.0.2-0.20220714102739-4b7a20c758a9/go.mod h1:6SxfEhzkc8ZX1yCWGL8owaravbq3xUF28yGF13VSQ7c=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
Expand Down Expand Up @@ -249,8 +255,6 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
github.com/ipfs/go-bitswap v0.6.0 h1:f2rc6GZtoSFhEIzQmddgGiel9xntj02Dg0ZNf2hSC+w=
github.com/ipfs/go-bitswap v0.6.0/go.mod h1:Hj3ZXdOC5wBJvENtdqsixmzzRukqd8EHLxZLZc3mzRA=
github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY=
github.com/ipfs/go-block-format v0.0.3 h1:r8t66QstRp/pd/or4dpnbVfXT5Gt7lOqRvC+/dDTpMc=
github.com/ipfs/go-block-format v0.0.3/go.mod h1:4LmD4ZUw0mhO+JSKdpWwrzATiEfM7WWgQ8H5l6P8MVk=
Expand Down Expand Up @@ -289,10 +293,6 @@ github.com/ipfs/go-ipfs-delay v0.0.1/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG
github.com/ipfs/go-ipfs-ds-help v0.1.1/go.mod h1:SbBafGJuGsPI/QL3j9Fc5YPLeAu+SzOkI0gFwAg+mOs=
github.com/ipfs/go-ipfs-ds-help v1.1.0 h1:yLE2w9RAsl31LtfMt91tRZcrx+e61O5mDxFRR994w4Q=
github.com/ipfs/go-ipfs-ds-help v1.1.0/go.mod h1:YR5+6EaebOhfcqVCyqemItCLthrpVNot+rsOU/5IatU=
github.com/ipfs/go-ipfs-exchange-interface v0.1.0 h1:TiMekCrOGQuWYtZO3mf4YJXDIdNgnKWZ9IE3fGlnWfo=
github.com/ipfs/go-ipfs-exchange-interface v0.1.0/go.mod h1:ych7WPlyHqFvCi/uQI48zLZuAWVP5iTQPXEfVaw5WEI=
github.com/ipfs/go-ipfs-exchange-offline v0.2.0 h1:2PF4o4A7W656rC0RxuhUace997FTcDTcIQ6NoEtyjAI=
github.com/ipfs/go-ipfs-exchange-offline v0.2.0/go.mod h1:HjwBeW0dvZvfOMwDP0TSKXIHf2s+ksdP4E3MLDRtLKY=
github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY=
github.com/ipfs/go-ipfs-pq v0.0.2/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-routing v0.2.1 h1:E+whHWhJkdN9YeoHZNj5itzc+OR292AJ2uE9FFiW0BY=
Expand Down