Skip to content

Commit

Permalink
Add datacollector reload
Browse files Browse the repository at this point in the history
  • Loading branch information
mshogin committed Dec 19, 2020
1 parent a27e3a3 commit cb37d8d
Show file tree
Hide file tree
Showing 17 changed files with 173 additions and 42 deletions.
4 changes: 2 additions & 2 deletions build/debian/postinst
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

adduser --system --group --no-create-home randomtrader
systemctl daemon-reload
systemctl restart randomtrader
systemctl restart randomtrader-datacollector
# systemctl restart randomtrader
# systemctl restart randomtrader-datacollector

mkdir -p /var/log/randomtrader
chown -R randomtrader:randomtrader /var/log/randomtrader
29 changes: 24 additions & 5 deletions cmd/datacollector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,44 @@ func main() {
}
flag.Parse()

if err := config.Init(*configPath); err != nil {
if _, err := config.Init(*configPath); err != nil {
logger.Errorf("can't initialise configuration: %s", err)
os.Exit(1)
}

cancel, err := datacollector.Start()
if err != nil {
if err := datacollector.Start(); err != nil {
logger.Errorf("cannot run datacollector")
cancel()
datacollector.Stop()
os.Exit(1)
}

go processReload(*configPath)

Run()
}

func Run() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)

logger.Infof("data collector has been started")
<-c

logger.Infof("shutting down data collector...")
cancel()
datacollector.Stop()
logger.Infof("data collector has been stopped")
}

// processReload ...
func processReload(configPath string) {
for {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGHUP)
<-c
if err := datacollector.Reload(configPath); err != nil {
logger.Fatalf("cannot reload datacollector: %w", err)
} else {
logger.Infof("datacollector reloaded successfully")
}
}
}
2 changes: 1 addition & 1 deletion cmd/randomtrader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func main() {
}
flag.Parse()

