Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for PodProbeMarker http/tcp capacities #1321

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions pkg/daemon/podprobe/pod_probe_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"

Check failure on line 47 in pkg/daemon/podprobe/pod_probe_controller.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `gofmt`-ed with `-s` (gofmt)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

42% of developers fix this issue

gofmt: File is not gofmt-ed with -s


ℹ️ Expand to see all @sonatype-lift commands

You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.

Command Usage
@sonatype-lift ignore Leave out the above finding from this PR
@sonatype-lift ignoreall Leave out all the existing findings from this PR
@sonatype-lift exclude <file|issue|path|tool> Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file

Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nitishchauhan0022 you need resolve the gofmt check.

"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -88,6 +89,8 @@
nodeName string
// kruise daemon start time
start time.Time
// runtime client
runtimeClient runtimeclient.Client
}

// NewController returns the controller for pod probe
Expand Down Expand Up @@ -126,6 +129,7 @@
nodeName: nodeName,
eventRecorder: recorder,
start: time.Now(),
runtimeClient: opts.RuntimeClient,
}
c.prober = newProber(c.runtimeFactory.GetRuntimeService())
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -407,6 +411,25 @@
return containerStatus, err
}

func (c *Controller) fetchPodIp(podUID, name, namespace string) (string, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our internal cluster is quite large and there are performance issues if Daemon needs to watch Pod resources, so it is recommended that Pod IP can be passed down directly from the NodePodProbeSpec resource.


podList := &corev1.PodList{}
err := c.runtimeClient.List(context.TODO(), podList, runtimeclient.MatchingFields{"metadata.name": name, "metadata.namespace": namespace})

if err != nil {
klog.Errorf("Failed to fetch pod list in namespace %s: %v", namespace, err)
return "", err
}

for _, pod := range podList.Items {
if string(pod.UID) == podUID {
klog.Info("pod ip is ", pod.Status.PodIP)
return pod.Status.PodIP, nil
}
}
return "", fmt.Errorf("pod %s with Uid %s not found in namespace %s", name, podUID, namespace)
}

func updateNodePodProbeStatus(update Update, newStatus *appsv1alpha1.NodePodProbeStatus) {
var probeStatus *appsv1alpha1.PodProbeStatus
for i := range newStatus.PodProbeStatuses {
Expand Down
41 changes: 36 additions & 5 deletions pkg/daemon/podprobe/prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"bytes"
"fmt"
"io"
"net/http"
"net/url"
"time"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
Expand All @@ -28,6 +30,8 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/probe"
execprobe "k8s.io/kubernetes/pkg/probe/exec"
httpprobe "k8s.io/kubernetes/pkg/probe/http"
tcpprobe "k8s.io/kubernetes/pkg/probe/tcp"
"k8s.io/utils/exec"
)

Expand All @@ -36,6 +40,8 @@ const maxProbeMessageLength = 1024
// Prober helps to check the probe(exec, http, tcp) of a container.
type prober struct {
exec execprobe.Prober
tcp tcpprobe.Prober
http httpprobe.Prober
runtimeService criapi.RuntimeService
}

Expand All @@ -44,13 +50,15 @@ type prober struct {
func newProber(runtimeService criapi.RuntimeService) *prober {
return &prober{
exec: execprobe.New(),
tcp: tcpprobe.New(),
http: httpprobe.New(false),
runtimeService: runtimeService,
}
}

// probe probes the container.
func (pb *prober) probe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeapi.ContainerStatus, containerID string) (appsv1alpha1.ProbeState, string, error) {
result, msg, err := pb.runProbe(p, container, containerID)
func (pb *prober) probe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeapi.ContainerStatus, containerID string, hostIP string) (appsv1alpha1.ProbeState, string, error) {
result, msg, err := pb.runProbe(p, container, containerID, hostIP)
if bytes.Count([]byte(msg), nil)-1 > maxProbeMessageLength {
msg = msg[:maxProbeMessageLength]
}
Expand All @@ -60,17 +68,40 @@ func (pb *prober) probe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeap
return appsv1alpha1.ProbeSucceeded, msg, nil
}

func (pb *prober) runProbe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeapi.ContainerStatus, containerID string) (probe.Result, string, error) {
func (pb *prober) runProbe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeapi.ContainerStatus, containerID string, hostIP string) (probe.Result, string, error) {
timeSecond := p.TimeoutSeconds
if timeSecond <= 0 {
timeSecond = 1
}
timeout := time.Duration(timeSecond) * time.Second
// current only support exec
// todo: http, tcp
// for probing using exec method
if p.Exec != nil {
return pb.exec.Probe(pb.newExecInContainer(containerID, p.Exec.Command, timeout))
}

// for probing using tcp method
if p.TCPSocket.Port.IntVal != 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if p.TCPSocket.Host != "" {
return pb.tcp.Probe(p.TCPSocket.Host, p.TCPSocket.Port.IntValue(), timeout)
} else {
return pb.tcp.Probe(hostIP, p.TCPSocket.Port.IntValue(), timeout)
}
}

