Skip to content

Commit

Permalink
Merge pull request ethereum#33 from ethersphere/feature/ethutil-refactor
Browse files Browse the repository at this point in the history
ethreact - Feature/ethutil refactor
  • Loading branch information
obscuren committed Jul 7, 2014
2 parents b232acd + 5a2afc5 commit b958179
Show file tree
Hide file tree
Showing 11 changed files with 353 additions and 159 deletions.
5 changes: 3 additions & 2 deletions ethchain/dagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ethchain
import (
"github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethlog"
"github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethutil"
"github.com/obscuren/sha3"
"hash"
Expand All @@ -14,15 +15,15 @@ import (
var powlogger = ethlog.NewLogger("POW")

type PoW interface {
Search(block *Block, reactChan chan ethutil.React) []byte
Search(block *Block, reactChan chan ethreact.Event) []byte
Verify(hash []byte, diff *big.Int, nonce []byte) bool
}

type EasyPow struct {
hash *big.Int
}

func (pow *EasyPow) Search(block *Block, reactChan chan ethutil.React) []byte {
func (pow *EasyPow) Search(block *Block, reactChan chan ethreact.Event) []byte {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
hash := block.HashNoNonce()
diff := block.Difficulty
Expand Down
3 changes: 2 additions & 1 deletion ethchain/state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethlog"
"github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethtrie"
"github.com/ethereum/eth-go/ethutil"
"github.com/ethereum/eth-go/ethwire"
Expand Down Expand Up @@ -36,7 +37,7 @@ type EthManager interface {
BlockChain() *BlockChain
TxPool() *TxPool
Broadcast(msgType ethwire.MsgType, data []interface{})
Reactor() *ethutil.ReactorEngine
Reactor() *ethreact.ReactorEngine
PeerCount() int
IsMining() bool
IsListening() bool
Expand Down
11 changes: 8 additions & 3 deletions ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/ethereum/eth-go/ethchain"
"github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethlog"
"github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethrpc"
"github.com/ethereum/eth-go/ethutil"
"github.com/ethereum/eth-go/ethwire"
Expand Down Expand Up @@ -71,7 +72,7 @@ type Ethereum struct {

listening bool

reactor *ethutil.ReactorEngine
reactor *ethreact.ReactorEngine

RpcServer *ethrpc.JsonRpcServer

Expand Down Expand Up @@ -106,7 +107,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager
keyManager: keyManager,
clientIdentity: clientIdentity,
}
ethereum.reactor = ethutil.NewReactorEngine()
ethereum.reactor = ethreact.New()

ethereum.txPool = ethchain.NewTxPool(ethereum)
ethereum.blockChain = ethchain.NewBlockChain(ethereum)
Expand All @@ -118,7 +119,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager
return ethereum, nil
}

func (s *Ethereum) Reactor() *ethutil.ReactorEngine {
func (s *Ethereum) Reactor() *ethreact.ReactorEngine {
return s.reactor
}

Expand Down Expand Up @@ -350,6 +351,7 @@ func (s *Ethereum) ReapDeadPeerHandler() {

// Start the ethereum
func (s *Ethereum) Start(seed bool) {
s.reactor.Start()
// Bind to addr and port
ln, err := net.Listen("tcp", ":"+s.Port)
if err != nil {
Expand Down Expand Up @@ -461,6 +463,9 @@ func (s *Ethereum) Stop() {
s.txPool.Stop()
s.stateManager.Stop()

s.reactor.Flush()
s.reactor.Stop()

ethlogger.Infoln("Server stopped")
close(s.shutdownChan)
}
Expand Down
57 changes: 39 additions & 18 deletions ethlog/loggers.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ func (msg *logMessage) send(logger LogSystem) {
var logMessages chan (*logMessage)
var logSystems []LogSystem
var quit chan bool
var drained chan bool
var shutdown chan bool
var mutex = sync.Mutex{}

type LogLevel uint8

Expand All @@ -57,29 +60,41 @@ func start() {
out:
for {
select {
case <-quit:
break out
case msg := <-logMessages:
for _, logSystem := range logSystems {
if logSystem.GetLogLevel() >= msg.LogLevel {
msg.send(logSystem)
}
}
case <-quit:
break out
case drained <- true:
default:
drained <- true // this blocks until a message is sent to the queu
}
}
close(shutdown)
}

// waits until log messages are drained (dispatched to log writers)
func Flush() {
quit <- true

done:
for {
func Reset() {
mutex.Lock()
defer mutex.Unlock()
if logSystems != nil {
quit <- true
select {
case <-logMessages:
default:
break done
case <-drained:
}
<-shutdown
}
logSystems = nil
}

// waits until log messages are drained (dispatched to log writers)
func Flush() {
mutex.Lock()
defer mutex.Unlock()
if logSystems != nil {
<-drained
}
}

Expand All @@ -92,28 +107,34 @@ func NewLogger(tag string) *Logger {
}

func AddLogSystem(logSystem LogSystem) {
var mutex = &sync.Mutex{}
mutex.Lock()
defer mutex.Unlock()
if logSystems == nil {
logMessages = make(chan *logMessage)
quit = make(chan bool)
drained = make(chan bool, 1)
shutdown = make(chan bool, 1)
go start()
}
logSystems = append(logSystems, logSystem)
}

func send(msg *logMessage) {
select {
case <-drained:
}
logMessages <- msg
}

func (logger *Logger) sendln(level LogLevel, v ...interface{}) {
if logMessages != nil {
msg := newPrintlnLogMessage(level, logger.tag, v...)
logMessages <- msg
if logSystems != nil {
send(newPrintlnLogMessage(level, logger.tag, v...))
}
}

func (logger *Logger) sendf(level LogLevel, format string, v ...interface{}) {
if logMessages != nil {
msg := newPrintfLogMessage(level, logger.tag, format, v...)
logMessages <- msg
if logSystems != nil {
send(newPrintfLogMessage(level, logger.tag, format, v...))
}
}

Expand Down
23 changes: 11 additions & 12 deletions ethlog/loggers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ func (t *TestLogSystem) GetLogLevel() LogLevel {
return t.level
}

func quote(s string) string {
return fmt.Sprintf("'%s'", s)
}

func TestLoggerPrintln(t *testing.T) {
logger := NewLogger("TEST")
testLogSystem := &TestLogSystem{level: WarnLevel}
Expand All @@ -41,10 +37,10 @@ func TestLoggerPrintln(t *testing.T) {
logger.Infoln("info")
logger.Debugln("debug")
Flush()
Reset()
output := testLogSystem.Output
fmt.Println(quote(output))
if output != "[TEST] error\n[TEST] warn\n" {
t.Error("Expected logger output '[TEST] error\\n[TEST] warn\\n', got ", quote(testLogSystem.Output))
t.Error("Expected logger output '[TEST] error\\n[TEST] warn\\n', got ", testLogSystem.Output)
}
}

Expand All @@ -57,10 +53,10 @@ func TestLoggerPrintf(t *testing.T) {
logger.Infof("info")
logger.Debugf("debug")
Flush()
Reset()
output := testLogSystem.Output
fmt.Println(quote(output))
if output != "[TEST] error to { 2}\n[TEST] warn" {
t.Error("Expected logger output '[TEST] error to { 2}\\n[TEST] warn', got ", quote(testLogSystem.Output))
t.Error("Expected logger output '[TEST] error to { 2}\\n[TEST] warn', got ", testLogSystem.Output)
}
}

Expand All @@ -73,13 +69,14 @@ func TestMultipleLogSystems(t *testing.T) {
logger.Errorln("error")
logger.Warnln("warn")
Flush()
Reset()
output0 := testLogSystem0.Output
output1 := testLogSystem1.Output
if output0 != "[TEST] error\n" {
t.Error("Expected logger 0 output '[TEST] error\\n', got ", quote(testLogSystem0.Output))
t.Error("Expected logger 0 output '[TEST] error\\n', got ", testLogSystem0.Output)
}
if output1 != "[TEST] error\n[TEST] warn\n" {
t.Error("Expected logger 1 output '[TEST] error\\n[TEST] warn\\n', got ", quote(testLogSystem1.Output))
t.Error("Expected logger 1 output '[TEST] error\\n[TEST] warn\\n', got ", testLogSystem1.Output)
}
}

Expand All @@ -92,11 +89,11 @@ func TestFileLogSystem(t *testing.T) {
logger.Errorf("error to %s\n", filename)
logger.Warnln("warn")
Flush()
Reset()
contents, _ := ioutil.ReadFile(filename)
output := string(contents)
fmt.Println(quote(output))
if output != "[TEST] error to test.log\n[TEST] warn\n" {
t.Error("Expected contents of file 'test.log': '[TEST] error to test.log\\n[TEST] warn\\n', got ", quote(output))
t.Error("Expected contents of file 'test.log': '[TEST] error to test.log\\n[TEST] warn\\n', got ", output)
} else {
os.Remove(filename)
}
Expand All @@ -105,5 +102,7 @@ func TestFileLogSystem(t *testing.T) {
func TestNoLogSystem(t *testing.T) {
logger := NewLogger("TEST")
logger.Warnln("warn")
fmt.Println("1")
Flush()
Reset()
}
12 changes: 6 additions & 6 deletions ethminer/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"bytes"
"github.com/ethereum/eth-go/ethchain"
"github.com/ethereum/eth-go/ethlog"
"github.com/ethereum/eth-go/ethutil"
"github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethwire"
"sort"
)
Expand All @@ -15,19 +15,19 @@ type Miner struct {
pow ethchain.PoW
ethereum ethchain.EthManager
coinbase []byte
reactChan chan ethutil.React
reactChan chan ethreact.Event
txs ethchain.Transactions
uncles []*ethchain.Block
block *ethchain.Block
powChan chan []byte
powQuitChan chan ethutil.React
powQuitChan chan ethreact.Event
quitChan chan bool
}

func NewDefaultMiner(coinbase []byte, ethereum ethchain.EthManager) Miner {
reactChan := make(chan ethutil.React, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in
powChan := make(chan []byte, 1) // This is the channel that receives valid sha hases for a given block
powQuitChan := make(chan ethutil.React, 1) // This is the channel that can exit the miner thread
reactChan := make(chan ethreact.Event, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in
powChan := make(chan []byte, 1) // This is the channel that receives valid sha hases for a given block
powQuitChan := make(chan ethreact.Event, 1) // This is the channel that can exit the miner thread
quitChan := make(chan bool, 1)

ethereum.Reactor().Subscribe("newBlock", reactChan)
Expand Down
40 changes: 40 additions & 0 deletions ethreact/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# ethreact

ethereum event reactor. Component of the ethereum stack.
various events like state change on an account or new block found are broadcast to subscribers.
Broadcasting to subscribers is running on its own routine and globally order preserving.

## Clients
### subscribe

eventChannel := make(chan ethreact.Event)
reactor.Subscribe(event, eventChannel)

The same channel can be subscribed to multiple events but only once for each event. In order to allow order of events to be preserved, broadcast of events is synchronous within the main broadcast loop. Therefore any blocking subscriber channels will be skipped, i.e. missing broadcasting events while they are blocked.

### unsubscribe

reactor.Unsubscribe(event, eventChannel)

### Processing events

event.Resource is of type interface{}. The actual type of event.Resource depends on event.Name and may need to be cast for processing.

var event ethreact.Event
for {
select {
case event = <-eventChannel:
processTransaction(event.Resource.(Transaction))
}
}

## Broadcast

reactor := ethreact.New()
reactor.Start()
reactor.Post(name, resource)
reactor.Flush() // wait till all broadcast messages are dispatched
reactor.Stop() // stop the main broadcast loop immediately (even if there are unbroadcast events left)



Loading

0 comments on commit b958179

Please sign in to comment.