Skip to content

Commit

Permalink
Use server GracefulStop instead of force stop (#219)
Browse files Browse the repository at this point in the history
  • Loading branch information
zbud-msft authored Jul 2, 2024
1 parent 7631184 commit 7801415
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 11 deletions.
11 changes: 10 additions & 1 deletion gnmi_server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,22 @@ func (srv *Server) Serve() error {
return srv.s.Serve(srv.lis)
}

func (srv *Server) ForceStop() {
s := srv.s
if s == nil {
log.Errorf("ForceStop() failed: not initialized")
return
}
s.Stop()
}

func (srv *Server) Stop() {
s := srv.s
if s == nil {
log.Errorf("Stop() failed: not initialized")
return
}
s.Stop()
s.GracefulStop()
}

// Address returns the port the Server is listening to.
Expand Down
26 changes: 19 additions & 7 deletions gnmi_server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3419,7 +3419,7 @@ func TestClientConnections(t *testing.T) {
func TestConnectionDataSet(t *testing.T) {
s := createServer(t, 8081)
go runServer(t, s)
defer s.Stop()
defer s.ForceStop()

tests := []struct {
desc string
Expand Down Expand Up @@ -3491,16 +3491,16 @@ func TestConnectionsKeepAlive(t *testing.T) {
defer s.Stop()

tests := []struct {
desc string
q client.Query
want []client.Notification
poll int
desc string
q client.Query
want []client.Notification
poll int
}{
{
desc: "Testing KeepAlive with goroutine count",
poll: 3,
q: client.Query{
Target: "COUNTERS_DB",
Target: "COUNTERS_DB",
Type: client.Poll,
Queries: []client.Path{{"COUNTERS", "Ethernet*"}},
TLS: &tls.Config{InsecureSkipVerify: true},
Expand All @@ -3511,12 +3511,14 @@ func TestConnectionsKeepAlive(t *testing.T) {
},
},
}
for _, tt := range tests {
for _, tt := range(tests) {
var clients []*cacheclient.CacheClient
for i := 0; i < 5; i++ {
t.Run(tt.desc, func(t *testing.T) {
q := tt.q
q.Addrs = []string{"127.0.0.1:8081"}
c := client.New()
clients = append(clients, c)
wg := new(sync.WaitGroup)
wg.Add(1)

Expand All @@ -3538,6 +3540,9 @@ func TestConnectionsKeepAlive(t *testing.T) {
}
})
}
for _, cacheClient := range(clients) {
cacheClient.Close()
}
}
}

Expand Down Expand Up @@ -3941,6 +3946,13 @@ func TestNilServerStop(t *testing.T) {
s.Stop()
}

func TestNilServerForceStop(t *testing.T) {
// Create a server with nil grpc server, such that s.ForceStop is called with nil value
t.Log("Expecting s.ForceStop to log error as server is nil")
s := &Server{}
s.ForceStop()
}

func TestInvalidServer(t *testing.T) {
s := createInvalidServer(t, 9000)
if s != nil {
Expand Down
5 changes: 3 additions & 2 deletions telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func runTelemetry(args []string) error {
var serverControlSignal = make(chan ServerControlValue, 1)
var stopSignalHandler = make(chan bool, 1)
sigchannel := make(chan os.Signal, 1)
signal.Notify(sigchannel, syscall.SIGTERM, syscall.SIGQUIT)
signal.Notify(sigchannel, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT, syscall.SIGHUP)

wg.Add(1)

Expand Down Expand Up @@ -464,12 +464,13 @@ func startGNMIServer(telemetryCfg *TelemetryConfig, cfg *gnmi.Config, serverCont

serverControlValue := <-serverControlSignal
log.V(1).Infof("Received signal for gnmi server to close")
s.Stop()
if serverControlValue == ServerStop {
s.ForceStop() // No graceful stop
stopSignalHandler <- true
log.Flush()
return
}
s.Stop() // Graceful stop
// Both ServerStart and ServerRestart will loop and restart server
// We use different value to distinguish between write/create and remove/rename
}
Expand Down
89 changes: 88 additions & 1 deletion telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func TestStartGNMIServer(t *testing.T) {
wg := &sync.WaitGroup{}

exitCalled := false
patches.ApplyMethod(reflect.TypeOf(&gnmi.Server{}), "Stop", func(_ *gnmi.Server) {
patches.ApplyMethod(reflect.TypeOf(&gnmi.Server{}), "ForceStop", func(_ *gnmi.Server) {
exitCalled = true
})

Expand All @@ -206,6 +206,91 @@ func TestStartGNMIServer(t *testing.T) {

wg.Wait()

if !exitCalled {
t.Errorf("s.ForceStop should be called if gnmi server is called to shutdown")
}
}

func TestStartGNMIServerGracefulStop(t *testing.T) {
testServerCert := "../testdata/certs/testserver.cert"
testServerKey := "../testdata/certs/testserver.key"
timeoutInterval := 15
tick := time.NewTicker(100 * time.Millisecond)
defer tick.Stop()

ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutInterval) * time.Second)
defer cancel()

originalArgs := os.Args
defer func() {
os.Args = originalArgs
}()

fs := flag.NewFlagSet("testStartGNMIServer", flag.ContinueOnError)
os.Args = []string{"cmd", "-port", "8080", "-server_crt", testServerCert, "-server_key", testServerKey}
telemetryCfg, cfg, err := setupFlags(fs)

if err != nil {
t.Errorf("Expected err to be nil, got err %v", err)
}

patches := gomonkey.ApplyFunc(tls.LoadX509KeyPair, func(certFile, keyFile string) (tls.Certificate, error) {
return tls.Certificate{}, nil
})
patches.ApplyFunc(gnmi.NewServer, func(cfg *gnmi.Config, opts []grpc.ServerOption) (*gnmi.Server, error) {
return &gnmi.Server{}, nil
})
patches.ApplyFunc(grpc.Creds, func(credentials.TransportCredentials) grpc.ServerOption {
return grpc.EmptyServerOption{}
})
patches.ApplyMethod(reflect.TypeOf(&gnmi.Server{}), "Serve", func(_ *gnmi.Server) error {
return nil
})
patches.ApplyMethod(reflect.TypeOf(&gnmi.Server{}), "Address", func(_ *gnmi.Server) string {
return ""
})
patches.ApplyFunc(iNotifyCertMonitoring, func(_ *fsnotify.Watcher, _ *TelemetryConfig, serverControlSignal chan<- ServerControlValue, testReadySignal chan<- int, certLoaded *int32) {
})

serverControlSignal := make(chan ServerControlValue, 1)
stopSignalHandler := make(chan bool, 1)
wg := &sync.WaitGroup{}

counter := 0
exitCalled := false
patches.ApplyMethod(reflect.TypeOf(&gnmi.Server{}), "Stop", func(_ *gnmi.Server) {
exitCalled = true
})
patches.ApplyMethod(reflect.TypeOf(&gnmi.Server{}), "ForceStop", func(_ *gnmi.Server) {
})


defer patches.Reset()

wg.Add(1)

go startGNMIServer(telemetryCfg, cfg, serverControlSignal, stopSignalHandler, wg)

for {
select {
case <-tick.C:
if counter == 0 { // simulate cert rotation first
sendSignal(serverControlSignal, ServerRestart)
} else { // simulate sigterm second
sendSignal(serverControlSignal, ServerStop)
}
counter += 1
case <-ctx.Done():
t.Errorf("Failed to send shutdown signal")
return
}
if counter > 1 { // both signals have been sent
break
}
}

wg.Wait()

if !exitCalled {
t.Errorf("s.Stop should be called if gnmi server is called to shutdown")
}
Expand Down Expand Up @@ -822,6 +907,8 @@ func TestINotifyCertMonitoringAddWatcherError(t *testing.T) {
func TestSignalHandler(t *testing.T) {
testHandlerSyscall(t, syscall.SIGTERM)
testHandlerSyscall(t, syscall.SIGQUIT)
testHandlerSyscall(t, syscall.SIGINT)
testHandlerSyscall(t, syscall.SIGHUP)
testHandlerSyscall(t, nil) // Test that ServerStop should make signalHandler exit
}

Expand Down

0 comments on commit 7801415

Please sign in to comment.