Skip to content

Commit

Permalink
Allow kubernetes discoverer to use gRPC destinations (#829)
Browse files Browse the repository at this point in the history
* Allow kubernetes discoverer to use gRPC destinations

Summary
I updated the kubnetes discoverer to look for veneur global containers with a grpc port name. This will allow us to specify that we are using gRPC.
Additionally, I removed the http:// prefix in the saved podIp because it hits a "too many colons in address" error with gRPC. This will still work with forwarding with http because in proxy.go, the doPost method will check and append it if its missing.
This PR is related to issue #762 as it removes the http hard coding.

Motivation
In Kubernetes we want to proxy metrics to veneur global with gRPC. This edit in our fork fixed the issues we were hitting.

Test plan
Ran this in our veneur-proxy pods that utilize this Kubernetes discoverer code with gRPC to verify everything works.

Rollout/monitoring/revert plan
This change should be backwards compatible as the doPost function for http communication prepends the necessary prefix. This only affects gRPC destinations used by proxysrv which shouldn't have been available with kubernetes.

* Update CHANGELOG for new release

* Pulled the logic that generates pod IPs from pod info into it's own
function to make it more testable and added some tests on generating
these ips

Co-authored-by: Chris Solidum <[email protected]>
Co-authored-by: csolidum <[email protected]>
  • Loading branch information
3 people authored Jun 21, 2021
1 parent ed29417 commit cf35312
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 46 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# 14.2.0, UNRELEASED

## Bugfixes
* A fix for forwarding metrics with gRPC using the kubernetes discoverer. Thanks, [androohan](https://github.com/androohan)!

# 14.1.0, 2021-03-16

## Added
Expand Down
105 changes: 59 additions & 46 deletions kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,62 @@ func NewKubernetesDiscoverer() (*KubernetesDiscoverer, error) {
return &KubernetesDiscoverer{clientset}, nil
}

func getDestinationFromPod(podIndex int, pod v1.Pod) string {
var forwardPort string
protocolPrefix := ""

if pod.Status.Phase != v1.PodRunning {
return ""
}

// TODO don't assume there is only one container for the veneur global
if len(pod.Spec.Containers) > 0 {
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.Name == "grpc" {
forwardPort = strconv.Itoa(int(port.ContainerPort))
log.WithField("port", forwardPort).Debug("Found grpc port")
break
}

if port.Name == "http" {
protocolPrefix = "http://"
forwardPort = strconv.Itoa(int(port.ContainerPort))
log.WithField("port", forwardPort).Debug("Found http port")
break
}

// TODO don't assume all TCP ports are for importing
if port.Protocol == "TCP" {
protocolPrefix = "http://"
forwardPort = strconv.Itoa(int(port.ContainerPort))
log.WithField("port", forwardPort).Debug("Found TCP port")
}
}
}
}

if forwardPort == "" || forwardPort == "0" {
log.WithFields(logrus.Fields{
"podIndex": podIndex,
"PodIP": pod.Status.PodIP,
"forwardPort": forwardPort,
}).Error("Could not find valid port for forwarding")
return ""
}

if pod.Status.PodIP == "" {
log.WithFields(logrus.Fields{
"podIndex": podIndex,
"PodIP": pod.Status.PodIP,
"forwardPort": forwardPort,
}).Error("Could not find valid podIP for forwarding")
return ""
}

return fmt.Sprintf("%s%s:%s", protocolPrefix, pod.Status.PodIP, forwardPort)
}

func (kd *KubernetesDiscoverer) GetDestinationsForService(serviceName string) ([]string, error) {
pods, err := kd.clientset.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{
LabelSelector: "app=veneur-global",
Expand All @@ -39,53 +95,10 @@ func (kd *KubernetesDiscoverer) GetDestinationsForService(serviceName string) ([

ips := make([]string, 0, len(pods.Items))
for podIndex, pod := range pods.Items {

var forwardPort string

if pod.Status.Phase != v1.PodRunning {
continue
podIp := getDestinationFromPod(podIndex, pod)
if len(podIp) > 0 {
ips = append(ips, podIp)
}

// TODO don't assume there is only one container for the veneur global
if len(pod.Spec.Containers) > 0 {
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.Name == "http" {
forwardPort = strconv.Itoa(int(port.ContainerPort))
log.WithField("port", forwardPort).Debug("Found http port")
break
}

// TODO don't assume all TCP ports are for importing
if port.Protocol == "TCP" {
forwardPort = strconv.Itoa(int(port.ContainerPort))
log.WithField("port", forwardPort).Debug("Found TCP port")
}
}
}
}

if forwardPort == "" || forwardPort == "0" {
log.WithFields(logrus.Fields{
"podIndex": podIndex,
"PodIP": pod.Status.PodIP,
"forwardPort": forwardPort,
}).Error("Could not find valid port for forwarding")
continue
}

if pod.Status.PodIP == "" {
log.WithFields(logrus.Fields{
"podIndex": podIndex,
"PodIP": pod.Status.PodIP,
"forwardPort": forwardPort,
}).Error("Could not find valid podIP for forwarding")
continue
}

// prepend with // so that it is a valid URL parseable by url.Parse
podIp := fmt.Sprintf("http://%s:%s", pod.Status.PodIP, forwardPort)
ips = append(ips, podIp)
}
return ips, nil
}
83 changes: 83 additions & 0 deletions kubernetes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package veneur

import (
"testing"

"github.com/stretchr/testify/assert"
"k8s.io/api/core/v1"
)

func TestGetDestinationFromHttpPod(t *testing.T) {
httpPod := v1.Pod{
Status: v1.PodStatus{
Phase: v1.PodRunning,
PodIP: "127.0.0.1",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
v1.Container{
Ports: []v1.ContainerPort{
v1.ContainerPort{
Name: "http",
ContainerPort: 8080,
Protocol: v1.ProtocolTCP,
},
},
},
},
},
}

podID := getDestinationFromPod(0, httpPod)
assert.Equal(t, "http://127.0.0.1:8080", podID)
}

func TestGetDestinationFromTcpPod(t *testing.T) {
tcpPod := v1.Pod{
Status: v1.PodStatus{
Phase: v1.PodRunning,
PodIP: "127.0.0.1",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
v1.Container{
Ports: []v1.ContainerPort{
v1.ContainerPort{
Name: "tcp",
ContainerPort: 8080,
Protocol: v1.ProtocolTCP,
},
},
},
},
},
}

podID := getDestinationFromPod(0, tcpPod)
assert.Equal(t, "http://127.0.0.1:8080", podID)
}

func TestGetDestinationFromGrpcPod(t *testing.T) {
grpcPod := v1.Pod{
Status: v1.PodStatus{
Phase: v1.PodRunning,
PodIP: "127.0.0.1",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
v1.Container{
Ports: []v1.ContainerPort{
v1.ContainerPort{
Name: "grpc",
ContainerPort: 8080,
Protocol: v1.ProtocolTCP,
},
},
},
},
},
}

podID := getDestinationFromPod(0, grpcPod)
assert.Equal(t, "127.0.0.1:8080", podID)
}

0 comments on commit cf35312

Please sign in to comment.