diff --git a/ray-operator/test/sampleyaml/rayservice_test.go b/ray-operator/test/sampleyaml/rayservice_test.go index d6f7cc735e..cf488ea157 100644 --- a/ray-operator/test/sampleyaml/rayservice_test.go +++ b/ray-operator/test/sampleyaml/rayservice_test.go @@ -77,6 +77,8 @@ func TestRayService(t *testing.T) { // Check if all applications are running g.Eventually(RayService(test, namespace.Name, rayServiceFromYaml.Name), TestTimeoutMedium).Should(WithTransform(AllAppsRunning, BeTrue())) + // Query dashboard to get the serve application status in head pod + g.Eventually(QueryDashboardGetAppStatus(test, rayCluster), TestTimeoutShort).Should(Succeed()) }) } } diff --git a/ray-operator/test/sampleyaml/support.go b/ray-operator/test/sampleyaml/support.go index c3bff396d3..b0c568f997 100644 --- a/ray-operator/test/sampleyaml/support.go +++ b/ray-operator/test/sampleyaml/support.go @@ -1,6 +1,7 @@ package sampleyaml import ( + "fmt" "os" "path/filepath" "runtime" @@ -10,6 +11,7 @@ import ( corev1 "k8s.io/api/core/v1" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" . "github.com/ray-project/kuberay/ray-operator/test/support" ) @@ -82,3 +84,28 @@ func AllAppsRunning(rayService *rayv1.RayService) bool { } return true } + +func QueryDashboardGetAppStatus(t Test, rayCluster *rayv1.RayCluster) func(Gomega) { + return func(g Gomega) { + rayDashboardClient := &utils.RayDashboardClient{} + pod, err := GetHeadPod(t, rayCluster) + g.Expect(err).ToNot(HaveOccurred()) + + localPort := 8265 + remotePort := 8265 + stopChan, err := SetupPortForward(t, pod.Name, pod.Namespace, localPort, remotePort) + defer close(stopChan) + + g.Expect(err).ToNot(HaveOccurred()) + url := fmt.Sprintf("127.0.0.1:%d", localPort) + + err = rayDashboardClient.InitClient(t.Ctx(), url, rayCluster) + g.Expect(err).ToNot(HaveOccurred()) + serveDetails, err := rayDashboardClient.GetServeDetails(t.Ctx()) + g.Expect(err).ToNot(HaveOccurred()) + + for _, value := range serveDetails.Applications { + g.Expect(value.ServeApplicationStatus.Status).To(Equal(rayv1.ApplicationStatusEnum.RUNNING)) + } + } +} diff --git a/ray-operator/test/support/core.go b/ray-operator/test/support/core.go index a4a509fd4d..d05a959404 100644 --- a/ray-operator/test/support/core.go +++ b/ray-operator/test/support/core.go @@ -2,16 +2,21 @@ package support import ( "bytes" + "fmt" "io" + "net/http" + "strings" - "github.com/stretchr/testify/assert" - + . "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" + "github.com/stretchr/testify/assert" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/remotecommand" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/portforward" + "k8s.io/client-go/transport/spdy" ) func Pods(t Test, namespace string, options ...Option[*metav1.ListOptions]) func(g gomega.Gomega) []corev1.Pod { @@ -97,3 +102,47 @@ func ExecPodCmd(t Test, pod *corev1.Pod, containerName string, cmd []string) { t.T().Logf("Command stderr: %s", stderr.String()) assert.NoError(t.T(), err) } + +func SetupPortForward(t Test, podName, namespace string, localPort, remotePort int) (chan struct{}, error) { + cfg := t.Client().Config() + + req := t.Client().Core().CoreV1().RESTClient(). + Post(). + Resource("pods"). + Namespace(namespace). + Name(podName). + SubResource("portforward") + + transport, upgrader, err := spdy.RoundTripperFor(&cfg) + if err != nil { + return nil, err + } + + stopChan := make(chan struct{}, 1) + readyChan := make(chan struct{}, 1) + out := new(strings.Builder) + errOut := new(strings.Builder) + + // create port forward + forwarder, err := portforward.New( + spdy.NewDialer(upgrader, &http.Client{Transport: transport}, http.MethodPost, req.URL()), + []string{fmt.Sprintf("%d:%d", localPort, remotePort)}, + stopChan, + readyChan, + out, + errOut, + ) + if err != nil { + return nil, err + } + + // launch Port Forward + go func() { + defer GinkgoRecover() + err := forwarder.ForwardPorts() + assert.NoError(t.T(), err) + }() + <-readyChan // wait for port forward to finish + + return stopChan, nil +} diff --git a/ray-operator/test/support/ray.go b/ray-operator/test/support/ray.go index c9bdec236b..9273696da2 100644 --- a/ray-operator/test/support/ray.go +++ b/ray-operator/test/support/ray.go @@ -6,11 +6,11 @@ import ( "github.com/onsi/gomega" "github.com/stretchr/testify/assert" - rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" - "github.com/ray-project/kuberay/ray-operator/controllers/ray/common" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + "github.com/ray-project/kuberay/ray-operator/controllers/ray/common" ) func RayJob(t Test, namespace, name string) func() (*rayv1.RayJob, error) {