Skip to content

Commit

Permalink
Merge pull request ethereum#271 from nextyio/random
Browse files Browse the repository at this point in the history
VDF and delayer improvement
  • Loading branch information
Zergity authored Sep 8, 2019
2 parents 57ed173 + ba10817 commit ee5426c
Show file tree
Hide file tree
Showing 10 changed files with 1,131 additions and 58 deletions.
160 changes: 121 additions & 39 deletions core/vdf/delayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@ package vdf

import (
"bytes"
"encoding/binary"
"fmt"
"runtime"
"sync"

lru "github.com/hashicorp/golang-lru"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
Expand All @@ -29,84 +34,155 @@ import (
// The same task request will be ignored; a new task request will interrupt
// and replace the current one.
//
// bitSize value of 2^n-1 is recommened
// output size (in bytes) will be ((bitSize+16)>>4)*4
// bitSize(bit) will be outputSize(byte)<<2 - 1
//
// ORIGINAL:
// bitSize value of 2^n-1 is recommened
// output size (in bytes) will be ((bitSize+16)>>4)*4
type Delayer struct {
bitSize uint64
loopOnce sync.Once
requestCh chan task
responseCh chan []byte
bitSize uint64
loopOnce sync.Once
stopCh chan struct{} // to stop all running vdf routines
reqCh chan task // to request new delay task
resChCh chan (<-chan []byte) // to get the chan that will return the vdf output

outputCache *lru.ARCCache // task.GetKey() => []byte
}

// NewDelayer creates a new Delayer instance
func NewDelayer(bitSize uint64) *Delayer {
func NewDelayer(outputSize uint64) *Delayer {
outputCache, _ := lru.NewARC(8)
return &Delayer{
bitSize: bitSize,
requestCh: make(chan task),
responseCh: make(chan []byte),
bitSize: outputSize<<2 - 1,
stopCh: make(chan struct{}),
reqCh: make(chan task),
resChCh: make(chan (<-chan []byte)),

outputCache: outputCache,
}
}

// OutputSize returns size in bytes of the VDF output (y+proof)
func (d *Delayer) OutputSize() uint64 {
return ((d.bitSize + 16) >> 4) << 2
// Verify verifies the given output against the seed and iteration
func (d *Delayer) Verify(seed, output []byte, iteration uint64) bool {
t := task{
seed: seed,
iteration: iteration,
}
if cached, ok := d.outputCache.Get(t.GetKey()); ok {
return bytes.Equal(output, cached.([]byte))
}
return Instance().Verify(seed, output, iteration, d.bitSize)
}

// Request request new delay task.
// Get request new delay task and block for output.
//
// The same task request will be ignored; a new task request will interrupt
// and replace the current one.
func (d *Delayer) Request(seed []byte, iteration uint64) <-chan []byte {
//
// Requested routine MUST read exactly 1 value from returning chanel, nil value
// indicates that the request is replaced by other request.
func (d *Delayer) Get(seed []byte, iteration uint64) []byte {
t := task{
seed: seed,
iteration: iteration,
}
if output, ok := d.outputCache.Get(t.GetKey()); ok {
return output.([]byte)
}
d.loopOnce.Do(func() {
go d.loop()
})
d.requestCh <- task{
d.reqCh <- t
return <-<-d.resChCh
}

// Request requests new delay task and return.
// Return the cached output if available.
func (d *Delayer) Request(seed []byte, iteration uint64) []byte {
t := task{
seed: seed,
iteration: iteration,
}
return d.responseCh
if output, ok := d.outputCache.Get(t.GetKey()); ok {
return output.([]byte)
}
d.loopOnce.Do(func() {
go d.loop()
})
d.reqCh <- t
<-d.resChCh // discard the output chan
return nil
}

// Peek attempts to get the cached result, without requesting
// any delay task.
func (d *Delayer) Peek(seed []byte, iteration uint64) []byte {
t := task{
seed: seed,
iteration: iteration,
}
if output, ok := d.outputCache.Get(t.GetKey()); ok {
return output.([]byte)
}
return nil
}

// Stop stops the current delay task.
func (d *Delayer) Stop() {
d.requestCh <- task{}
// cancel all currently running routines
for {
select {
case d.stopCh <- struct{}{}:
default:
// no more routines to stop
return
}
}
}

func (d *Delayer) loop() {
var current task
stopCh := make(chan struct{})
var resCh chan []byte
for {
select {
case t := <-d.requestCh:
case t := <-d.reqCh:
if t.Equal(current) {
log.Trace("Delayer: discarding duplicated request", "seed", common.Bytes2Hex(t.seed), "iteration", t.iteration)
d.resChCh <- resCh
continue
}
if current.iteration > 0 {
// cancel the current task
select {
case stopCh <- struct{}{}:
default:
}
}
if t.iteration == 0 {
// don't start new worker routine
log.Error("Delayer: cancelation")
continue
}
d.Stop()
// new task
current = t
resCh = make(chan []byte)
d.resChCh <- resCh

// start new worker routine
go func(t task) {
output, err := Instance().Generate(t.seed, t.iteration, d.bitSize, stopCh)
go func(t task, resCh chan<- []byte) {
output, err := Instance().Generate(t.seed, t.iteration, d.bitSize, d.stopCh)
defer close(resCh)
if err != nil {
log.Error("Delayer: VDF worker loop failed", "err", err)
fmt.Printf("Delayer: VDF worker loop failed, err=%v\n", err)
return
}
select {
case d.responseCh <- output:
default:
log.Warn("Delayer: VDF result is not read by miner", "output", common.Bytes2Hex(output))
if len(output) == 0 {
log.Info("Delayer: interrupted")
return
}
// cache the result
d.outputCache.Add(t.GetKey(), output)
// broadcast to all listening chan
for {
select {
case resCh <- output:
default:
return // no more listener to broadcast
}
}
}(t)
}(t, resCh)
// give the forked worker a chance to start first
runtime.Gosched()
}
}
}
Expand All @@ -120,3 +196,9 @@ type task struct {
func (t task) Equal(u task) bool {
return t.iteration == u.iteration && bytes.Equal(t.seed, u.seed)
}

func (t task) GetKey() string {
buf := make([]byte, 8, 8)
binary.BigEndian.PutUint64(buf, t.iteration)
return string(append(buf, t.seed...))
}
Loading

0 comments on commit ee5426c

Please sign in to comment.