Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

csi-agent/disk: node expand volume #1183

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 74 additions & 31 deletions cmd/csi-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,72 @@ import (
"encoding/json"
"flag"
"fmt"
"io"
"os"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud/metadata"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/common"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/oss"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/version"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)

var mountProxySocket string

type CSIAgent interface {
NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error)
type FileGrpcServer struct {
input io.Reader
output io.Writer
desc *grpc.ServiceDesc
impl any
}

func NewStdIOGrpcServer() *FileGrpcServer {
return &FileGrpcServer{
input: os.Stdin,
output: os.Stdout,
}
}

func (s *FileGrpcServer) RegisterService(desc *grpc.ServiceDesc, impl interface{}) {
s.desc = desc
s.impl = impl
}

func (s *FileGrpcServer) decode(msg any) error {
pbmsg := msg.(proto.Message)
in, err := io.ReadAll(os.Stdin)
if err != nil {
return err
}
return protojson.Unmarshal(in, pbmsg)
}

func (s *FileGrpcServer) ServeOneRequest(method string) error {
for _, desc := range s.desc.Methods {
if desc.MethodName == method {
resp, err := desc.Handler(s.impl, context.Background(), s.decode, nil)
if err != nil {
return err
}
out, err := protojson.Marshal(resp.(proto.Message))
if err != nil {
return fmt.Errorf("failed to marshal response: %v", err)
}
_, err = s.output.Write(out)
if err != nil {
return fmt.Errorf("failed to write response: %v", err)
}
return nil
}
}
return fmt.Errorf("unknown method: %q", method)
}