// for probing using http method
if p.HTTPGet.Path != "" {
var u url.URL
var header http.Header
u.Scheme = "http"
u.Path = p.HTTPGet.Path
if p.HTTPGet.Host != "" {
u.Host = p.HTTPGet.Host
} else {
u.Host = hostIP
}
return pb.http.Probe(&u, header, timeout)
}

klog.InfoS("Failed to find probe builder for container", "containerName", container.Metadata.Name)
return probe.Unknown, "", fmt.Errorf("missing probe handler for %s", container.Metadata.Name)
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/daemon/podprobe/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ func (w *worker) doProbe() (keepGoing bool) {
defer func() { recover() }() // Actually eat panics (HandleCrash takes care of logging)
defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })

podIp, err := w.probeController.fetchPodIp(w.key.podUID, w.key.podName, w.key.podNs)
if err != nil {
klog.Errorf("Pod(%s/%s) fetchPodIP failed: %s", w.key.podNs, w.key.podName, err.Error())
return true
}

container, _ := w.probeController.fetchLatestPodContainer(w.key.podUID, w.key.containerName)
if container == nil {
klog.V(5).Infof("Pod(%s/%s) container(%s) Not Found", w.key.podNs, w.key.podName, w.key.containerName)
Expand Down Expand Up @@ -152,7 +158,7 @@ func (w *worker) doProbe() (keepGoing bool) {

// the full container environment here, OR we must make a call to the CRI in order to get those environment
// values from the running container.
result, msg, err := w.probeController.prober.probe(w.spec, container, w.containerID)
result, msg, err := w.probeController.prober.probe(w.spec, container, w.containerID, podIp) //Main Line
if err != nil {
klog.Errorf("Pod(%s/%s) do container(%s) probe(%s) spec(%s) failed: %s",
w.key.podNs, w.key.podName, w.key.containerName, w.key.probeName, util.DumpJSON(w.spec), err.Error())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,13 @@ func validateHandler(handler *corev1.Handler, fldPath *field.Path) field.ErrorLi
numHandlers++
allErrors = append(allErrors, validateExecAction(handler.Exec, fldPath.Child("exec"))...)
}
if handler.HTTPGet != nil || handler.TCPSocket != nil {
if handler.HTTPGet != nil {
numHandlers++
allErrors = append(allErrors, field.Forbidden(fldPath.Child("probe"), "current only support exec probe"))
allErrors = append(allErrors, validateHTTPGetAction(handler.HTTPGet, fldPath.Child("httpGet"))...)
}
if handler.TCPSocket != nil {
numHandlers++
allErrors = append(allErrors, validateTCPSocketAction(handler.TCPSocket, fldPath.Child("tcpSocket"))...)
}
if numHandlers == 0 {
allErrors = append(allErrors, field.Required(fldPath, "must specify a handler type"))
Expand All @@ -205,6 +209,25 @@ func validateExecAction(exec *corev1.ExecAction, fldPath *field.Path) field.Erro
return allErrors
}

func validateHTTPGetAction(httpGet *corev1.HTTPGetAction, fldPath *field.Path) field.ErrorList {
allErrors := field.ErrorList{}
if httpGet.Port.IntValue() == 0 {
allErrors = append(allErrors, field.Required(fldPath.Child("port"), ""))
}
if httpGet.Path == "" {
allErrors = append(allErrors, field.Required(fldPath.Child("path"), ""))
}
return allErrors
}

func validateTCPSocketAction(tcpSocket *corev1.TCPSocketAction, fldPath *field.Path) field.ErrorList {
allErrors := field.ErrorList{}
if tcpSocket.Port.IntValue() == 0 {
allErrors = append(allErrors, field.Required(fldPath.Child("port"), ""))
}
return allErrors
}

func validateProbeMarkerPolicy(policy *appsv1alpha1.ProbeMarkerPolicy, fldPath *field.Path) field.ErrorList {
allErrors := field.ErrorList{}
if policy.State != appsv1alpha1.ProbeSucceeded && policy.State != appsv1alpha1.ProbeFailed {
Expand Down
1 change: 1 addition & 0 deletions vendor/k8s.io/component-base/version/.gitattributes

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 63 additions & 0 deletions vendor/k8s.io/component-base/version/base.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 42 additions & 0 deletions vendor/k8s.io/component-base/version/version.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading