diff --git a/p2p/host/peerstore/pstoreds/addr_book.go b/p2p/host/peerstore/pstoreds/addr_book.go index e7c1e0aec8..55db09fd43 100644 --- a/p2p/host/peerstore/pstoreds/addr_book.go +++ b/p2p/host/peerstore/pstoreds/addr_book.go @@ -50,7 +50,7 @@ func (r *addrsRecord) flush(write ds.Write) (err error) { key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(r.Id.ID))) if len(r.Addrs) == 0 { - if err = write.Delete(key); err == nil { + if err = write.Delete(context.TODO(), key); err == nil { r.dirty = false } return err @@ -60,7 +60,7 @@ func (r *addrsRecord) flush(write ds.Write) (err error) { if err != nil { return err } - if err = write.Put(key, data); err != nil { + if err = write.Put(context.TODO(), key, data); err != nil { return err } // write succeeded; record is no longer dirty. @@ -223,7 +223,7 @@ func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrs pr = &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(id))) - data, err := ab.ds.Get(key) + data, err := ab.ds.Get(context.TODO(), key) switch err { case ds.ErrNotFound: @@ -446,7 +446,7 @@ func (ab *dsAddrBook) ClearAddrs(p peer.ID) { ab.cache.Remove(p) key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(p))) - if err := ab.ds.Delete(key); err != nil { + if err := ab.ds.Delete(context.TODO(), key); err != nil { log.Errorf("failed to clear addresses for peer %s: %v", p.Pretty(), err) } } diff --git a/p2p/host/peerstore/pstoreds/addr_book_gc.go b/p2p/host/peerstore/pstoreds/addr_book_gc.go index 808a5aa392..5cc0c3ad3d 100644 --- a/p2p/host/peerstore/pstoreds/addr_book_gc.go +++ b/p2p/host/peerstore/pstoreds/addr_book_gc.go @@ -152,7 +152,7 @@ func (gc *dsAddrBookGc) purgeLookahead() { if err != nil { log.Warnf("failed while %s record with GC key: %v, err: %v; deleting", msg, key, err) } - if err = batch.Delete(key); err != nil { + if err = batch.Delete(context.TODO(), key); err != nil { log.Warnf("failed to delete corrupt GC lookahead entry: %v, err: %v", key, err) } } @@ -160,20 +160,20 @@ func (gc *dsAddrBookGc) purgeLookahead() { // This function drops a GC key if the entry is cleaned correctly. It may reschedule another visit // if the next earliest expiry falls within the current window again. dropOrReschedule := func(key ds.Key, ar *addrsRecord) { - if err := batch.Delete(key); err != nil { + if err := batch.Delete(context.TODO(), key); err != nil { log.Warnf("failed to delete lookahead entry: %v, err: %v", key, err) } // re-add the record if it needs to be visited again in this window. if len(ar.Addrs) != 0 && ar.Addrs[0].Expiry <= gc.currWindowEnd { gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", ar.Addrs[0].Expiry, key.Name())) - if err := batch.Put(gcKey, []byte{}); err != nil { + if err := batch.Put(context.TODO(), gcKey, []byte{}); err != nil { log.Warnf("failed to add new GC key: %v, err: %v", gcKey, err) } } } - results, err := gc.ab.ds.Query(purgeLookaheadQuery) + results, err := gc.ab.ds.Query(context.TODO(), purgeLookaheadQuery) if err != nil { log.Warnf("failed while fetching entries to purge: %v", err) return @@ -228,7 +228,7 @@ func (gc *dsAddrBookGc) purgeLookahead() { // otherwise, fetch it from the store, clean it and flush it. entryKey := addrBookBase.ChildString(gcKey.Name()) - val, err := gc.ab.ds.Get(entryKey) + val, err := gc.ab.ds.Get(context.TODO(), entryKey) if err != nil { // captures all errors, including ErrNotFound. dropInError(gcKey, err, "fetching entry") @@ -248,7 +248,7 @@ func (gc *dsAddrBookGc) purgeLookahead() { dropOrReschedule(gcKey, record) } - if err = batch.Commit(); err != nil { + if err = batch.Commit(context.TODO()); err != nil { log.Warnf("failed to commit GC purge batch: %v", err) } } @@ -268,7 +268,7 @@ func (gc *dsAddrBookGc) purgeStore() { log.Warnf("failed while creating batch to purge GC entries: %v", err) } - results, err := gc.ab.ds.Query(purgeStoreQuery) + results, err := gc.ab.ds.Query(context.TODO(), purgeStoreQuery) if err != nil { log.Warnf("failed while opening iterator: %v", err) return @@ -294,7 +294,7 @@ func (gc *dsAddrBookGc) purgeStore() { gc.ab.cache.Remove(id) } - if err = batch.Commit(); err != nil { + if err = batch.Commit(context.TODO()); err != nil { log.Warnf("failed to commit GC purge batch: %v", err) } } @@ -321,7 +321,7 @@ func (gc *dsAddrBookGc) populateLookahead() { var id peer.ID record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} - results, err := gc.ab.ds.Query(populateLookaheadQuery) + results, err := gc.ab.ds.Query(context.TODO(), populateLookaheadQuery) if err != nil { log.Warnf("failed while querying to populate lookahead GC window: %v", err) return @@ -354,7 +354,7 @@ func (gc *dsAddrBookGc) populateLookahead() { continue } gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", cached.Addrs[0].Expiry, idb32)) - if err = batch.Put(gcKey, []byte{}); err != nil { + if err = batch.Put(context.TODO(), gcKey, []byte{}); err != nil { log.Warnf("failed while inserting GC entry for peer: %v, err: %v", id.Pretty(), err) } cached.RUnlock() @@ -363,7 +363,7 @@ func (gc *dsAddrBookGc) populateLookahead() { record.Reset() - val, err := gc.ab.ds.Get(ds.RawKey(result.Key)) + val, err := gc.ab.ds.Get(context.TODO(), ds.RawKey(result.Key)) if err != nil { log.Warnf("failed which getting record from store for peer: %v, err: %v", id.Pretty(), err) continue @@ -374,13 +374,13 @@ func (gc *dsAddrBookGc) populateLookahead() { } if len(record.Addrs) > 0 && record.Addrs[0].Expiry <= until { gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", record.Addrs[0].Expiry, idb32)) - if err = batch.Put(gcKey, []byte{}); err != nil { + if err = batch.Put(context.TODO(), gcKey, []byte{}); err != nil { log.Warnf("failed while inserting GC entry for peer: %v, err: %v", id.Pretty(), err) } } } - if err = batch.Commit(); err != nil { + if err = batch.Commit(context.TODO()); err != nil { log.Warnf("failed to commit GC lookahead batch: %v", err) } diff --git a/p2p/host/peerstore/pstoreds/addr_book_gc_test.go b/p2p/host/peerstore/pstoreds/addr_book_gc_test.go index 82e5127c17..d6ecdb84f3 100644 --- a/p2p/host/peerstore/pstoreds/addr_book_gc_test.go +++ b/p2p/host/peerstore/pstoreds/addr_book_gc_test.go @@ -1,6 +1,7 @@ package pstoreds import ( + "context" "testing" "time" @@ -18,7 +19,7 @@ type testProbe struct { } func (tp *testProbe) countLookaheadEntries() (i int) { - results, err := tp.ab.(*dsAddrBook).ds.Query(lookaheadQuery) + results, err := tp.ab.(*dsAddrBook).ds.Query(context.Background(), lookaheadQuery) if err != nil { tp.t.Fatal(err) } diff --git a/p2p/host/peerstore/pstoreds/cyclic_batch.go b/p2p/host/peerstore/pstoreds/cyclic_batch.go index 513a91c9d7..51f6afb0d7 100644 --- a/p2p/host/peerstore/pstoreds/cyclic_batch.go +++ b/p2p/host/peerstore/pstoreds/cyclic_batch.go @@ -1,6 +1,7 @@ package pstoreds import ( + "context" "errors" "fmt" @@ -23,7 +24,7 @@ type cyclicBatch struct { } func newCyclicBatch(ds ds.Batching, threshold int) (ds.Batch, error) { - batch, err := ds.Batch() + batch, err := ds.Batch(context.TODO()) if err != nil { return nil, err } @@ -39,36 +40,36 @@ func (cb *cyclicBatch) cycle() (err error) { return nil } // commit and renew the batch. - if err = cb.Batch.Commit(); err != nil { + if err = cb.Batch.Commit(context.TODO()); err != nil { return fmt.Errorf("failed while committing cyclic batch: %w", err) } - if cb.Batch, err = cb.ds.Batch(); err != nil { + if cb.Batch, err = cb.ds.Batch(context.TODO()); err != nil { return fmt.Errorf("failed while renewing cyclic batch: %w", err) } return nil } -func (cb *cyclicBatch) Put(key ds.Key, val []byte) error { +func (cb *cyclicBatch) Put(ctx context.Context, key ds.Key, val []byte) error { if err := cb.cycle(); err != nil { return err } cb.pending++ - return cb.Batch.Put(key, val) + return cb.Batch.Put(ctx, key, val) } -func (cb *cyclicBatch) Delete(key ds.Key) error { +func (cb *cyclicBatch) Delete(ctx context.Context, key ds.Key) error { if err := cb.cycle(); err != nil { return err } cb.pending++ - return cb.Batch.Delete(key) + return cb.Batch.Delete(ctx, key) } -func (cb *cyclicBatch) Commit() error { +func (cb *cyclicBatch) Commit(ctx context.Context) error { if cb.Batch == nil { return errors.New("cyclic batch is closed") } - if err := cb.Batch.Commit(); err != nil { + if err := cb.Batch.Commit(ctx); err != nil { return err } cb.pending = 0 diff --git a/p2p/host/peerstore/pstoreds/keybook.go b/p2p/host/peerstore/pstoreds/keybook.go index 0fd0bcadaa..892302cdcd 100644 --- a/p2p/host/peerstore/pstoreds/keybook.go +++ b/p2p/host/peerstore/pstoreds/keybook.go @@ -36,7 +36,7 @@ func (kb *dsKeyBook) PubKey(p peer.ID) ic.PubKey { key := kbBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))).Child(pubSuffix) var pk ic.PubKey - if value, err := kb.ds.Get(key); err == nil { + if value, err := kb.ds.Get(context.TODO(), key); err == nil { pk, err = ic.UnmarshalPublicKey(value) if err != nil { log.Errorf("error when unmarshalling pubkey from datastore for peer %s: %s\n", p.Pretty(), err) @@ -56,7 +56,7 @@ func (kb *dsKeyBook) PubKey(p peer.ID) ic.PubKey { log.Errorf("error when turning extracted pubkey into bytes for peer %s: %s\n", p.Pretty(), err) return nil } - err = kb.ds.Put(key, pkb) + err = kb.ds.Put(context.TODO(), key, pkb) if err != nil { log.Errorf("error when adding extracted pubkey to peerstore for peer %s: %s\n", p.Pretty(), err) return nil @@ -80,7 +80,7 @@ func (kb *dsKeyBook) AddPubKey(p peer.ID, pk ic.PubKey) error { log.Errorf("error while converting pubkey byte string for peer %s: %s\n", p.Pretty(), err) return err } - err = kb.ds.Put(key, val) + err = kb.ds.Put(context.TODO(), key, val) if err != nil { log.Errorf("error while updating pubkey in datastore for peer %s: %s\n", p.Pretty(), err) } @@ -89,7 +89,7 @@ func (kb *dsKeyBook) AddPubKey(p peer.ID, pk ic.PubKey) error { func (kb *dsKeyBook) PrivKey(p peer.ID) ic.PrivKey { key := kbBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))).Child(privSuffix) - value, err := kb.ds.Get(key) + value, err := kb.ds.Get(context.TODO(), key) if err != nil { log.Errorf("error while fetching privkey from datastore for peer %s: %s\n", p.Pretty(), err) return nil @@ -116,7 +116,7 @@ func (kb *dsKeyBook) AddPrivKey(p peer.ID, sk ic.PrivKey) error { log.Errorf("error while converting privkey byte string for peer %s: %s\n", p.Pretty(), err) return err } - err = kb.ds.Put(key, val) + err = kb.ds.Put(context.TODO(), key, val) if err != nil { log.Errorf("error while updating privkey in datastore for peer %s: %s\n", p.Pretty(), err) } diff --git a/p2p/host/peerstore/pstoreds/metadata.go b/p2p/host/peerstore/pstoreds/metadata.go index bf7655231c..73646fdd80 100644 --- a/p2p/host/peerstore/pstoreds/metadata.go +++ b/p2p/host/peerstore/pstoreds/metadata.go @@ -45,7 +45,7 @@ func (pm *dsPeerMetadata) Get(p peer.ID, key string) (interface{}, error) { return nil, err } k := pmBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))).ChildString(key) - value, err := pm.ds.Get(k) + value, err := pm.ds.Get(context.TODO(), k) if err != nil { if err == ds.ErrNotFound { err = pstore.ErrNotFound @@ -69,5 +69,5 @@ func (pm *dsPeerMetadata) Put(p peer.ID, key string, val interface{}) error { if err := gob.NewEncoder(&buf).Encode(&val); err != nil { return err } - return pm.ds.Put(k, buf.Bytes()) + return pm.ds.Put(context.TODO(), k, buf.Bytes()) } diff --git a/p2p/host/peerstore/pstoreds/peerstore.go b/p2p/host/peerstore/pstoreds/peerstore.go index bea64dc6c9..1809b9fd30 100644 --- a/p2p/host/peerstore/pstoreds/peerstore.go +++ b/p2p/host/peerstore/pstoreds/peerstore.go @@ -95,7 +95,7 @@ func uniquePeerIds(ds ds.Datastore, prefix ds.Key, extractor func(result query.R err error ) - if results, err = ds.Query(q); err != nil { + if results, err = ds.Query(context.TODO(), q); err != nil { log.Error(err) return nil, err }