Skip to content

Commit

Permalink
support runtime v1 api
Browse files Browse the repository at this point in the history
  • Loading branch information
BSWANG committed Jul 14, 2023
1 parent 978a8d4 commit 6a6adc1
Showing 1 changed file with 81 additions and 19 deletions.
100 changes: 81 additions & 19 deletions pkg/skoop/collector/podcollector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
"strings"
"syscall"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/samber/lo"
"golang.org/x/sys/unix"

Expand All @@ -30,16 +33,18 @@ import (
"github.com/vishvananda/netns"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
pb "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
pb "k8s.io/cri-api/pkg/apis/runtime/v1"
pbv1alpha2 "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
)

type podCollector struct {
runtimeEndpoint string
podNamespace string
podName string

dockerCli client.APIClient
runtimeCli pb.RuntimeServiceClient
dockerCli client.APIClient
runtimeCli pb.RuntimeServiceClient
runtimeCliV1Alpha2 pbv1alpha2.RuntimeServiceClient
}

func (a *podCollector) DumpNodeInfos() (*k8s.NodeNetworkStackDump, error) {
Expand Down Expand Up @@ -98,23 +103,50 @@ func NewCollector(podNamespace, podName, runtimeEndpoint string) (collector.Coll
return nil, err
}
pc.runtimeCli = pb.NewRuntimeServiceClient(conn)
// negotiate cri api version
_, err = pc.runtimeCli.Version(context.TODO(), &pb.VersionRequest{})
if status.Code(err) == codes.Unimplemented {
pc.runtimeCli = nil
pc.runtimeCliV1Alpha2 = pbv1alpha2.NewRuntimeServiceClient(conn)
} else if err != nil {
return nil, err
}

return pc, nil
}

func (a *podCollector) PodInfo(sandbox *pb.PodSandbox) (k8s.PodNetInfo, error) {
p := k8s.PodNetInfo{}
status, err := a.runtimeCli.PodSandboxStatus(context.TODO(), &pb.PodSandboxStatusRequest{
PodSandboxId: sandbox.Id,
Verbose: a.dockerCli == nil,
})
if err != nil {
return p, err
var (
sandboxStatus *pb.PodSandboxStatusResponse
err error
)
if a.runtimeCli != nil {
sandboxStatus, err = a.runtimeCli.PodSandboxStatus(context.TODO(), &pb.PodSandboxStatusRequest{
PodSandboxId: sandbox.Id,
Verbose: a.dockerCli == nil,
})
if err != nil {
return p, err
}
} else {
statusAlpha, err := a.runtimeCliV1Alpha2.PodSandboxStatus(context.TODO(), &pbv1alpha2.PodSandboxStatusRequest{
PodSandboxId: sandbox.Id,
Verbose: a.dockerCli == nil,
})
if err != nil {
return p, err
}
sandboxStatus = &pb.PodSandboxStatusResponse{}
if err = alphaRespTov1Resp(statusAlpha, sandboxStatus); err != nil {
return p, fmt.Errorf("error convert alpha resp %v", err)
}
}
p.PodName = status.Status.GetMetadata().GetName()
p.PodNamespace = status.Status.GetMetadata().GetNamespace()
p.ContainerID = status.Status.GetId()
p.PodUID = status.Status.GetMetadata().Uid

p.PodName = sandboxStatus.Status.GetMetadata().GetName()
p.PodNamespace = sandboxStatus.Status.GetMetadata().GetNamespace()
p.ContainerID = sandboxStatus.Status.GetId()
p.PodUID = sandboxStatus.Status.GetMetadata().Uid

if a.dockerCli != nil {
sandboxInfo, err := a.dockerCli.ContainerInspect(context.TODO(), sandbox.Id)
Expand All @@ -124,14 +156,14 @@ func (a *podCollector) PodInfo(sandbox *pb.PodSandbox) (k8s.PodNetInfo, error) {
p.PID = uint32(sandboxInfo.State.Pid)
} else {
sandboxInfo := server.SandboxInfo{}
err = json.Unmarshal([]byte(status.GetInfo()["info"]), &sandboxInfo)
err = json.Unmarshal([]byte(sandboxStatus.GetInfo()["info"]), &sandboxInfo)
if err != nil {
return p, err
}
p.PID = sandboxInfo.Pid
}

if status.Status.GetLinux().GetNamespaces().GetOptions().GetNetwork() == pb.NamespaceMode_POD {
if sandboxStatus.Status.GetLinux().GetNamespaces().GetOptions().GetNetwork() == pb.NamespaceMode_POD {
p.NetworkMode = "none"
p.Netns = fmt.Sprintf("/proc/%d/ns/net", p.PID)
} else {
Expand All @@ -141,11 +173,40 @@ func (a *podCollector) PodInfo(sandbox *pb.PodSandbox) (k8s.PodNetInfo, error) {
return p, nil
}

func (a *podCollector) PodList() ([]k8s.PodNetInfo, error) {
var pods []k8s.PodNetInfo
sandboxs, err := a.runtimeCli.ListPodSandbox(context.TODO(), &pb.ListPodSandboxRequest{})
func alphaRespTov1Resp(
alphaRes interface{ Marshal() ([]byte, error) },
v1Res interface{ Unmarshal(_ []byte) error },
) error {
p, err := alphaRes.Marshal()
if err != nil {
return nil, fmt.Errorf("error list pod sandbox: %v", err)
return err
}

return v1Res.Unmarshal(p)
}

func (a *podCollector) PodList() ([]k8s.PodNetInfo, error) {
var (
pods []k8s.PodNetInfo
sandboxs *pb.ListPodSandboxResponse
err error
)

if a.runtimeCli != nil {
sandboxs, err = a.runtimeCli.ListPodSandbox(context.TODO(), &pb.ListPodSandboxRequest{})
if err != nil {
return nil, fmt.Errorf("error list pod sandbox: %v", err)
}
} else {
alphaSandboxs, err := a.runtimeCliV1Alpha2.ListPodSandbox(context.TODO(), &pbv1alpha2.ListPodSandboxRequest{})
if err != nil {
return nil, fmt.Errorf("error list pod sandbox: %v", err)
}
sandboxs = &pb.ListPodSandboxResponse{}
err = alphaRespTov1Resp(alphaSandboxs, sandboxs)
if err != nil {
return nil, fmt.Errorf("error convert alpha pod sandbox: %v", err)
}
}

for _, s := range sandboxs.Items {
Expand All @@ -157,6 +218,7 @@ func (a *podCollector) PodList() ([]k8s.PodNetInfo, error) {
pods = append(pods, podinfo)
}
}

return pods, nil
}

Expand Down

0 comments on commit 6a6adc1

Please sign in to comment.