if err := config.Init(*configPath); err != nil {
if _, err := config.Init(*configPath); err != nil {
logger.Errorf("can't initialise configuration: %s", err)
os.Exit(1)
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.13

require (
cloud.google.com/go/storage v1.10.0
github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc
github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 // indirect
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -603,8 +603,6 @@ golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc
golang.org/x/tools v0.0.0-20200806022845-90696ccdc692/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d h1:W07d4xkoAUSNOkOzdzXCdFGxT7o2rW4q8M34tB2i//k=
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20201208233053-a543418bbed2 h1:vEtypaVub6UvKkiXZ2xx9QIvp9TL7sI7xp7vdi2kezA=
golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201210164618-f31efc5a5c28 h1:VgBpncx/x8r0uyVJrAj/3+59Sty1KTH5bBAyeIMRPEA=
golang.org/x/tools v0.0.0-20201210164618-f31efc5a5c28/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
17 changes: 11 additions & 6 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,30 +70,35 @@ type DataCollector struct {
}

// Init ...
func Init(configPath string) error {
func Init(configPath string) (Configuration, error) {
configSync.Lock()
defer configSync.Unlock()

setDefaults()

file, err := os.Open(configPath)
if err != nil {
return fmt.Errorf("can't open configuration file %q: %s", configPath, err)
return config, fmt.Errorf("can't open configuration file %q: %s", configPath, err)
}
defer file.Close()

decoder := json.NewDecoder(file)
var c Configuration
if err := decoder.Decode(&c); err != nil {
return fmt.Errorf("can't parse configuration file %q: %s", configPath, err)
return config, fmt.Errorf("can't parse configuration file %q: %s", configPath, err)
}

SwapConfig(c)

return nil
return swapConfig(c), nil
}

// SwapConfig ...
func SwapConfig(c Configuration) Configuration {
configSync.Lock()
defer configSync.Unlock()
return swapConfig(c)
}

func swapConfig(c Configuration) Configuration {
oldConfig := config
config = c
return oldConfig
Expand Down
6 changes: 4 additions & 2 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ func (s *ConfigTestSuite) TestConfigFile() {
config = configOrig
}()

s.NoError(Init(f.Name()))
_, err = Init(f.Name())
s.NoError(err)

s.Equal(time.Duration(1)*time.Second, GetEventsRaiseInterval())
s.Equal("BTC-USD", GetCurrencyPair())
Expand Down Expand Up @@ -80,7 +81,8 @@ func (s *ConfigTestSuite) TestDefaultDataCollector() {
config = configOrig
}()

s.NoError(Init(f.Name()))
_, err = Init(f.Name())
s.NoError(err)

dc := GetDataCollector()
s.NotNil(dc)
Expand Down
32 changes: 28 additions & 4 deletions pkg/datacollector/datacollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,40 @@ import (
"github.com/mshogin/randomtrader/pkg/logger"
)

var shutdown context.CancelFunc

// Start ...
func Start() (context.CancelFunc, error) {
ctx, cancel := context.WithCancel(context.Background())
var Start = func() error {
var ctx context.Context
ctx, shutdown = context.WithCancel(context.Background())
c := config.GetDataCollector()

if err := startOrderBookDumper(ctx, c.OrderBook); err != nil {
return cancel, fmt.Errorf("cannot start order book collector: %w", err)
return fmt.Errorf("cannot start order book collector: %w", err)
}

return nil
}

// Stop ...
var Stop = func() {
shutdown()
}

// Reload ...
func Reload(configPath string) error {
oldConfig, err := config.Init(configPath)
if err != nil {
config.SwapConfig(oldConfig)
return fmt.Errorf("cannot reload config: %w", err)
}

return cancel, nil
Stop()
if err := Start(); err != nil {
return fmt.Errorf("cannot start datacollector: %w", err)
}

return nil
}

func startOrderBookDumper(ctx context.Context, logConfigs []config.OrderBookLog) error {
Expand Down
43 changes: 36 additions & 7 deletions pkg/datacollector/datacollector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,14 @@ func TestOrderBookCollector(t *testing.T) {

exchange.SetupTestGRPCClient()

GetGCEClientOrig := storage.GetGCEClient
defer func() { storage.GetGCEClient = GetGCEClientOrig }()
storage.GetGCEClient = storage.GetGCETestClient
GetGCEClientOrig := storage.SwapGCEClient(storage.GetGCETestClient())
defer storage.SwapGCEClient(GetGCEClientOrig)

cancelDataCollector, err := Start()
s.NoError(err)
defer cancelDataCollector()
s.NoError(Start())
defer Stop()

time.Sleep(3 * time.Second) // give collector the time to collect at least once
cancelDataCollector()
Stop()

fpath := path.Join(tmpDir, logFilename)
file, err := os.Open(fpath)
Expand All @@ -67,3 +65,34 @@ func TestOrderBookCollector(t *testing.T) {
s.NoError(sc.Err())
s.Greater(lineNo, 0)
}

func TestReload(t *testing.T) {
s := assert.New(t)
startOrig := Start
stopOrig := Stop
defer func() {
Start = startOrig
Stop = stopOrig
}()

startCount, stopCount := 0, 0
Start = func() error {
startCount++
return nil
}
Stop = func() {
stopCount++
}

f, err := ioutil.TempFile("", "")
s.NoError(err)
s.NoError(f.Close())

s.NoError(ioutil.WriteFile(f.Name(), []byte("{}"), os.FileMode(644)))

Reload(f.Name())
defer Stop()

s.Equal(1, startCount)
s.Equal(1, stopCount)
}
11 changes: 8 additions & 3 deletions pkg/exchange/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package exchange
import (
"context"
"fmt"
"time"

"github.com/mshogin/randomtrader/pkg/bidcontext"
"github.com/mshogin/randomtrader/pkg/config"
"github.com/mshogin/randomtrader/pkg/utils"
"github.com/thrasher-corp/gocryptotrader/gctrpc"
)

Expand Down Expand Up @@ -52,8 +54,9 @@ type (

// OrderBook ...
OrderBook struct {
Asks []orderBookItem
Bids []orderBookItem
Asks []orderBookItem
Bids []orderBookItem
DateTime time.Time
}
)

Expand Down Expand Up @@ -81,7 +84,9 @@ func GetOrderBook() (*OrderBook, error) {
return nil, fmt.Errorf("cannot place order: %s", err)
}

ob := OrderBook{}
ob := OrderBook{
DateTime: utils.GetCurrentTime(),
}
for _, ask := range result.GetAsks() {
ob.Asks = append(ob.Asks,
orderBookItem{Amount: ask.Amount, Price: ask.Price})
Expand Down
20 changes: 20 additions & 0 deletions pkg/exchange/order_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package exchange

import (
"testing"
"time"

"github.com/mshogin/randomtrader/pkg/bidcontext"
"github.com/mshogin/randomtrader/pkg/config"
"github.com/mshogin/randomtrader/pkg/utils"
"github.com/stretchr/testify/assert"
)

Expand All @@ -19,3 +21,21 @@ func TestExecuteContext(t *testing.T) {
s.NotEmpty(ctx.OrderID)
}
}

func TestGetOrderBook(t *testing.T) {
s := assert.New(t)
SetupTestGRPCClient()

GetCurrentTimeOrig := utils.GetCurrentTime
defer func() { utils.GetCurrentTime = GetCurrentTimeOrig }()
currentTime := time.Now()
utils.GetCurrentTime = func() time.Time {
return currentTime
}

ob, err := GetOrderBook()
s.NoError(err)
s.NotNil(ob)

s.Equal(currentTime, ob.DateTime)
}
11 changes: 8 additions & 3 deletions pkg/logger/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,22 @@ func Debugf(format string, args ...interface{}) {
} else {
s = ""
}
fmt.Printf("DEBUG: "+s+" "+format+"\n", args...) // output for debug
fmt.Printf("DEBUG: "+s+" "+format+"\n", args...)
}

// Errorf ...
func Errorf(format string, args ...interface{}) {
fmt.Printf("ERROR: "+format+"\n", args...) // output for debug
fmt.Printf("ERROR: "+format+"\n", args...)
}

// Fatalf ...
func Fatalf(format string, args ...interface{}) {
panic(fmt.Errorf("FATAL: "+format+"\n", args...))
}

// Infof ...
func Infof(format string, args ...interface{}) {
fmt.Printf("INFO: "+format+"\n", args...) // output for debug
fmt.Printf("INFO: "+format+"\n", args...)
}

// ProcessContext ...
Expand Down
17 changes: 15 additions & 2 deletions pkg/storage/gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"os"
"path"
"sync"

"cloud.google.com/go/storage"
"github.com/mshogin/randomtrader/pkg/config"
Expand All @@ -16,10 +17,22 @@ type gceClientImpl struct {
cli *storage.Client
}

var gceClient *gceClientImpl
var gceClient Storage
var gceClientSync sync.Mutex

func SwapGCEClient(newClient Storage) Storage {
gceClientSync.Lock()
defer gceClientSync.Unlock()
gceClientPrev := gceClient
gceClient = newClient
return gceClientPrev
}

// GetGCEClient ...
var GetGCEClient = func() (Storage, error) {
func GetGCEClient() (Storage, error) {
gceClientSync.Lock()
defer gceClientSync.Unlock()

if gceClient != nil {
return gceClient, nil
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ import (
func TestSaveOrderBookLog(t *testing.T) {
s := assert.New(t)

GetGCEClientOrig := GetGCEClient
defer func() { GetGCEClient = GetGCEClientOrig }()
GetGCEClient = GetGCETestClient
gceClientOrig := SwapGCEClient(GetGCETestClient())
defer func() { SwapGCEClient(gceClientOrig) }()

s.NoError(SaveOrderBookLog("/some/path"))
}
Loading

0 comments on commit cb37d8d

Please sign in to comment.