diff --git a/plugins/status/configs/.rr-ready-init.yaml b/plugins/status/configs/.rr-ready-init.yaml index d0d3b58b..f66ad419 100644 --- a/plugins/status/configs/.rr-ready-init.yaml +++ b/plugins/status/configs/.rr-ready-init.yaml @@ -1,7 +1,7 @@ version: '3' rpc: - listen: tcp://127.0.0.1:6007 + listen: tcp://127.0.0.1:6006 server: command: "php ../../php_test_files/sleep.php" diff --git a/plugins/status/plugin_test.go b/plugins/status/plugin_test.go index 0b4fc540..aa25528b 100644 --- a/plugins/status/plugin_test.go +++ b/plugins/status/plugin_test.go @@ -162,7 +162,9 @@ func TestStatusRPC(t *testing.T) { }() time.Sleep(time.Second) - t.Run("CheckerGetStatusRpc", checkRPCStatus) + t.Run("CheckerGetStatusRpc", func(t *testing.T) { + checkRPCStatus(t, "http", 200, "6005") + }) stopCh <- struct{}{} wg.Wait() } @@ -298,7 +300,9 @@ func TestReadinessRPCWorkerNotReady(t *testing.T) { t.Run("DoHttpReq", doHTTPReq) time.Sleep(time.Second * 5) t.Run("CheckerGetReadiness2", checkHTTPReadiness2) - t.Run("CheckerGetRpcReadiness", checkRPCReadiness) + t.Run("CheckerGetRpcReadiness", func(t *testing.T) { + checkRPCReadiness(t, "http", 503, "6006") + }) stopCh <- struct{}{} wg.Wait() } @@ -386,6 +390,91 @@ func TestJobsStatus(t *testing.T) { wg.Wait() } +func TestJobsReadiness(t *testing.T) { + cont := endure.New(slog.LevelDebug, endure.GracefulShutdownTimeout(time.Second)) + + cfg := &config.Plugin{ + Version: "2023.2.0", + Path: "configs/.rr-jobs-status.yaml", + Prefix: "rr", + } + + err := cont.RegisterAll( + cfg, + &rpcPlugin.Plugin{}, + &logger.Plugin{}, + &server.Plugin{}, + &status.Plugin{}, + &jobs.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout, error here is OK, because in the PHP we are sleeping for the 300s + _ = cont.Stop() + return + } + } + }() + + time.Sleep(time.Second) + t.Run("checkJobsReadiness", checkJobsReadiness) + t.Run("checkJobsRPC", func(t *testing.T) { + checkRPCReadiness(t, "jobs", 200, "6007") + checkRPCStatus(t, "jobs", 200, "6007") + }) + + stopCh <- struct{}{} + wg.Wait() +} + +func checkJobsReadiness(t *testing.T) { + req, err := http.NewRequest("GET", "http://127.0.0.1:35544/ready?plugin=jobs", nil) + assert.NoError(t, err) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + b, err := io.ReadAll(r.Body) + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, "plugin: jobs, status: 200\n", string(b)) + + err = r.Body.Close() + assert.NoError(t, err) +} + func checkHTTPStatus(t *testing.T) { req, err := http.NewRequest("GET", "http://127.0.0.1:34333/health?plugin=http&plugin=rpc", nil) assert.NoError(t, err) @@ -444,34 +533,34 @@ func checkHTTPReadiness(t *testing.T) { assert.NoError(t, err) } -func checkRPCReadiness(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6007") +func checkRPCReadiness(t *testing.T, plugin string, status int64, port string) { + conn, err := net.Dial("tcp", "127.0.0.1:"+port) assert.NoError(t, err) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) req := &statusv1beta1.Request{ - Plugin: "http", + Plugin: plugin, } rsp := &statusv1beta1.Response{} err = client.Call("status.Ready", req, rsp) assert.NoError(t, err) - assert.Equal(t, rsp.GetCode(), int64(503)) + assert.Equal(t, rsp.GetCode(), status) } -func checkRPCStatus(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6005") +func checkRPCStatus(t *testing.T, plugin string, status int64, port string) { + conn, err := net.Dial("tcp", "127.0.0.1:"+port) assert.NoError(t, err) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) req := &statusv1beta1.Request{ - Plugin: "http", + Plugin: plugin, } rsp := &statusv1beta1.Response{} err = client.Call("status.Status", req, rsp) assert.NoError(t, err) - assert.Equal(t, rsp.Code, int64(200)) + assert.Equal(t, rsp.Code, status) }