From cb37d8d57a68ff58284bd2489a8107d4e5d0e802 Mon Sep 17 00:00:00 2001 From: Mikhail Shogin Date: Sun, 20 Dec 2020 00:16:21 +0100 Subject: [PATCH] Add datacollector reload --- build/debian/postinst | 4 +-- cmd/datacollector/main.go | 29 ++++++++++++++--- cmd/randomtrader/main.go | 2 +- go.mod | 1 + go.sum | 2 -- pkg/config/config.go | 17 ++++++---- pkg/config/config_test.go | 6 ++-- pkg/datacollector/datacollector.go | 32 +++++++++++++++--- pkg/datacollector/datacollector_test.go | 43 +++++++++++++++++++++---- pkg/exchange/order.go | 11 +++++-- pkg/exchange/order_test.go | 20 ++++++++++++ pkg/logger/log.go | 11 +++++-- pkg/storage/gce.go | 17 ++++++++-- pkg/storage/storage_test.go | 5 ++- pkg/storage/test_gce.go | 4 +-- pkg/strategy/strategy.go | 4 +++ pkg/utils/time.go | 7 ++++ 17 files changed, 173 insertions(+), 42 deletions(-) create mode 100644 pkg/utils/time.go diff --git a/build/debian/postinst b/build/debian/postinst index e0ee214..fdf6993 100755 --- a/build/debian/postinst +++ b/build/debian/postinst @@ -2,8 +2,8 @@ adduser --system --group --no-create-home randomtrader systemctl daemon-reload -systemctl restart randomtrader -systemctl restart randomtrader-datacollector +# systemctl restart randomtrader +# systemctl restart randomtrader-datacollector mkdir -p /var/log/randomtrader chown -R randomtrader:randomtrader /var/log/randomtrader diff --git a/cmd/datacollector/main.go b/cmd/datacollector/main.go index 1ba283c..2cb0e35 100644 --- a/cmd/datacollector/main.go +++ b/cmd/datacollector/main.go @@ -24,18 +24,23 @@ func main() { } flag.Parse() - if err := config.Init(*configPath); err != nil { + if _, err := config.Init(*configPath); err != nil { logger.Errorf("can't initialise configuration: %s", err) os.Exit(1) } - cancel, err := datacollector.Start() - if err != nil { + if err := datacollector.Start(); err != nil { logger.Errorf("cannot run datacollector") - cancel() + datacollector.Stop() os.Exit(1) } + go processReload(*configPath) + + Run() +} + +func Run() { c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, syscall.SIGTERM) @@ -43,6 +48,20 @@ func main() { <-c logger.Infof("shutting down data collector...") - cancel() + datacollector.Stop() logger.Infof("data collector has been stopped") } + +// processReload ... +func processReload(configPath string) { + for { + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGHUP) + <-c + if err := datacollector.Reload(configPath); err != nil { + logger.Fatalf("cannot reload datacollector: %w", err) + } else { + logger.Infof("datacollector reloaded successfully") + } + } +} diff --git a/cmd/randomtrader/main.go b/cmd/randomtrader/main.go index 91ca130..766bf37 100644 --- a/cmd/randomtrader/main.go +++ b/cmd/randomtrader/main.go @@ -24,7 +24,7 @@ func main() { } flag.Parse() - if err := config.Init(*configPath); err != nil { + if _, err := config.Init(*configPath); err != nil { logger.Errorf("can't initialise configuration: %s", err) os.Exit(1) } diff --git a/go.mod b/go.mod index 4765414..43df28a 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.13 require ( cloud.google.com/go/storage v1.10.0 + github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 // indirect github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible diff --git a/go.sum b/go.sum index 86ec737..3828243 100644 --- a/go.sum +++ b/go.sum @@ -603,8 +603,6 @@ golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc golang.org/x/tools v0.0.0-20200806022845-90696ccdc692/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d h1:W07d4xkoAUSNOkOzdzXCdFGxT7o2rW4q8M34tB2i//k= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= -golang.org/x/tools v0.0.0-20201208233053-a543418bbed2 h1:vEtypaVub6UvKkiXZ2xx9QIvp9TL7sI7xp7vdi2kezA= -golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201210164618-f31efc5a5c28 h1:VgBpncx/x8r0uyVJrAj/3+59Sty1KTH5bBAyeIMRPEA= golang.org/x/tools v0.0.0-20201210164618-f31efc5a5c28/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/config/config.go b/pkg/config/config.go index 823310f..44a12a7 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -70,30 +70,35 @@ type DataCollector struct { } // Init ... -func Init(configPath string) error { +func Init(configPath string) (Configuration, error) { + configSync.Lock() + defer configSync.Unlock() + setDefaults() file, err := os.Open(configPath) if err != nil { - return fmt.Errorf("can't open configuration file %q: %s", configPath, err) + return config, fmt.Errorf("can't open configuration file %q: %s", configPath, err) } defer file.Close() decoder := json.NewDecoder(file) var c Configuration if err := decoder.Decode(&c); err != nil { - return fmt.Errorf("can't parse configuration file %q: %s", configPath, err) + return config, fmt.Errorf("can't parse configuration file %q: %s", configPath, err) } - SwapConfig(c) - - return nil + return swapConfig(c), nil } // SwapConfig ... func SwapConfig(c Configuration) Configuration { configSync.Lock() defer configSync.Unlock() + return swapConfig(c) +} + +func swapConfig(c Configuration) Configuration { oldConfig := config config = c return oldConfig diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index adb6da0..3e69967 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -50,7 +50,8 @@ func (s *ConfigTestSuite) TestConfigFile() { config = configOrig }() - s.NoError(Init(f.Name())) + _, err = Init(f.Name()) + s.NoError(err) s.Equal(time.Duration(1)*time.Second, GetEventsRaiseInterval()) s.Equal("BTC-USD", GetCurrencyPair()) @@ -80,7 +81,8 @@ func (s *ConfigTestSuite) TestDefaultDataCollector() { config = configOrig }() - s.NoError(Init(f.Name())) + _, err = Init(f.Name()) + s.NoError(err) dc := GetDataCollector() s.NotNil(dc) diff --git a/pkg/datacollector/datacollector.go b/pkg/datacollector/datacollector.go index 917be4c..8abcddf 100644 --- a/pkg/datacollector/datacollector.go +++ b/pkg/datacollector/datacollector.go @@ -10,16 +10,40 @@ import ( "github.com/mshogin/randomtrader/pkg/logger" ) +var shutdown context.CancelFunc + // Start ... -func Start() (context.CancelFunc, error) { - ctx, cancel := context.WithCancel(context.Background()) +var Start = func() error { + var ctx context.Context + ctx, shutdown = context.WithCancel(context.Background()) c := config.GetDataCollector() if err := startOrderBookDumper(ctx, c.OrderBook); err != nil { - return cancel, fmt.Errorf("cannot start order book collector: %w", err) + return fmt.Errorf("cannot start order book collector: %w", err) + } + + return nil +} + +// Stop ... +var Stop = func() { + shutdown() +} + +// Reload ... +func Reload(configPath string) error { + oldConfig, err := config.Init(configPath) + if err != nil { + config.SwapConfig(oldConfig) + return fmt.Errorf("cannot reload config: %w", err) } - return cancel, nil + Stop() + if err := Start(); err != nil { + return fmt.Errorf("cannot start datacollector: %w", err) + } + + return nil } func startOrderBookDumper(ctx context.Context, logConfigs []config.OrderBookLog) error { diff --git a/pkg/datacollector/datacollector_test.go b/pkg/datacollector/datacollector_test.go index 67e20fa..6d8cd05 100644 --- a/pkg/datacollector/datacollector_test.go +++ b/pkg/datacollector/datacollector_test.go @@ -40,16 +40,14 @@ func TestOrderBookCollector(t *testing.T) { exchange.SetupTestGRPCClient() - GetGCEClientOrig := storage.GetGCEClient - defer func() { storage.GetGCEClient = GetGCEClientOrig }() - storage.GetGCEClient = storage.GetGCETestClient + GetGCEClientOrig := storage.SwapGCEClient(storage.GetGCETestClient()) + defer storage.SwapGCEClient(GetGCEClientOrig) - cancelDataCollector, err := Start() - s.NoError(err) - defer cancelDataCollector() + s.NoError(Start()) + defer Stop() time.Sleep(3 * time.Second) // give collector the time to collect at least once - cancelDataCollector() + Stop() fpath := path.Join(tmpDir, logFilename) file, err := os.Open(fpath) @@ -67,3 +65,34 @@ func TestOrderBookCollector(t *testing.T) { s.NoError(sc.Err()) s.Greater(lineNo, 0) } + +func TestReload(t *testing.T) { + s := assert.New(t) + startOrig := Start + stopOrig := Stop + defer func() { + Start = startOrig + Stop = stopOrig + }() + + startCount, stopCount := 0, 0 + Start = func() error { + startCount++ + return nil + } + Stop = func() { + stopCount++ + } + + f, err := ioutil.TempFile("", "") + s.NoError(err) + s.NoError(f.Close()) + + s.NoError(ioutil.WriteFile(f.Name(), []byte("{}"), os.FileMode(644))) + + Reload(f.Name()) + defer Stop() + + s.Equal(1, startCount) + s.Equal(1, stopCount) +} diff --git a/pkg/exchange/order.go b/pkg/exchange/order.go index b3c7e00..cafed8d 100644 --- a/pkg/exchange/order.go +++ b/pkg/exchange/order.go @@ -3,9 +3,11 @@ package exchange import ( "context" "fmt" + "time" "github.com/mshogin/randomtrader/pkg/bidcontext" "github.com/mshogin/randomtrader/pkg/config" + "github.com/mshogin/randomtrader/pkg/utils" "github.com/thrasher-corp/gocryptotrader/gctrpc" ) @@ -52,8 +54,9 @@ type ( // OrderBook ... OrderBook struct { - Asks []orderBookItem - Bids []orderBookItem + Asks []orderBookItem + Bids []orderBookItem + DateTime time.Time } ) @@ -81,7 +84,9 @@ func GetOrderBook() (*OrderBook, error) { return nil, fmt.Errorf("cannot place order: %s", err) } - ob := OrderBook{} + ob := OrderBook{ + DateTime: utils.GetCurrentTime(), + } for _, ask := range result.GetAsks() { ob.Asks = append(ob.Asks, orderBookItem{Amount: ask.Amount, Price: ask.Price}) diff --git a/pkg/exchange/order_test.go b/pkg/exchange/order_test.go index c2d44b5..f9fae28 100644 --- a/pkg/exchange/order_test.go +++ b/pkg/exchange/order_test.go @@ -2,9 +2,11 @@ package exchange import ( "testing" + "time" "github.com/mshogin/randomtrader/pkg/bidcontext" "github.com/mshogin/randomtrader/pkg/config" + "github.com/mshogin/randomtrader/pkg/utils" "github.com/stretchr/testify/assert" ) @@ -19,3 +21,21 @@ func TestExecuteContext(t *testing.T) { s.NotEmpty(ctx.OrderID) } } + +func TestGetOrderBook(t *testing.T) { + s := assert.New(t) + SetupTestGRPCClient() + + GetCurrentTimeOrig := utils.GetCurrentTime + defer func() { utils.GetCurrentTime = GetCurrentTimeOrig }() + currentTime := time.Now() + utils.GetCurrentTime = func() time.Time { + return currentTime + } + + ob, err := GetOrderBook() + s.NoError(err) + s.NotNil(ob) + + s.Equal(currentTime, ob.DateTime) +} diff --git a/pkg/logger/log.go b/pkg/logger/log.go index 8a43e09..e347d6d 100644 --- a/pkg/logger/log.go +++ b/pkg/logger/log.go @@ -36,17 +36,22 @@ func Debugf(format string, args ...interface{}) { } else { s = "" } - fmt.Printf("DEBUG: "+s+" "+format+"\n", args...) // output for debug + fmt.Printf("DEBUG: "+s+" "+format+"\n", args...) } // Errorf ... func Errorf(format string, args ...interface{}) { - fmt.Printf("ERROR: "+format+"\n", args...) // output for debug + fmt.Printf("ERROR: "+format+"\n", args...) +} + +// Fatalf ... +func Fatalf(format string, args ...interface{}) { + panic(fmt.Errorf("FATAL: "+format+"\n", args...)) } // Infof ... func Infof(format string, args ...interface{}) { - fmt.Printf("INFO: "+format+"\n", args...) // output for debug + fmt.Printf("INFO: "+format+"\n", args...) } // ProcessContext ... diff --git a/pkg/storage/gce.go b/pkg/storage/gce.go index 2db6e56..8b83670 100644 --- a/pkg/storage/gce.go +++ b/pkg/storage/gce.go @@ -6,6 +6,7 @@ import ( "io" "os" "path" + "sync" "cloud.google.com/go/storage" "github.com/mshogin/randomtrader/pkg/config" @@ -16,10 +17,22 @@ type gceClientImpl struct { cli *storage.Client } -var gceClient *gceClientImpl +var gceClient Storage +var gceClientSync sync.Mutex + +func SwapGCEClient(newClient Storage) Storage { + gceClientSync.Lock() + defer gceClientSync.Unlock() + gceClientPrev := gceClient + gceClient = newClient + return gceClientPrev +} // GetGCEClient ... -var GetGCEClient = func() (Storage, error) { +func GetGCEClient() (Storage, error) { + gceClientSync.Lock() + defer gceClientSync.Unlock() + if gceClient != nil { return gceClient, nil } diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index 0897a32..17fc0a6 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -9,9 +9,8 @@ import ( func TestSaveOrderBookLog(t *testing.T) { s := assert.New(t) - GetGCEClientOrig := GetGCEClient - defer func() { GetGCEClient = GetGCEClientOrig }() - GetGCEClient = GetGCETestClient + gceClientOrig := SwapGCEClient(GetGCETestClient()) + defer func() { SwapGCEClient(gceClientOrig) }() s.NoError(SaveOrderBookLog("/some/path")) } diff --git a/pkg/storage/test_gce.go b/pkg/storage/test_gce.go index 9b9c166..e345deb 100644 --- a/pkg/storage/test_gce.go +++ b/pkg/storage/test_gce.go @@ -3,8 +3,8 @@ package storage type testGCEClient struct{} // GetGCETestClient ... -var GetGCETestClient = func() (Storage, error) { - return &testGCEClient{}, nil +func GetGCETestClient() Storage { + return &testGCEClient{} } // SaveObject ... diff --git a/pkg/strategy/strategy.go b/pkg/strategy/strategy.go index 5d46000..6ea234b 100644 --- a/pkg/strategy/strategy.go +++ b/pkg/strategy/strategy.go @@ -11,6 +11,10 @@ func ProcessContext(ctx *bidcontext.BidContext) error { return nil } + // if err := lisa.ProcessContext(ctx); err != nil { + // logger.Errorf("strategy lisa finished with error: %w", err) + // } + if len(ctx.Strategy) == 0 { return random.ProcessContext(ctx) } diff --git a/pkg/utils/time.go b/pkg/utils/time.go new file mode 100644 index 0000000..71e27dc --- /dev/null +++ b/pkg/utils/time.go @@ -0,0 +1,7 @@ +package utils + +import "time" + +var GetCurrentTime = func() time.Time { + return time.Now() +}