Skip to content

Commit

Permalink
[Test] Query dashboard to get the serve application status in head pod (
Browse files Browse the repository at this point in the history
  • Loading branch information
CheyuWu authored Nov 6, 2024
1 parent b875b85 commit 32b8b9f
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 5 deletions.
2 changes: 2 additions & 0 deletions ray-operator/test/sampleyaml/rayservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ func TestRayService(t *testing.T) {

// Check if all applications are running
g.Eventually(RayService(test, namespace.Name, rayServiceFromYaml.Name), TestTimeoutMedium).Should(WithTransform(AllAppsRunning, BeTrue()))
// Query dashboard to get the serve application status in head pod
g.Eventually(QueryDashboardGetAppStatus(test, rayCluster), TestTimeoutShort).Should(Succeed())
})
}
}
27 changes: 27 additions & 0 deletions ray-operator/test/sampleyaml/support.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sampleyaml

import (
"fmt"
"os"
"path/filepath"
"runtime"
Expand All @@ -10,6 +11,7 @@ import (
corev1 "k8s.io/api/core/v1"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
. "github.com/ray-project/kuberay/ray-operator/test/support"
)

Expand Down Expand Up @@ -82,3 +84,28 @@ func AllAppsRunning(rayService *rayv1.RayService) bool {
}
return true
}

func QueryDashboardGetAppStatus(t Test, rayCluster *rayv1.RayCluster) func(Gomega) {
return func(g Gomega) {
rayDashboardClient := &utils.RayDashboardClient{}
pod, err := GetHeadPod(t, rayCluster)
g.Expect(err).ToNot(HaveOccurred())

localPort := 8265
remotePort := 8265
stopChan, err := SetupPortForward(t, pod.Name, pod.Namespace, localPort, remotePort)
defer close(stopChan)

g.Expect(err).ToNot(HaveOccurred())
url := fmt.Sprintf("127.0.0.1:%d", localPort)

err = rayDashboardClient.InitClient(t.Ctx(), url, rayCluster)
g.Expect(err).ToNot(HaveOccurred())
serveDetails, err := rayDashboardClient.GetServeDetails(t.Ctx())
g.Expect(err).ToNot(HaveOccurred())

for _, value := range serveDetails.Applications {
g.Expect(value.ServeApplicationStatus.Status).To(Equal(rayv1.ApplicationStatusEnum.RUNNING))
}
}
}
53 changes: 51 additions & 2 deletions ray-operator/test/support/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@ package support

import (
"bytes"
"fmt"
"io"
"net/http"
"strings"

"github.com/stretchr/testify/assert"

. "github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"github.com/stretchr/testify/assert"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/remotecommand"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
)

func Pods(t Test, namespace string, options ...Option[*metav1.ListOptions]) func(g gomega.Gomega) []corev1.Pod {
Expand Down Expand Up @@ -97,3 +102,47 @@ func ExecPodCmd(t Test, pod *corev1.Pod, containerName string, cmd []string) {
t.T().Logf("Command stderr: %s", stderr.String())
assert.NoError(t.T(), err)
}

func SetupPortForward(t Test, podName, namespace string, localPort, remotePort int) (chan struct{}, error) {
cfg := t.Client().Config()

req := t.Client().Core().CoreV1().RESTClient().
Post().
Resource("pods").
Namespace(namespace).
Name(podName).
SubResource("portforward")

transport, upgrader, err := spdy.RoundTripperFor(&cfg)
if err != nil {
return nil, err
}

stopChan := make(chan struct{}, 1)
readyChan := make(chan struct{}, 1)
out := new(strings.Builder)
errOut := new(strings.Builder)

// create port forward
forwarder, err := portforward.New(
spdy.NewDialer(upgrader, &http.Client{Transport: transport}, http.MethodPost, req.URL()),
[]string{fmt.Sprintf("%d:%d", localPort, remotePort)},
stopChan,
readyChan,
out,
errOut,
)
if err != nil {
return nil, err
}

// launch Port Forward
go func() {
defer GinkgoRecover()
err := forwarder.ForwardPorts()
assert.NoError(t.T(), err)
}()
<-readyChan // wait for port forward to finish

return stopChan, nil
}
6 changes: 3 additions & 3 deletions ray-operator/test/support/ray.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"github.com/onsi/gomega"
"github.com/stretchr/testify/assert"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
)

func RayJob(t Test, namespace, name string) func() (*rayv1.RayJob, error) {
Expand Down

0 comments on commit 32b8b9f

Please sign in to comment.