Skip to content

Commit

Permalink
fix: retry when getting EOF error at E2E test (#281)
Browse files Browse the repository at this point in the history
This change partially resolves #210. By adding retrying logic on pod log stream initialization and port forwarding, I was able to resolve two kinds of EOF errors, which are two of the most frequent causes of E2E test failure.

More investigation notes in #210 (comment)

Error 1
```
Watching POD: simple-pipeline-watermark-cat3-0-np6xu
error: Get "https://172.18.0.3:10250/containerLogs/numaflow-system/simple-pipeline-watermark-cat3-0-np6xu/main?follow=true": EOF    functional_test.go:180: Expected vertex pod log contains "Start processing udf messages"
```
and Error 2

```
functional_test.go:39: Vertex POD name: simple-pipeline-input-0-fpizv
panic: error upgrading connection: error dialing backend: EOF
```

### Testing
The following E2E test runs show in the log that the retry is getting invoked, which helps passing the E2E tests.
* Retrying on Error 1 - https://github.com/numaproj/numaflow/actions/runs/3346765321/jobs/5544895739
* Retrying on Error 2 - https://github.com/KeranYang/numaflow/actions/runs/3341257920/jobs/5532282295

Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang authored and whynowy committed Nov 7, 2022
1 parent f5db937 commit dc96b87
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ jobs:
curl -L https://github.com/nats-io/nats-server/releases/download/v$JS_VERSION/nats-server-v$JS_VERSION-linux-amd64.zip -o nats-server.zip
sudo unzip nats-server.zip -d nats-server
nats-server/nats-server-v$JS_VERSION-linux-amd64/nats-server -js &
- name: Insall Redis
- name: Install Redis
uses: shogo82148/actions-setup-redis@v1
with:
redis-version: "6.2"
Expand Down
6 changes: 3 additions & 3 deletions test/fixtures/expect.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ func (t *Expect) VertexPodLogContains(vertexName, regex string, opts ...PodLogCh
ctx := context.Background()
contains, err := VertexPodLogContains(ctx, t.kubeClient, Namespace, t.pipeline.Name, vertexName, regex, opts...)
if err != nil {
t.t.Fatalf("Failed to check vertex pod logs: %v", err)
t.t.Fatalf("Failed to check vertex %q pod logs: %v", vertexName, err)
}
if !contains {
t.t.Fatalf("Expected vertex pod log contains %q", regex)
t.t.Fatalf("Expected vertex %q pod log contains %q", vertexName, regex)
}
return t
}
Expand All @@ -96,7 +96,7 @@ func (t *Expect) VertexPodLogNotContains(vertexName, regex string, opts ...PodLo
t.t.Fatalf("Failed to check vertex pod logs: %v", err)
}
if !yes {
t.t.Fatalf("Not expected vertex pod log contains %q", regex)
t.t.Fatalf("Not expected vertex %q pod log contains %q", vertexName, regex)
}
return t
}
Expand Down
28 changes: 27 additions & 1 deletion test/fixtures/port_forward.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
package fixtures

import (
"context"
"fmt"
"net/http"
"net/url"
"os"
"strings"
"time"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
)

func PodPortForward(config *rest.Config, namespace, podName string, localPort, remotePort int, stopCh <-chan struct{}) error {
ctx := context.Background()
readyCh := make(chan struct{})

path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward",
Expand All @@ -37,12 +41,34 @@ func PodPortForward(config *rest.Config, namespace, podName string, localPort, r
}

go func() {
if err := fw.ForwardPorts(); err != nil {
var err error
// Port forwarding can fail due to transient "error upgrading connection: error dialing backend: EOF" issue.
// To prevent such issue, we apply retry strategy.
// 3 attempts with 1 second fixed wait time are tested sufficient for it.
var retryBackOff = wait.Backoff{
Factor: 1,
Jitter: 0,
Steps: 3,
Duration: time.Second * 1,
}

_ = wait.ExponentialBackoffWithContext(ctx, retryBackOff, func() (done bool, err error) {
err = fw.ForwardPorts()
if err == nil {
return true, nil
}

fmt.Printf("Got error %v, retrying.\n", err)
return false, nil
})

if err != nil {
panic(err)
}
}()

<-readyCh

return nil

}
31 changes: 27 additions & 4 deletions test/fixtures/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"os"
"os/exec"
"regexp"
Expand All @@ -18,6 +19,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
)

Expand Down Expand Up @@ -273,7 +275,7 @@ func PodsLogNotContains(ctx context.Context, kubeClient kubernetes.Interface, na
for _, p := range podList.Items {
go func(podName string) {
fmt.Printf("Watching POD: %s\n", podName)
if err := podLogContains(cctx, kubeClient, namespace, podName, o.container, regex, resultChan, errChan); err != nil {
if err := podLogContains(cctx, kubeClient, namespace, podName, o.container, regex, resultChan); err != nil {
errChan <- err
return
}
Expand Down Expand Up @@ -325,7 +327,7 @@ func PodsLogContains(ctx context.Context, kubeClient kubernetes.Interface, names
for _, p := range podList.Items {
go func(podName string) {
fmt.Printf("Watching POD: %s\n", podName)
if err := podLogContains(cctx, kubeClient, namespace, podName, o.container, regex, resultChan, errChan); err != nil {
if err := podLogContains(cctx, kubeClient, namespace, podName, o.container, regex, resultChan); err != nil {
errChan <- err
return
}
Expand Down Expand Up @@ -354,8 +356,29 @@ func PodsLogContains(ctx context.Context, kubeClient kubernetes.Interface, names
}
}

func podLogContains(ctx context.Context, client kubernetes.Interface, namespace, podName, containerName, regex string, result chan bool, errs chan error) error {
stream, err := client.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{Follow: true, Container: containerName}).Stream(ctx)
func podLogContains(ctx context.Context, client kubernetes.Interface, namespace, podName, containerName, regex string, result chan bool) error {
var stream io.ReadCloser
var err error
// Streaming logs from file could be rotated by container log manager and as consequence, we receive EOF and need to re-initialize the stream.
// To prevent such issue, we apply retry on stream initialization.
// 3 attempts with 1 second fixed wait time are tested sufficient for it.
var retryBackOff = wait.Backoff{
Factor: 1,
Jitter: 0,
Steps: 3,
Duration: time.Second * 1,
}

_ = wait.ExponentialBackoffWithContext(ctx, retryBackOff, func() (done bool, err error) {
stream, err = client.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{Follow: true, Container: containerName}).Stream(ctx)
if err == nil {
return true, nil
}

fmt.Printf("Got error %v, retrying.\n", err)
return false, nil
})

if err != nil {
return err
}
Expand Down
18 changes: 9 additions & 9 deletions test/fixtures/when.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type When struct {
restConfig *rest.Config
kubeClient kubernetes.Interface

portForwarderStopChanels map[string]chan struct{}
portForwarderStopChannels map[string]chan struct{}
}

func (w *When) CreateISBSvc() *When {
Expand Down Expand Up @@ -135,10 +135,10 @@ func (w *When) VertexPodPortForward(vertexName string, localPort, remotePort int
if err = PodPortForward(w.restConfig, Namespace, podName, localPort, remotePort, stopCh); err != nil {
w.t.Fatalf("Expected vertex pod port-forward: %v", err)
}
if w.portForwarderStopChanels == nil {
w.portForwarderStopChanels = make(map[string]chan struct{})
if w.portForwarderStopChannels == nil {
w.portForwarderStopChannels = make(map[string]chan struct{})
}
w.portForwarderStopChanels[podName] = stopCh
w.portForwarderStopChannels[podName] = stopCh
return w
}

Expand All @@ -157,17 +157,17 @@ func (w *When) DaemonPodPortForward(pipelineName string, localPort, remotePort i
if err = PodPortForward(w.restConfig, Namespace, podName, localPort, remotePort, stopCh); err != nil {
w.t.Fatalf("Expected daemon pod port-forward: %v", err)
}
if w.portForwarderStopChanels == nil {
w.portForwarderStopChanels = make(map[string]chan struct{})
if w.portForwarderStopChannels == nil {
w.portForwarderStopChannels = make(map[string]chan struct{})
}
w.portForwarderStopChanels[podName] = stopCh
w.portForwarderStopChannels[podName] = stopCh
return w
}

func (w *When) TerminateAllPodPortForwards() *When {
w.t.Helper()
if len(w.portForwarderStopChanels) > 0 {
for k, v := range w.portForwarderStopChanels {
if len(w.portForwarderStopChannels) > 0 {
for k, v := range w.portForwarderStopChannels {
w.t.Logf("Terminating port-forward for POD %s", k)
close(v)
}
Expand Down

0 comments on commit dc96b87

Please sign in to comment.