From 27a4b690edea2c5c99159dfc59e7652ee8f3e094 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Mon, 15 Jan 2024 07:34:56 +0100 Subject: [PATCH] blockservice: move to single unique struct and add `WithContentBlocker` option The idea is to have a THE blockservice object, this means we wont ever have an issue where multiple competitive features play poorly with each other due to how nested blockservices could do. Let's say we add multi-exchange based on content routing, do we want to have to handle the mess this could create with nested blockservices ? It implements features for https://github.com/ipfs-shipyard/nopfs/issues/34. --- CHANGELOG.md | 1 + blockservice/blockservice.go | 176 ++++++++++++++------------- blockservice/blockservice_test.go | 84 +++++++++++-- blockservice/test/mock.go | 4 +- fetcher/impl/blockservice/fetcher.go | 6 +- gateway/blocks_backend.go | 6 +- ipld/merkledag/merkledag.go | 6 +- ipld/merkledag/test/utils.go | 2 +- 8 files changed, 182 insertions(+), 103 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 12ad5df7e..2328d9d25 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ The following emojis are used to highlight certain changes: - `blockservice` now has `ContextWithSession` and `EmbedSessionInContext` functions, which allows to embed a session in a context. Future calls to `BlockGetter.GetBlock`, `BlockGetter.GetBlocks` and `NewSession` will use the session in the context. - `blockservice.NewWritethrough` deprecated function has been removed, instead you can do `blockservice.New(..., ..., WriteThrough())` like previously. +- `blockservice` now has `WithContentBlocker` option which allows to filter Add and Get requests by CID. ### Changed diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 353be00f8..e9772928c 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -5,7 +5,6 @@ package blockservice import ( "context" - "io" "sync" "go.opentelemetry.io/otel/attribute" @@ -40,41 +39,15 @@ type BlockGetter interface { GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block } +// Blocker returns err != nil if the CID is disallowed to be fetched or stored in blockservice. +// It returns an error so error messages could be passed. +type Blocker func(cid.Cid) error + // BlockService is a hybrid block datastore. It stores data in a local // datastore and may retrieve data from a remote Exchange. -// It uses an internal `datastore.Datastore` instance to store values. -type BlockService interface { - io.Closer - BlockGetter - - // Blockstore returns a reference to the underlying blockstore - Blockstore() blockstore.Blockstore - - // Exchange returns a reference to the underlying exchange (usually bitswap) - Exchange() exchange.Interface - - // AddBlock puts a given block to the underlying datastore - AddBlock(ctx context.Context, o blocks.Block) error - - // AddBlocks adds a slice of blocks at the same time using batching - // capabilities of the underlying datastore whenever possible. - AddBlocks(ctx context.Context, bs []blocks.Block) error - - // DeleteBlock deletes the given block from the blockservice. - DeleteBlock(ctx context.Context, o cid.Cid) error -} - -// BoundedBlockService is a Blockservice bounded via strict multihash Allowlist. -type BoundedBlockService interface { - BlockService - - Allowlist() verifcid.Allowlist -} - -var _ BoundedBlockService = (*blockService)(nil) - -type blockService struct { +type BlockService struct { allowlist verifcid.Allowlist + blocker Blocker blockstore blockstore.Blockstore exchange exchange.Interface // If checkFirst is true then first check that a block doesn't @@ -82,30 +55,37 @@ type blockService struct { checkFirst bool } -type Option func(*blockService) +type Option func(*BlockService) // WriteThrough disable cache checks for writes and make them go straight to // the blockstore. func WriteThrough() Option { - return func(bs *blockService) { + return func(bs *BlockService) { bs.checkFirst = false } } // WithAllowlist sets a custom [verifcid.Allowlist] which will be used func WithAllowlist(allowlist verifcid.Allowlist) Option { - return func(bs *blockService) { + return func(bs *BlockService) { bs.allowlist = allowlist } } +// WithContentBlocker allows to filter what blocks can be fetched or added to the blockservice. +func WithContentBlocker(blocker Blocker) Option { + return func(bs *BlockService) { + bs.blocker = blocker + } +} + // New creates a BlockService with given datastore instance. -func New(bs blockstore.Blockstore, exchange exchange.Interface, opts ...Option) BlockService { +func New(bs blockstore.Blockstore, exchange exchange.Interface, opts ...Option) *BlockService { if exchange == nil { logger.Debug("blockservice running in local (offline) mode.") } - service := &blockService{ + service := &BlockService{ allowlist: verifcid.DefaultAllowlist, blockstore: bs, exchange: exchange, @@ -120,41 +100,45 @@ func New(bs blockstore.Blockstore, exchange exchange.Interface, opts ...Option) } // Blockstore returns the blockstore behind this blockservice. -func (s *blockService) Blockstore() blockstore.Blockstore { +func (s *BlockService) Blockstore() blockstore.Blockstore { return s.blockstore } // Exchange returns the exchange behind this blockservice. -func (s *blockService) Exchange() exchange.Interface { +func (s *BlockService) Exchange() exchange.Interface { return s.exchange } -func (s *blockService) Allowlist() verifcid.Allowlist { +func (s *BlockService) Allowlist() verifcid.Allowlist { return s.allowlist } +func (s *BlockService) Blocker() Blocker { + return s.blocker +} + // NewSession creates a new session that allows for // controlled exchange of wantlists to decrease the bandwidth overhead. // If the current exchange is a SessionExchange, a new exchange // session will be created. Otherwise, the current exchange will be used // directly. // Sessions are lazily setup, this is cheap. -func NewSession(ctx context.Context, bs BlockService) *Session { - ses := grabSessionFromContext(ctx, bs) +func (s *BlockService) NewSession(ctx context.Context) *Session { + ses := grabSessionFromContext(ctx, s) if ses != nil { return ses } - return newSession(ctx, bs) + return s.newSession(ctx) } // newSession is like [NewSession] but it does not attempt to reuse session from the existing context. -func newSession(ctx context.Context, bs BlockService) *Session { - return &Session{bs: bs, sesctx: ctx} +func (s *BlockService) newSession(ctx context.Context) *Session { + return &Session{bs: s, sesctx: ctx} } // AddBlock adds a particular block to the service, Putting it into the datastore. -func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error { +func (s *BlockService) AddBlock(ctx context.Context, o blocks.Block) error { ctx, span := internal.StartSpan(ctx, "blockService.AddBlock") defer span.End() @@ -163,6 +147,13 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error { if err != nil { return err } + + if s.blocker != nil { + if err := s.blocker(c); err != nil { + return err + } + } + if s.checkFirst { if has, err := s.blockstore.Has(ctx, c); has || err != nil { return err @@ -184,16 +175,23 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error { return nil } -func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error { +func (s *BlockService) AddBlocks(ctx context.Context, bs []blocks.Block) error { ctx, span := internal.StartSpan(ctx, "blockService.AddBlocks") defer span.End() // hash security for _, b := range bs { - err := verifcid.ValidateCid(s.allowlist, b.Cid()) + c := b.Cid() + err := verifcid.ValidateCid(s.allowlist, c) if err != nil { return err } + + if s.blocker != nil { + if err := s.blocker(c); err != nil { + return err + } + } } var toput []blocks.Block if s.checkFirst { @@ -231,7 +229,7 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error { // GetBlock retrieves a particular block from the service, // Getting it from the datastore using the key (hash). -func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { +func (s *BlockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { if ses := grabSessionFromContext(ctx, s); ses != nil { return ses.GetBlock(ctx, c) } @@ -239,21 +237,27 @@ func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, e ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c))) defer span.End() - return getBlock(ctx, c, s, s.getExchangeFetcher) + return s.getBlock(ctx, c, s.getExchangeFetcher) } // Look at what I have to do, no interface covariance :'( -func (s *blockService) getExchangeFetcher() exchange.Fetcher { +func (s *BlockService) getExchangeFetcher() exchange.Fetcher { return s.exchange } -func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func() exchange.Fetcher) (blocks.Block, error) { - err := verifcid.ValidateCid(grabAllowlistFromBlockservice(bs), c) // hash security +func (s *BlockService) getBlock(ctx context.Context, c cid.Cid, fetchFactory func() exchange.Fetcher) (blocks.Block, error) { + err := verifcid.ValidateCid(s.allowlist, c) // hash security if err != nil { return nil, err } - blockstore := bs.Blockstore() + if s.blocker != nil { + if err := s.blocker(c); err != nil { + return nil, err + } + } + + blockstore := s.Blockstore() block, err := blockstore.Get(ctx, c) switch { @@ -281,7 +285,7 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func if err != nil { return nil, err } - if ex := bs.Exchange(); ex != nil { + if ex := s.Exchange(); ex != nil { err = ex.NotifyNewBlocks(ctx, blk) if err != nil { return nil, err @@ -294,7 +298,7 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func // GetBlocks gets a list of blocks asynchronously and returns through // the returned channel. // NB: No guarantees are made about order. -func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block { +func (s *BlockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block { if ses := grabSessionFromContext(ctx, s); ses != nil { return ses.GetBlocks(ctx, ks) } @@ -302,23 +306,27 @@ func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan block ctx, span := internal.StartSpan(ctx, "blockService.GetBlocks") defer span.End() - return getBlocks(ctx, ks, s, s.getExchangeFetcher) + return s.getBlocks(ctx, ks, s.getExchangeFetcher) } -func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fetchFactory func() exchange.Fetcher) <-chan blocks.Block { +func (s *BlockService) getBlocks(ctx context.Context, ks []cid.Cid, fetchFactory func() exchange.Fetcher) <-chan blocks.Block { out := make(chan blocks.Block) go func() { defer close(out) - allowlist := grabAllowlistFromBlockservice(blockservice) - var lastAllValidIndex int var c cid.Cid for lastAllValidIndex, c = range ks { - if err := verifcid.ValidateCid(allowlist, c); err != nil { + if err := verifcid.ValidateCid(s.allowlist, c); err != nil { break } + + if s.blocker != nil { + if err := s.blocker(c); err != nil { + break + } + } } if lastAllValidIndex != len(ks) { @@ -327,16 +335,24 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet copy(ks2, ks[:lastAllValidIndex]) // fast path for already filtered elements for _, c := range ks[lastAllValidIndex:] { // don't rescan already scanned elements // hash security - if err := verifcid.ValidateCid(allowlist, c); err == nil { - ks2 = append(ks2, c) - } else { + if err := verifcid.ValidateCid(s.allowlist, c); err != nil { logger.Errorf("unsafe CID (%s) passed to blockService.GetBlocks: %s", c, err) + continue + } + + if s.blocker != nil { + if err := s.blocker(c); err != nil { + logger.Errorf("blocked CID (%s) passed to blockService.GetBlocks: %s", c, err) + continue + } } + + ks2 = append(ks2, c) } ks = ks2 } - bs := blockservice.Blockstore() + bs := s.Blockstore() var misses []cid.Cid for _, c := range ks { @@ -363,7 +379,7 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet return } - ex := blockservice.Exchange() + ex := s.Exchange() var cache [1]blocks.Block // preallocate once for all iterations for { var b blocks.Block @@ -406,7 +422,7 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet } // DeleteBlock deletes a block in the blockservice from the datastore -func (s *blockService) DeleteBlock(ctx context.Context, c cid.Cid) error { +func (s *BlockService) DeleteBlock(ctx context.Context, c cid.Cid) error { ctx, span := internal.StartSpan(ctx, "blockService.DeleteBlock", trace.WithAttributes(attribute.Stringer("CID", c))) defer span.End() @@ -417,7 +433,7 @@ func (s *blockService) DeleteBlock(ctx context.Context, c cid.Cid) error { return err } -func (s *blockService) Close() error { +func (s *BlockService) Close() error { logger.Debug("blockservice is shutting down...") if s.exchange == nil { return nil @@ -428,7 +444,7 @@ func (s *blockService) Close() error { // Session is a helper type to provide higher level access to bitswap sessions type Session struct { createSession sync.Once - bs BlockService + bs *BlockService ses exchange.Fetcher sesctx context.Context } @@ -461,7 +477,7 @@ func (s *Session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) ctx, span := internal.StartSpan(ctx, "Session.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c))) defer span.End() - return getBlock(ctx, c, s.bs, s.grabSession) + return s.bs.getBlock(ctx, c, s.grabSession) } // GetBlocks gets blocks in the context of a request session @@ -469,7 +485,7 @@ func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Blo ctx, span := internal.StartSpan(ctx, "Session.GetBlocks") defer span.End() - return getBlocks(ctx, ks, s.bs, s.grabSession) + return s.bs.getBlocks(ctx, ks, s.grabSession) } var _ BlockGetter = (*Session)(nil) @@ -479,11 +495,11 @@ var _ BlockGetter = (*Session)(nil) // will be redirected to this same session instead. // Sessions are lazily setup, this is cheap. // It wont make a new session if one exists already in the context. -func ContextWithSession(ctx context.Context, bs BlockService) context.Context { - if grabSessionFromContext(ctx, bs) != nil { +func (s *BlockService) ContextWithSession(ctx context.Context) context.Context { + if grabSessionFromContext(ctx, s) != nil { return ctx } - return EmbedSessionInContext(ctx, newSession(ctx, bs)) + return EmbedSessionInContext(ctx, s.newSession(ctx)) } // EmbedSessionInContext is like [ContextWithSession] but it allows to embed an existing session. @@ -496,7 +512,7 @@ func EmbedSessionInContext(ctx context.Context, ses *Session) context.Context { // This is a private API on purposes, I dislike when consumers tradeoff compiletime typesafety with runtime typesafety, // if this API is public it is too easy to forget to pass a [BlockService] or [Session] object around in your app. // By having this private we allow consumers to follow the trace of where the blockservice is passed and used. -func grabSessionFromContext(ctx context.Context, bs BlockService) *Session { +func grabSessionFromContext(ctx context.Context, bs *BlockService) *Session { s := ctx.Value(bs) if s == nil { return nil @@ -510,11 +526,3 @@ func grabSessionFromContext(ctx context.Context, bs BlockService) *Session { return ss } - -// grabAllowlistFromBlockservice never returns nil -func grabAllowlistFromBlockservice(bs BlockService) verifcid.Allowlist { - if bbs, ok := bs.(BoundedBlockService); ok { - return bbs.Allowlist() - } - return verifcid.DefaultAllowlist -} diff --git a/blockservice/blockservice_test.go b/blockservice/blockservice_test.go index 53fd725f3..16dd92940 100644 --- a/blockservice/blockservice_test.go +++ b/blockservice/blockservice_test.go @@ -2,6 +2,7 @@ package blockservice import ( "context" + "errors" "testing" blockstore "github.com/ipfs/boxo/blockstore" @@ -67,7 +68,7 @@ func TestExchangeWrite(t *testing.T) { for name, fetcher := range map[string]BlockGetter{ "blockservice": bserv, - "session": NewSession(context.Background(), bserv), + "session": bserv.NewSession(context.Background()), } { t.Run(name, func(t *testing.T) { // GetBlock @@ -154,7 +155,7 @@ func TestLazySessionInitialization(t *testing.T) { t.Fatal(err) } - bsession := NewSession(ctx, bservSessEx) + bsession := bservSessEx.NewSession(ctx) if bsession.ses != nil { t.Fatal("Session exchange should not instantiated session immediately") } @@ -235,7 +236,7 @@ func TestNilExchange(t *testing.T) { bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) bserv := New(bs, nil, WriteThrough()) - sess := NewSession(ctx, bserv) + sess := bserv.NewSession(ctx) _, err := sess.GetBlock(ctx, block.Cid()) if !ipld.IsNotFound(err) { t.Fatal("expected block to not be found") @@ -286,7 +287,7 @@ func TestAllowlist(t *testing.T) { blockservice := New(bs, nil, WithAllowlist(verifcid.NewAllowlist(map[uint64]bool{multihash.BLAKE3: true}))) check(blockservice.GetBlock) - check(NewSession(ctx, blockservice).GetBlock) + check(blockservice.NewSession(ctx).GetBlock) } type fakeIsNewSessionCreateExchange struct { @@ -335,7 +336,7 @@ func TestContextSession(t *testing.T) { service := New(blockstore.NewBlockstore(ds.NewMapDatastore()), sesEx) - ctx = ContextWithSession(ctx, service) + ctx = service.ContextWithSession(ctx) b, err := service.GetBlock(ctx, block1.Cid()) a.NoError(err) @@ -348,8 +349,77 @@ func TestContextSession(t *testing.T) { a.False(sesEx.newSessionWasCalled, "session should be reused in context") a.Equal( - NewSession(ctx, service), - NewSession(ContextWithSession(ctx, service), service), + service.NewSession(ctx), + service.NewSession(service.ContextWithSession(ctx)), "session must be deduped in all invocations on the same context", ) } + +func TestBlocker(t *testing.T) { + t.Parallel() + a := assert.New(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + bgen := butil.NewBlockGenerator() + allowed := bgen.Next() + notAllowed := bgen.Next() + + var disallowed = errors.New("disallowed") + + bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + service := New(bs, nil, WithContentBlocker(func(c cid.Cid) error { + if c == notAllowed.Cid() { + return disallowed + } + return nil + })) + + // try putting + a.NoError(service.AddBlock(ctx, allowed)) + has, err := bs.Has(ctx, allowed.Cid()) + a.NoError(err) + a.True(has, "block was not added even tho it is not blocked") + a.NoError(service.DeleteBlock(ctx, allowed.Cid())) + + a.ErrorIs(service.AddBlock(ctx, notAllowed), disallowed) + has, err = bs.Has(ctx, notAllowed.Cid()) + a.NoError(err) + a.False(has, "block was added even tho it is blocked") + + a.NoError(service.AddBlocks(ctx, []blocks.Block{allowed})) + has, err = bs.Has(ctx, allowed.Cid()) + a.NoError(err) + a.True(has, "block was not added even tho it is not blocked") + a.NoError(service.DeleteBlock(ctx, allowed.Cid())) + + a.ErrorIs(service.AddBlocks(ctx, []blocks.Block{notAllowed}), disallowed) + has, err = bs.Has(ctx, notAllowed.Cid()) + a.NoError(err) + a.False(has, "block was added even tho it is blocked") + + // now try fetch + a.NoError(bs.Put(ctx, allowed)) + a.NoError(bs.Put(ctx, notAllowed)) + + block, err := service.GetBlock(ctx, allowed.Cid()) + a.NoError(err) + a.Equal(block.RawData(), allowed.RawData()) + + _, err = service.GetBlock(ctx, notAllowed.Cid()) + a.ErrorIs(err, disallowed) + + var gotAllowed bool + for block := range service.GetBlocks(ctx, []cid.Cid{allowed.Cid(), notAllowed.Cid()}) { + switch block.Cid() { + case allowed.Cid(): + gotAllowed = true + case notAllowed.Cid(): + t.Error("got disallowed block") + default: + t.Fatalf("got unrelated block: %s", block.Cid()) + } + } + a.True(gotAllowed, "did not got allowed block") +} diff --git a/blockservice/test/mock.go b/blockservice/test/mock.go index e32b10b99..e33551f4c 100644 --- a/blockservice/test/mock.go +++ b/blockservice/test/mock.go @@ -9,13 +9,13 @@ import ( ) // Mocks returns |n| connected mock Blockservices -func Mocks(n int, opts ...blockservice.Option) []blockservice.BlockService { +func Mocks(n int, opts ...blockservice.Option) []*blockservice.BlockService { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) sg := testinstance.NewTestInstanceGenerator(net, nil, nil) instances := sg.Instances(n) - var servs []blockservice.BlockService + var servs []*blockservice.BlockService for _, i := range instances { servs = append(servs, blockservice.New(i.Blockstore(), i.Exchange, opts...)) } diff --git a/fetcher/impl/blockservice/fetcher.go b/fetcher/impl/blockservice/fetcher.go index a02e6ebbf..010bb2b28 100644 --- a/fetcher/impl/blockservice/fetcher.go +++ b/fetcher/impl/blockservice/fetcher.go @@ -23,13 +23,13 @@ type fetcherSession struct { // FetcherConfig defines a configuration object from which Fetcher instances are constructed type FetcherConfig struct { - blockService blockservice.BlockService + blockService *blockservice.BlockService NodeReifier ipld.NodeReifier PrototypeChooser traversal.LinkTargetNodePrototypeChooser } // NewFetcherConfig creates a FetchConfig from which session may be created and nodes retrieved. -func NewFetcherConfig(blockService blockservice.BlockService) FetcherConfig { +func NewFetcherConfig(blockService *blockservice.BlockService) FetcherConfig { return FetcherConfig{ blockService: blockService, PrototypeChooser: DefaultPrototypeChooser, @@ -39,7 +39,7 @@ func NewFetcherConfig(blockService blockservice.BlockService) FetcherConfig { // NewSession creates a session from which nodes may be retrieved. // The session ends when the provided context is canceled. func (fc FetcherConfig) NewSession(ctx context.Context) fetcher.Fetcher { - return fc.FetcherWithSession(ctx, blockservice.NewSession(ctx, fc.blockService)) + return fc.FetcherWithSession(ctx, fc.blockService.NewSession(ctx)) } func (fc FetcherConfig) FetcherWithSession(ctx context.Context, s *blockservice.Session) fetcher.Fetcher { diff --git a/gateway/blocks_backend.go b/gateway/blocks_backend.go index fe188ae71..bb76eff66 100644 --- a/gateway/blocks_backend.go +++ b/gateway/blocks_backend.go @@ -52,7 +52,7 @@ import ( // BlocksBackend is an [IPFSBackend] implementation based on a [blockservice.BlockService]. type BlocksBackend struct { blockStore blockstore.Blockstore - blockService blockservice.BlockService + blockService *blockservice.BlockService dagService format.DAGService resolver resolver.Resolver @@ -97,7 +97,7 @@ func WithResolver(r resolver.Resolver) BlocksBackendOption { type BlocksBackendOption func(options *blocksBackendOptions) error -func NewBlocksBackend(blockService blockservice.BlockService, opts ...BlocksBackendOption) (*BlocksBackend, error) { +func NewBlocksBackend(blockService *blockservice.BlockService, opts ...BlocksBackendOption) (*BlocksBackend, error) { var compiledOptions blocksBackendOptions for _, o := range opts { if err := o(&compiledOptions); err != nil { @@ -687,7 +687,7 @@ func (bb *BlocksBackend) IsCached(ctx context.Context, p path.Path) bool { var _ WithContextHint = (*BlocksBackend)(nil) func (bb *BlocksBackend) WrapContextForRequest(ctx context.Context) context.Context { - return blockservice.ContextWithSession(ctx, bb.blockService) + return bb.blockService.ContextWithSession(ctx) } func (bb *BlocksBackend) ResolvePath(ctx context.Context, path path.ImmutablePath) (ContentPathMetadata, error) { diff --git a/ipld/merkledag/merkledag.go b/ipld/merkledag/merkledag.go index a227780ff..1581279d9 100644 --- a/ipld/merkledag/merkledag.go +++ b/ipld/merkledag/merkledag.go @@ -35,7 +35,7 @@ const progressContextKey contextKey = "progress" // NewDAGService constructs a new DAGService (using the default implementation). // Note that the default implementation is also an ipld.LinkGetter. -func NewDAGService(bs bserv.BlockService) *dagService { +func NewDAGService(bs *bserv.BlockService) *dagService { return &dagService{ Blocks: bs, decoder: ipldLegacyDecoder, @@ -49,7 +49,7 @@ func NewDAGService(bs bserv.BlockService) *dagService { // // able to free some of them when vm pressure is high type dagService struct { - Blocks bserv.BlockService + Blocks *bserv.BlockService decoder *legacy.Decoder } @@ -162,7 +162,7 @@ func WrapSession(s *bserv.Session) format.NodeGetter { // Session returns a NodeGetter using a new session for block fetches. func (n *dagService) Session(ctx context.Context) format.NodeGetter { - session := bserv.NewSession(ctx, n.Blocks) + session := n.Blocks.NewSession(ctx) return &sesGetter{ bs: session, decoder: n.decoder, diff --git a/ipld/merkledag/test/utils.go b/ipld/merkledag/test/utils.go index 302d194d6..855196af1 100644 --- a/ipld/merkledag/test/utils.go +++ b/ipld/merkledag/test/utils.go @@ -17,7 +17,7 @@ func Mock() ipld.DAGService { } // Bserv returns a new, thread-safe, mock BlockService. -func Bserv() bsrv.BlockService { +func Bserv() *bsrv.BlockService { bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) return bsrv.New(bstore, offline.Exchange(bstore)) }