Better control excessive traffic to Dgraph (#2678)
- Proposal retries generate a lot of traffic for an already jammed system. Each proposal retry now carries an exponentially increasing weight (2^retry), which is counted against the `pending_proposals` limit (a sketch of this scheme follows these notes).
- For retries, the pending-proposals rate limiter bleeds the extra weight back slowly over time, to ease the congestion.
- Remove the `btree` concept, which was the cause of deadlocks during LRU gets. It was put in place because Dgraph used to keep commits in memory instead of writing them to disk. That has changed: each commit now goes to disk before a read is done, so we no longer need to maintain this data structure. This speeds up mutations considerably.

Note that removing the `btree` also means that a txn won't be able to read back its own uncommitted writes to secondary indices (illustrated after the change list below). I think that's a rare use case and hence a fair tradeoff, given the complexity and performance cost of having to overlay this structure on the DB.
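
The weighting and bleed-back described in the first two notes can be pictured with a small, self-contained Go sketch. This is not the code from this commit; the rateLimiter type, its field names, and the one-second bleed interval are assumptions made for illustration. Only the 2^retry weighting, the cap taken from the `pending_proposals` flag, and the slow bleed-back come from the notes above.

package limiter

import (
    "sync"
    "time"
)

// rateLimiter caps the total weight of in-flight proposals. A first attempt
// costs weight 1; a retry costs 2^retry, so retries against a congested
// system are admitted far less often.
type rateLimiter struct {
    c       *sync.Cond
    max     int // the cap, e.g. the value of the pending_proposals flag
    pending int // total weight currently admitted
    iou     int // retry surcharge still owed back to the pool
}

func newRateLimiter(max int) *rateLimiter {
    r := &rateLimiter{c: sync.NewCond(&sync.Mutex{}), max: max}
    go r.bleed()
    return r
}

// incr blocks until weight 2^retry fits under the cap.
func (r *rateLimiter) incr(retry int) {
    weight := 1 << uint(retry)
    r.c.L.Lock()
    defer r.c.L.Unlock()
    for r.pending+weight > r.max {
        r.c.Wait()
    }
    r.pending += weight
}

// decr releases weight 1 immediately. The retry surcharge (2^retry - 1) is
// parked in iou and only trickles back via bleed, so finishing a retried
// proposal does not instantly open room for another burst.
func (r *rateLimiter) decr(retry int) {
    weight := 1 << uint(retry)
    r.c.L.Lock()
    r.pending--
    r.iou += weight - 1
    r.c.L.Unlock()
    r.c.Broadcast()
}

// bleed returns the owed surcharge to the pool one unit per tick.
func (r *rateLimiter) bleed() {
    for range time.Tick(time.Second) {
        r.c.L.Lock()
        if r.iou > 0 {
            r.iou--
            r.pending--
            r.c.Broadcast()
        }
        r.c.L.Unlock()
    }
}

A caller would bracket each proposal with incr(retry) before sending and decr(retry) once it resolves; with pending_proposals set to 256, a fourth retry weighs 16 and simply waits at the gate until earlier work drains.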

* Working towards speeding up live loader.
* Wait before re-proposing, so we don't create too much work.
* Make rollup happen only once every 5m.
* Slowly bleed back to the proposal limit.
* Remove btree, gain performance.
* Works nicely with a lot of traffic now.
* Make tests work in the query package by ensuring that our commits are written to disk.
* Fix systest, testtxn and other tests.
* Add an Ignore method to keep linter happy (not used in this PR).
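
To make the index-visibility tradeoff concrete, here is a client-side sketch using the dgo calls that already appear in the diffs below. The function name, the assumed exact index on name, and the assumption of a running alpha are made up for illustration; the point is only that the write must be committed before an index query, issued from a fresh txn, can see it.

package example

import (
    "context"

    "github.com/dgraph-io/dgo"
    "github.com/dgraph-io/dgo/protos/api"
)

// visibleOnlyAfterCommit is a hypothetical helper. It assumes a schema with
// name: string @index(exact) . and a dgo client connected to a running alpha.
func visibleOnlyAfterCommit(dg *dgo.Dgraph) error {
    ctx := context.Background()

    txn := dg.NewTxn()
    _, err := txn.Mutate(ctx, &api.Mutation{SetJson: []byte(`{"name": "Manish"}`)})
    if err != nil {
        return err
    }
    // Without the btree overlay, an index query such as le(name, ...) in this
    // same txn would not see the uncommitted write. Commit first.
    if err := txn.Commit(ctx); err != nil {
        return err
    }

    // Read it back through the index in a fresh transaction.
    readTxn := dg.NewTxn()
    defer readTxn.Discard(ctx)
    _, err = readTxn.Query(ctx, `{ me(func: le(name, "Manish")) { uid } }`)
    return err
}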
manishrjain authored Oct 19, 2018
1 parent ae72083 commit a2e8376
Showing 20 changed files with 362 additions and 436 deletions.
9 changes: 4 additions & 5 deletions contrib/integration/increment/main.go
@@ -55,7 +55,7 @@ func queryCounter(txn *dgo.Txn) (Counter, error) {
query := fmt.Sprintf("{ q(func: has(%s)) { uid, val: %s }}", *pred, *pred)
resp, err := txn.Query(ctx, query)
if err != nil {
return counter, err
return counter, fmt.Errorf("Query error: %v", err)
}
m := make(map[string][]Counter)
if err := json.Unmarshal(resp.Json, &m); err != nil {
@@ -92,13 +92,12 @@ func process(dg *dgo.Dgraph, readOnly bool) (Counter, error) {
}
mu.SetNquads = []byte(fmt.Sprintf(`<%s> <%s> "%d"^^<xs:int> .`, counter.Uid, *pred, counter.Val))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_, err = txn.Mutate(ctx, &mu)
// Don't put any timeout for mutation.
_, err = txn.Mutate(context.Background(), &mu)
if err != nil {
return Counter{}, err
}
return counter, txn.Commit(ctx)
return counter, txn.Commit(context.Background())
}

func main() {
10 changes: 7 additions & 3 deletions contrib/integration/testtxn/main_test.go
@@ -26,7 +26,7 @@ import (

"github.com/dgraph-io/dgo"
"github.com/dgraph-io/dgo/protos/api"
"github.com/dgraph-io/dgo/x"
"github.com/dgraph-io/dgraph/x"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
@@ -439,8 +439,10 @@ func TestReadIndexKeySameTxn(t *testing.T) {

txn := s.dg.NewTxn()

mu := &api.Mutation{}
mu.SetJson = []byte(`{"name": "Manish"}`)
mu := &api.Mutation{
CommitNow: true,
SetJson: []byte(`{"name": "Manish"}`),
}
assigned, err := txn.Mutate(context.Background(), mu)
if err != nil {
log.Fatalf("Error while running mutation: %v\n", err)
@@ -453,6 +455,8 @@
uid = u
}

txn = s.dg.NewTxn()
defer txn.Discard(context.Background())
q := `{ me(func: le(name, "Manish")) { uid }}`
resp, err := txn.Query(context.Background(), q)
if err != nil {
2 changes: 1 addition & 1 deletion dgraph/cmd/alpha/run.go
@@ -91,7 +91,7 @@ they form a Raft group and provide synchronous replication.
"actions (i.e., --whitelist 127.0.0.1:127.0.0.3,0.0.0.7:0.0.0.9)")

flag.StringVar(&worker.Config.ExportPath, "export", "export", "Folder in which to store exports.")
flag.IntVar(&worker.Config.NumPendingProposals, "pending_proposals", 2000,
flag.IntVar(&worker.Config.NumPendingProposals, "pending_proposals", 256,
"Number of pending mutation proposals. Useful for rate limiting.")
flag.Float64Var(&worker.Config.Tracing, "trace", 0.0, "The ratio of queries to trace.")
flag.StringVar(&worker.Config.MyAddr, "my", "",
11 changes: 8 additions & 3 deletions dgraph/cmd/live/batch.go
@@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"math"
"math/rand"
"strings"
"sync"
"sync/atomic"
@@ -139,6 +140,10 @@ func handleError(err error) {
x.Fatalf(s.Message())
case strings.Contains(s.Message(), "x509"):
x.Fatalf(s.Message())
case strings.Contains(s.Message(), "Server unavailable."):
dur := time.Duration(1+rand.Intn(10)) * time.Minute
x.Printf("Server is unavailable. Will retry after %s.", dur.Round(time.Minute))
time.Sleep(dur)
case err != y.ErrAborted && err != y.ErrConflict:
x.Printf("Error while mutating %v\n", s.Message())
}
@@ -197,9 +202,9 @@ func (l *loader) printCounters() {
for range l.ticker.C {
counter := l.Counter()
rate := float64(counter.Rdfs) / counter.Elapsed.Seconds()
elapsed := ((time.Since(start) / time.Second) * time.Second).String()
fmt.Printf("Total Txns done: %8d RDFs per second: %7.0f Time Elapsed: %v, Aborts: %d\n",
counter.TxnsDone, rate, elapsed, counter.Aborts)
elapsed := time.Since(start).Round(time.Second)
fmt.Printf("[%6s] Txns: %d RDFs: %d RDFs/sec: %5.0f Aborts: %d\n",
elapsed, counter.TxnsDone, counter.Rdfs, rate, counter.Aborts)

}
}
2 changes: 1 addition & 1 deletion dgraph/cmd/live/run.go
@@ -87,7 +87,7 @@ func init() {
flag.StringP("schema", "s", "", "Location of schema file")
flag.StringP("dgraph", "d", "127.0.0.1:9080", "Dgraph gRPC server address")
flag.StringP("zero", "z", "127.0.0.1:5080", "Dgraphzero gRPC server address")
flag.IntP("conc", "c", 100,
flag.IntP("conc", "c", 10,
"Number of concurrent requests to make to Dgraph")
flag.IntP("batch", "b", 1000,
"Number of RDF N-Quads to send as part of a mutation.")
1 change: 0 additions & 1 deletion posting/index.go
@@ -791,7 +791,6 @@ func RebuildListType(ctx context.Context, attr string, startTs uint64) error {
}

func DeleteAll() error {
btree.DeleteAll()
lcache.clear(func([]byte) bool { return true })
return deleteEntries(nil, func(key []byte) bool {
pk := x.Parse(key)
1 change: 0 additions & 1 deletion posting/list.go
@@ -80,7 +80,6 @@ type List struct {
deleteMe int32 // Using atomic for this, to avoid expensive SetForDeletion operation.
estimatedSize int32
numCommits int
onDisk bool
}

// calculateSize would give you the size estimate. This is expensive, so run it carefully.
83 changes: 1 addition & 82 deletions posting/lists.go
@@ -18,7 +18,6 @@ package posting

import (
"crypto/md5"
"errors"
"fmt"
"io/ioutil"
"math"
@@ -34,7 +33,6 @@ import (

"github.com/dgraph-io/badger"
"github.com/dgraph-io/badger/y"
"github.com/golang/glog"

"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
@@ -211,96 +209,19 @@ func updateMemoryMetrics(lc *y.Closer) {
var (
pstore *badger.DB
lcache *listCache
btree *BTree
closer *y.Closer
)

// Init initializes the posting lists package, the in memory and dirty list hash.
func Init(ps *badger.DB) {
pstore = ps
lcache = newListCache(math.MaxUint64)
btree = newBTree(2)
x.LcacheCapacity.Set(math.MaxInt64)

closer = y.NewCloser(3)
closer = y.NewCloser(2)

go periodicUpdateStats(closer)
go updateMemoryMetrics(closer)
go clearBtree(closer)
}

// clearBtree checks if the keys stored in the btree have reached their conclusion, and if so,
// removes them from the tree. Conclusion in this case would be that the key got written out to
// disk, or the txn which introduced the key got aborted.
func clearBtree(closer *y.Closer) {
defer closer.Done()
var removeKey = errors.New("Remove key from btree.")

handleKey := func(txn *badger.Txn, k []byte) error {
_, err := txn.Get(k)
switch {
case err == badger.ErrKeyNotFound:
l := GetLru(k) // Retrieve from LRU cache, if it exists.
if l == nil {
// Posting list no longer in memory. So, it must have been either written to
// disk, or removed from memory after a txn abort.
return removeKey
}
l.RLock()
defer l.RUnlock()
if !l.hasPendingTxn() {
// This key's txn was aborted. So, we can remove it from btree.
return removeKey
}
return nil
case err != nil:
glog.Warningf("Error while checking key: %v\n", err)
return err
default:
// Key was found on disk. Remove from btree.
return removeKey
}
}

removeKeysOnDisk := func() {
var keys []string
var count int
err := pstore.View(func(txn *badger.Txn) error {
var rerr error
btree.Ascend(func(k []byte) bool {
count++
err := handleKey(txn, k)
if err == removeKey {
keys = append(keys, string(k))
} else if err != nil {
rerr = err
return false
}
return true
})
return rerr
})
if glog.V(2) && count > 0 {
glog.Infof("Btree size: %d. Removing=%d. Error=%v\n", count, len(keys), err)
}
if err == nil {
for _, k := range keys {
btree.Delete([]byte(k))
}
}
}

ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
removeKeysOnDisk()
case <-closer.HasBeenClosed():
return
}
}
}

func Cleanup() {
@@ -339,8 +260,6 @@ func Get(key []byte) (rlist *List, err error) {
lp = lcache.PutIfMissing(string(key), l)
if lp != l {
x.CacheRace.Add(1)
} else if !lp.onDisk {
btree.Insert(lp.key)
}
return lp, nil
}
136 changes: 0 additions & 136 deletions posting/mvcc.go
@@ -255,150 +255,14 @@ func getNew(key []byte, pstore *badger.DB) (*List, error) {
it.Seek(key)
l, err = ReadPostingList(key, it)
}

if err != nil {
return l, err
}

l.onDisk = true
l.Lock()
size := l.calculateSize()
l.Unlock()
x.BytesRead.Add(int64(size))
atomic.StoreInt32(&l.estimatedSize, size)
return l, nil
}

type BTreeIterator struct {
keys [][]byte
idx int
Reverse bool
Prefix []byte
}

func (bi *BTreeIterator) Next() {
bi.idx++
}

func (bi *BTreeIterator) Key() []byte {
x.AssertTrue(bi.Valid())
return bi.keys[bi.idx]
}

func (bi *BTreeIterator) Valid() bool {
// No need to check HasPrefix here, because we are only picking those keys
// which have the right prefix in Seek.
return bi.idx < len(bi.keys)
}

func (bi *BTreeIterator) Seek(key []byte) {
bi.keys = bi.keys[:0]
bi.idx = 0
cont := func(key []byte) bool {
if !bytes.HasPrefix(key, bi.Prefix) {
return false
}
bi.keys = append(bi.keys, key)
return true
}
if !bi.Reverse {
btree.AscendGreaterOrEqual(key, cont)
} else {
btree.DescendLessOrEqual(key, cont)
}
}

type TxnPrefixIterator struct {
btreeIter *BTreeIterator
badgerIter *badger.Iterator
prefix []byte
reverse bool
curKey []byte
userMeta byte // userMeta stored as part of badger item, used to skip empty PL in has query.
}

func NewTxnPrefixIterator(txn *badger.Txn,
iterOpts badger.IteratorOptions, prefix, key []byte) *TxnPrefixIterator {
x.AssertTrue(iterOpts.PrefetchValues == false)
txnIt := new(TxnPrefixIterator)
txnIt.reverse = iterOpts.Reverse
txnIt.prefix = prefix
txnIt.btreeIter = &BTreeIterator{
Reverse: iterOpts.Reverse,
Prefix: prefix,
}
txnIt.btreeIter.Seek(key)
// Create iterator only after copying the keys from btree, or else there could
// be race after creating iterator and before reading btree. Some keys might end up
// getting deleted and iterator won't be initialized with new memtbales.
txnIt.badgerIter = txn.NewIterator(iterOpts)
txnIt.badgerIter.Seek(key)
txnIt.Next()
return txnIt
}

func (t *TxnPrefixIterator) Valid() bool {
return len(t.curKey) > 0
}

func (t *TxnPrefixIterator) compare(key1 []byte, key2 []byte) int {
if !t.reverse {
return bytes.Compare(key1, key2)
}
return bytes.Compare(key2, key1)
}

func (t *TxnPrefixIterator) Next() {
if len(t.curKey) > 0 {
// Avoid duplicate keys during merging.
for t.btreeIter.Valid() && t.compare(t.btreeIter.Key(), t.curKey) <= 0 {
t.btreeIter.Next()
}
for t.badgerIter.ValidForPrefix(t.prefix) &&
t.compare(t.badgerIter.Item().Key(), t.curKey) <= 0 {
t.badgerIter.Next()
}
}

t.userMeta = 0 // reset it.
if !t.btreeIter.Valid() && !t.badgerIter.ValidForPrefix(t.prefix) {
t.curKey = nil
return
} else if !t.badgerIter.ValidForPrefix(t.prefix) {
t.storeKey(t.btreeIter.Key())
t.btreeIter.Next()
} else if !t.btreeIter.Valid() {
t.userMeta = t.badgerIter.Item().UserMeta()
t.storeKey(t.badgerIter.Item().Key())
t.badgerIter.Next()
} else { // Both are valid
if t.compare(t.btreeIter.Key(), t.badgerIter.Item().Key()) < 0 {
t.storeKey(t.btreeIter.Key())
t.btreeIter.Next()
} else {
t.userMeta = t.badgerIter.Item().UserMeta()
t.storeKey(t.badgerIter.Item().Key())
t.badgerIter.Next()
}
}
}

func (t *TxnPrefixIterator) UserMeta() byte {
return t.userMeta
}

func (t *TxnPrefixIterator) storeKey(key []byte) {
if cap(t.curKey) < len(key) {
t.curKey = make([]byte, 2*len(key))
}
t.curKey = t.curKey[:len(key)]
copy(t.curKey, key)
}

func (t *TxnPrefixIterator) Key() []byte {
return t.curKey
}

func (t *TxnPrefixIterator) Close() {
t.badgerIter.Close()
}