func main() {
Expand All @@ -32,51 +83,43 @@ func main() {
return
}

var agent CSIAgent
var agent csi.NodeServer
switch driver := os.Getenv("CSI_DRIVER"); driver {
case "test":
agent = &fakeAgent{}
case "ossplugin.csi.alibabacloud.com":
meta := metadata.NewMetadata()
agent = oss.NewCSIAgent(meta, mountProxySocket)
case "diskplugin.csi.alibabacloud.com":
agent = disk.NewCSIAgent()
default:
printError(fmt.Errorf("invalid CSI_DRIVER: %q", driver))
os.Exit(1)
}

var resp proto.Message

switch cmd := os.Getenv("CSI_COMMAND"); cmd {
case "NodePublishVolume":
var req csi.NodePublishVolumeRequest
err := jsonpb.Unmarshal(os.Stdin, &req)
if err != nil {
printError(err)
os.Exit(1)
}
resp, err = agent.NodePublishVolume(context.Background(), &req)
if err != nil {
printError(err)
os.Exit(2)
}
default:
printError(fmt.Errorf("invalid CSI_COMMAND: %q", cmd))
server := NewStdIOGrpcServer()
csi.RegisterNodeServer(server, common.WrapNodeServerWithValidator(agent))
err := server.ServeOneRequest(os.Getenv("CSI_COMMAND"))
if err != nil {
printError(err)
os.Exit(1)
}

_ = new(jsonpb.Marshaler).Marshal(os.Stdout, resp)
}

func printError(err error) {
_ = json.NewEncoder(os.Stdout).Encode(struct {
Error string `json:"error"`
s, _ := status.FromError(err)
encodeErr := json.NewEncoder(os.Stdout).Encode(struct {
Code codes.Code `json:"code"`
Error string `json:"error"`
}{
Error: err.Error(),
Code: s.Code(),
Error: s.Message(),
})
if encodeErr != nil {
fmt.Fprintf(os.Stderr, "Error: %v, failed to write error to stdout: %v", err, encodeErr)
}
}

type fakeAgent struct{}

func (fakeagent *fakeAgent) NodePublishVolume(context.Context, *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
return &csi.NodePublishVolumeResponse{}, nil
type fakeAgent struct {
csi.UnimplementedNodeServer
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/alibabacloud-go/tea v1.2.1
github.com/aliyun/alibaba-cloud-sdk-go v1.62.719
github.com/aliyun/credentials-go v1.3.1
github.com/container-storage-interface/spec v1.9.0
github.com/container-storage-interface/spec v1.10.0
github.com/containerd/ttrpc v1.2.3
github.com/emirpasic/gods v1.12.0
github.com/go-logr/logr v1.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ github.com/clbanning/mxj/v2 v2.5.5/go.mod h1:hNiWqW14h+kc+MdF9C6/YoRfjEJoR3ou6tn
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/container-storage-interface/spec v1.1.0/go.mod h1:6URME8mwIBbpVyZV93Ce5St17xBiQJQY67NDsuohiy4=
github.com/container-storage-interface/spec v1.9.0 h1:zKtX4STsq31Knz3gciCYCi1SXtO2HJDecIjDVboYavY=
github.com/container-storage-interface/spec v1.9.0/go.mod h1:ZfDu+3ZRyeVqxZM0Ds19MVLkN2d1XJ5MAfi1L3VjlT0=
github.com/container-storage-interface/spec v1.10.0 h1:YkzWPV39x+ZMTa6Ax2czJLLwpryrQ+dPesB34mrRMXA=
github.com/container-storage-interface/spec v1.10.0/go.mod h1:DtUvaQszPml1YJfIK7c00mlv6/g4wNMLanLgiUbKFRI=
github.com/containerd/ttrpc v1.2.3 h1:4jlhbXIGvijRtNC8F/5CpuJZ7yKOBFGFOOXg1bkISz0=
github.com/containerd/ttrpc v1.2.3/go.mod h1:ieWsXucbb8Mj9PH0rXCw1i8IunRbbAiDkpXkbfflWBM=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
Expand Down
41 changes: 1 addition & 40 deletions pkg/common/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,46 +94,7 @@ func (cs *ControllerServerWithValidator) ControllerExpandVolume(context context.
}

type GenericControllerServer struct {
}

func (GenericControllerServer) CreateVolume(context.Context, *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (GenericControllerServer) DeleteVolume(context.Context, *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (GenericControllerServer) ControllerPublishVolume(context.Context, *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (GenericControllerServer) ControllerUnpublishVolume(context.Context, *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (GenericControllerServer) ValidateVolumeCapabilities(context.Context, *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (GenericControllerServer) ListVolumes(context.Context, *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (GenericControllerServer) GetCapacity(context.Context, *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (GenericControllerServer) CreateSnapshot(context.Context, *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (GenericControllerServer) DeleteSnapshot(context.Context, *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (GenericControllerServer) ListSnapshots(context.Context, *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (GenericControllerServer) ControllerExpandVolume(context.Context, *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (GenericControllerServer) ControllerGetVolume(context.Context, *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (GenericControllerServer) ControllerModifyVolume(context.Context, *csi.ControllerModifyVolumeRequest) (*csi.ControllerModifyVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
csi.UnimplementedControllerServer
}

func ControllerRPCCapabilities(capsRPC ...csi.ControllerServiceCapability_RPC_Type) []*csi.ControllerServiceCapability {
Expand Down
1 change: 1 addition & 0 deletions pkg/common/identityserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
)

type GenericIdentityServer struct {
csi.UnimplementedIdentityServer
Name string
}

Expand Down
13 changes: 1 addition & 12 deletions pkg/common/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func filepathContains(basePath, path string) (bool, error) {
}

type GenericNodeServer struct {
csi.UnimplementedNodeServer
NodeID string
}

Expand All @@ -124,15 +125,3 @@ func (*GenericNodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeG
}
return resp, err
}

func (*GenericNodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}

func (*GenericNodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}

func (*GenericNodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
19 changes: 19 additions & 0 deletions pkg/disk/csi_agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package disk

import (
"context"

"github.com/container-storage-interface/spec/lib/go/csi"
)

type CSIAgent struct {
csi.UnimplementedNodeServer
}

func NewCSIAgent() *CSIAgent {
return &CSIAgent{}
}

func (a *CSIAgent) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
return localExpandVolume(ctx, req)
}
26 changes: 14 additions & 12 deletions pkg/disk/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,8 +918,6 @@ func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
*csi.NodeExpandVolumeResponse, error) {
klog.Infof("NodeExpandVolume: node expand volume: %v", req)

requestBytes := req.GetCapacityRange().GetRequiredBytes()

volumePath := req.GetVolumePath()
diskID := req.GetVolumeId()
if strings.Contains(volumePath, BLOCKVOLUMEPREFIX) {
Expand Down Expand Up @@ -953,6 +951,19 @@ func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
deleteUntagAutoSnapshot(volumeExpandAutoSnapshotID, diskID)
}
}()
resp, err := localExpandVolume(ctx, req)
if err != nil && snapshotEnable {
klog.Warningf("NodeExpandVolume:: Please use the snapshot %s for data recovery. The retentionDays is %d", volumeExpandAutoSnapshotID, veasp.RetentionDays)
snapshotEnable = false
}
return resp, err
}

func localExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
requestBytes := req.GetCapacityRange().GetRequiredBytes()
volumePath := req.GetVolumePath()
diskID := req.GetVolumeId()

devicePath, err := GetVolumeDeviceName(diskID)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
Expand Down Expand Up @@ -984,23 +995,14 @@ func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
}

// use resizer to expand volume filesystem
mounter := &k8smount.SafeFormatAndMount{Interface: ns.k8smounter, Exec: utilexec.New()}
r := k8smount.NewResizeFs(mounter.Exec)
r := k8smount.NewResizeFs(utilexec.New())
ok, err := r.Resize(devicePath, volumePath)
if err != nil {
klog.Errorf("NodeExpandVolume:: Resize Error, volumeId: %s, devicePath: %s, volumePath: %s, err: %s", diskID, devicePath, volumePath, err.Error())
if snapshotEnable {
klog.Warningf("NodeExpandVolume:: Please use the snapshot %s for data recovery. The retentionDays is %d", volumeExpandAutoSnapshotID, veasp.RetentionDays)
snapshotEnable = false
}
return nil, status.Error(codes.Internal, err.Error())
}
if !ok {
klog.Errorf("NodeExpandVolume:: Resize failed, volumeId: %s, devicePath: %s, volumePath: %s", diskID, devicePath, volumePath)
if snapshotEnable {
klog.Warningf("NodeExpandVolume:: Please use the snapshot %s for data recovery. The retentionDays is %d", volumeExpandAutoSnapshotID, veasp.RetentionDays)
snapshotEnable = false
}
return nil, status.Error(codes.Internal, "Fail to resize volume fs")
}

Expand Down
6 changes: 1 addition & 5 deletions pkg/oss/csi_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud/metadata"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
mountutils "k8s.io/mount-utils"
)

type CSIAgent struct {
csi.UnimplementedNodeServer
// mount-proxy socket path
socketPath string
ns *nodeServer
Expand All @@ -31,9 +30,6 @@ func NewCSIAgent(m metadata.MetadataProvider, socketPath string) *CSIAgent {
}

func (a *CSIAgent) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
if req.TargetPath == "" {
return nil, status.Errorf(codes.InvalidArgument, "TargetPath is required")
}
if req.PublishContext == nil {
req.PublishContext = map[string]string{}
}
Expand Down
10 changes: 4 additions & 6 deletions pkg/pov/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ const (
// volumeCaps represents how the volume could be accessed.
// It is SINGLE_NODE_WRITER since EBS volume could only be
// attached to a single node at any given time.
var volumeCaps = []csi.VolumeCapability_AccessMode{
{
Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
},
var volumeCaps = []csi.VolumeCapability_AccessMode_Mode{
csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
}

type controllerService struct {
Expand All @@ -69,7 +67,7 @@ func newControllerService() controllerService {
}

func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
klog.Infof("CreateVolume: called args: %+v", *req)
klog.Infof("CreateVolume: called args: %+v", req)
if err := validateCreateVolumeRequest(req); err != nil {
return nil, err
}
Expand Down Expand Up @@ -200,7 +198,7 @@ func validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) bool {
hasSupport := func(cap *csi.VolumeCapability) bool {
for _, c := range volumeCaps {
if c.GetMode() == cap.AccessMode.GetMode() {
if c == cap.AccessMode.GetMode() {
return true
}
}
Expand Down
Loading