Skip to content

Commit

Permalink
[#334]: feature: tests for JOBS plugin status and readiness
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Jun 22, 2023
2 parents a3d4e84 + 670e19e commit 985cd4e
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 11 deletions.
2 changes: 1 addition & 1 deletion plugins/status/configs/.rr-ready-init.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '3'

rpc:
listen: tcp://127.0.0.1:6007
listen: tcp://127.0.0.1:6006

server:
command: "php ../../php_test_files/sleep.php"
Expand Down
109 changes: 99 additions & 10 deletions plugins/status/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,9 @@ func TestStatusRPC(t *testing.T) {
}()

time.Sleep(time.Second)
t.Run("CheckerGetStatusRpc", checkRPCStatus)
t.Run("CheckerGetStatusRpc", func(t *testing.T) {
checkRPCStatus(t, "http", 200, "6005")
})
stopCh <- struct{}{}
wg.Wait()
}
Expand Down Expand Up @@ -298,7 +300,9 @@ func TestReadinessRPCWorkerNotReady(t *testing.T) {
t.Run("DoHttpReq", doHTTPReq)
time.Sleep(time.Second * 5)
t.Run("CheckerGetReadiness2", checkHTTPReadiness2)
t.Run("CheckerGetRpcReadiness", checkRPCReadiness)
t.Run("CheckerGetRpcReadiness", func(t *testing.T) {
checkRPCReadiness(t, "http", 503, "6006")
})
stopCh <- struct{}{}
wg.Wait()
}
Expand Down Expand Up @@ -386,6 +390,91 @@ func TestJobsStatus(t *testing.T) {
wg.Wait()
}

func TestJobsReadiness(t *testing.T) {
cont := endure.New(slog.LevelDebug, endure.GracefulShutdownTimeout(time.Second))

cfg := &config.Plugin{
Version: "2023.2.0",
Path: "configs/.rr-jobs-status.yaml",
Prefix: "rr",
}

err := cont.RegisterAll(
cfg,
&rpcPlugin.Plugin{},
&logger.Plugin{},
&server.Plugin{},
&status.Plugin{},
&jobs.Plugin{},
)
assert.NoError(t, err)

err = cont.Init()
if err != nil {
t.Fatal(err)
}

ch, err := cont.Serve()
assert.NoError(t, err)

sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)

wg := &sync.WaitGroup{}
wg.Add(1)

stopCh := make(chan struct{}, 1)

go func() {
defer wg.Done()
for {
select {
case e := <-ch:
assert.Fail(t, "error", e.Error.Error())
err = cont.Stop()
if err != nil {
assert.FailNow(t, "error", err.Error())
}
case <-sig:
err = cont.Stop()
if err != nil {
assert.FailNow(t, "error", err.Error())
}
return
case <-stopCh:
// timeout, error here is OK, because in the PHP we are sleeping for the 300s
_ = cont.Stop()
return
}
}
}()

time.Sleep(time.Second)
t.Run("checkJobsReadiness", checkJobsReadiness)
t.Run("checkJobsRPC", func(t *testing.T) {
checkRPCReadiness(t, "jobs", 200, "6007")
checkRPCStatus(t, "jobs", 200, "6007")
})

stopCh <- struct{}{}
wg.Wait()
}

func checkJobsReadiness(t *testing.T) {
req, err := http.NewRequest("GET", "http://127.0.0.1:35544/ready?plugin=jobs", nil)
assert.NoError(t, err)

r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
b, err := io.ReadAll(r.Body)
assert.NoError(t, err)
assert.Equal(t, 200, r.StatusCode)
assert.Equal(t, "plugin: jobs, status: 200\n", string(b))

err = r.Body.Close()
assert.NoError(t, err)
}

func checkHTTPStatus(t *testing.T) {
req, err := http.NewRequest("GET", "http://127.0.0.1:34333/health?plugin=http&plugin=rpc", nil)
assert.NoError(t, err)
Expand Down Expand Up @@ -444,34 +533,34 @@ func checkHTTPReadiness(t *testing.T) {
assert.NoError(t, err)
}

func checkRPCReadiness(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:6007")
func checkRPCReadiness(t *testing.T, plugin string, status int64, port string) {
conn, err := net.Dial("tcp", "127.0.0.1:"+port)
assert.NoError(t, err)
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))

req := &statusv1beta1.Request{
Plugin: "http",
Plugin: plugin,
}

rsp := &statusv1beta1.Response{}

err = client.Call("status.Ready", req, rsp)
assert.NoError(t, err)
assert.Equal(t, rsp.GetCode(), int64(503))
assert.Equal(t, rsp.GetCode(), status)
}

func checkRPCStatus(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:6005")
func checkRPCStatus(t *testing.T, plugin string, status int64, port string) {
conn, err := net.Dial("tcp", "127.0.0.1:"+port)
assert.NoError(t, err)
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))

req := &statusv1beta1.Request{
Plugin: "http",
Plugin: plugin,
}

rsp := &statusv1beta1.Response{}

err = client.Call("status.Status", req, rsp)
assert.NoError(t, err)
assert.Equal(t, rsp.Code, int64(200))
assert.Equal(t, rsp.Code, status)
}

0 comments on commit 985cd4e

Please sign in to comment.