Support mounting and deleting version 1.0.0 RBD volumes
This commit adds support for mounting and deleting volumes provisioned by older
plugin versions (1.0.0), in order to remain backward compatible with volumes
created by 1.0.0.

It adds back the ability to specify where the older metadata was stored, using
the metadatastorage option to the plugin, and then uses the provided metadata
to mount and delete the older volumes.

It also supports the variety of ways in which monitor information may have been
specified (in the StorageClass, or in the secret), to keep the monitor
information current.

Testing done:
- Mount/Delete of a volume created by the 1.0.0 plugin, with monitors in the
  StorageClass
- Mount/Delete of a volume created by the 1.0.0 plugin, with monitors in the
  secret under the key "monitors"
- Mount/Delete of a volume created by the 1.0.0 plugin, with monitors in the
  secret under a user-specified key
- PVC creation and deletion with the current version (to ensure, at a minimum,
  that existing functionality is not broken)
- Some negative cases, where monitor information is missing from secrets or
  present under a different key name, to verify that failure scenarios behave
  as expected

Updates #378

Follow-up work:
- Documentation on how to upgrade to the 1.1 plugin while retaining the above
  functionality for older volumes

Signed-off-by: ShyamsundarR <[email protected]>
ShyamsundarR authored and mergify[bot] committed Jul 8, 2019
1 parent 09f1266 commit fa68c35
Showing 7 changed files with 234 additions and 52 deletions.
15 changes: 11 additions & 4 deletions cmd/cephcsi.go
@@ -44,14 +44,14 @@ var (
 	nodeID     = flag.String("nodeid", "", "node id")
 	instanceID = flag.String("instanceid", "", "Unique ID distinguishing this instance of Ceph CSI among other"+
 		" instances, when sharing Ceph clusters across CSI instances for provisioning")
+	metadataStorage = flag.String("metadatastorage", "", "metadata persistence method [node|k8s_configmap]")
 
 	// rbd related flags
 	containerized = flag.Bool("containerized", true, "whether run as containerized")
 
 	// cephfs related flags
-	volumeMounter   = flag.String("volumemounter", "", "default volume mounter (possible options are 'kernel', 'fuse')")
-	mountCacheDir   = flag.String("mountcachedir", "", "mount info cache save dir")
-	metadataStorage = flag.String("metadatastorage", "", "metadata persistence method [node|k8s_configmap]")
+	volumeMounter = flag.String("volumemounter", "", "default volume mounter (possible options are 'kernel', 'fuse')")
+	mountCacheDir = flag.String("mountcachedir", "", "mount info cache save dir")
 )
 
 func init() {
@@ -109,8 +109,15 @@ func main() {
 	switch driverType {
 	case rbdType:
 		rbd.PluginFolder += dname
+		if *metadataStorage != "" {
+			cp, err = util.CreatePersistanceStorage(
+				rbd.PluginFolder, *metadataStorage, dname)
+			if err != nil {
+				os.Exit(1)
+			}
+		}
 		driver := rbd.NewDriver()
-		driver.Run(dname, *nodeID, *endpoint, *instanceID, *containerized)
+		driver.Run(dname, *nodeID, *endpoint, *instanceID, *containerized, cp)
 
 	case cephfsType:
 		cephfs.PluginFolder += dname
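The util.CreatePersistanceStorage helper called above lives in the util package and is not part of this diff. A minimal sketch of what such a helper does, assuming CreateMountPoint and NewCachePersister are existing util helpers (their exact signatures are assumptions here, not verified API):

package util

import (
	"path"

	"k8s.io/klog"
)

// CreatePersistanceStorage prepares the plugin's metadata cache location and
// returns a CachePersister for the requested persistence method.
// Sketch only: CreateMountPoint and NewCachePersister are assumed helpers.
func CreatePersistanceStorage(sPath, metaDataStore, driverName string) (CachePersister, error) {
	// Ensure the on-disk cache directory exists before handing out a persister.
	if err := CreateMountPoint(path.Join(sPath, "controller")); err != nil {
		klog.Errorf("failed to create persistent storage directory: %v", err)
		return nil, err
	}

	// Select the backing store: "node" (local files) or "k8s_configmap".
	cp, err := NewCachePersister(metaDataStore, driverName)
	if err != nil {
		klog.Errorf("failed to define cache persistence method: %v", err)
		return nil, err
	}
	return cp, nil
}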
17 changes: 9 additions & 8 deletions docs/deploy-rbd.md
@@ -26,14 +26,15 @@ make image-cephcsi
 
 **Available command line arguments:**
 
-| Option            | Default value         | Description                                                                                                                                          |
-| ----------------- | --------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------- |
-| `--endpoint`      | `unix://tmp/csi.sock` | CSI endpoint, must be a UNIX socket                                                                                                                  |
-| `--drivername`    | `rbd.csi.ceph.com`    | name of the driver (Kubernetes: `provisioner` field in StorageClass must correspond to this value)                                                   |
-| `--nodeid`        | _empty_               | This node's ID                                                                                                                                       |
-| `--type`          | _empty_               | driver type `[rbd \| cephfs]` If the driver type is set to `rbd` it will act as a `rbd plugin` or if it's set to `cephfs` will act as a `cephfs plugin` |
-| `--containerized` | true                  | Whether running in containerized mode                                                                                                               |
-| `--instanceid`    | "default"             | Unique ID distinguishing this instance of Ceph CSI among other instances, when sharing Ceph clusters across CSI instances for provisioning          |
+| Option              | Default value         | Description                                                                                                                                          |
+| ------------------- | --------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------- |
+| `--endpoint`        | `unix://tmp/csi.sock` | CSI endpoint, must be a UNIX socket                                                                                                                  |
+| `--drivername`      | `rbd.csi.ceph.com`    | Name of the driver (Kubernetes: `provisioner` field in StorageClass must correspond to this value)                                                   |
+| `--nodeid`          | _empty_               | This node's ID                                                                                                                                       |
+| `--type`            | _empty_               | Driver type `[rbd \| cephfs]` If the driver type is set to `rbd` it will act as a `rbd plugin` or if it's set to `cephfs` it will act as a `cephfs plugin` |
+| `--containerized`   | true                  | Whether running in containerized mode                                                                                                                |
+| `--instanceid`      | "default"             | Unique ID distinguishing this instance of Ceph CSI among other instances, when sharing Ceph clusters across CSI instances for provisioning           |
+| `--metadatastorage` | _empty_               | Points to where legacy (1.0.0 or older plugin versions) metadata about provisioned volumes is kept, as a file or in a k8s configmap (`node` or `k8s_configmap` respectively) |
 
 **Available environmental variables:**
65 changes: 64 additions & 1 deletion pkg/rbd/controllerserver.go
@@ -38,6 +38,7 @@
 // controller server spec.
 type ControllerServer struct {
 	*csicommon.DefaultControllerServer
+	MetadataStore util.CachePersister
 }
 
 func (cs *ControllerServer) validateVolumeReq(req *csi.CreateVolumeRequest) error {
@@ -83,7 +84,7 @@ func (cs *ControllerServer) parseVolCreateRequest(req *csi.CreateVolumeRequest)
 	}
 
 	// if it's NOT SINGLE_NODE_WRITER and it's BLOCK we'll set the parameter to ignore the in-use checks
-	rbdVol, err := genVolFromVolumeOptions(req.GetParameters(), (isMultiNode && isBlock))
+	rbdVol, err := genVolFromVolumeOptions(req.GetParameters(), nil, (isMultiNode && isBlock), false)
 	if err != nil {
 		return nil, status.Error(codes.InvalidArgument, err.Error())
 	}
@@ -226,6 +227,50 @@ func (cs *ControllerServer) checkSnapshot(req *csi.CreateVolumeRequest, rbdVol *
 	return nil
 }
 
+// DeleteLegacyVolume deletes a volume provisioned using version 1.0.0 of the plugin
+func (cs *ControllerServer) DeleteLegacyVolume(req *csi.DeleteVolumeRequest, cr *util.Credentials) (*csi.DeleteVolumeResponse, error) {
+	volumeID := req.GetVolumeId()
+
+	if cs.MetadataStore == nil {
+		return nil, status.Errorf(codes.InvalidArgument, "missing metadata store configuration to"+
+			" proceed with deleting legacy volume ID (%s)", volumeID)
+	}
+
+	idLk := legacyVolumeIDLocker.Lock(volumeID)
+	defer legacyVolumeIDLocker.Unlock(idLk, volumeID)
+
+	rbdVol := &rbdVolume{}
+	if err := cs.MetadataStore.Get(volumeID, rbdVol); err != nil {
+		if err, ok := err.(*util.CacheEntryNotFound); ok {
+			klog.V(3).Infof("metadata for legacy volume %s not found, assuming the volume to be already deleted (%v)", volumeID, err)
+			return &csi.DeleteVolumeResponse{}, nil
+		}
+
+		return nil, status.Error(codes.Internal, err.Error())
+	}
+
+	// Fill up Monitors
+	if err := updateMons(rbdVol, nil, req.GetSecrets()); err != nil {
+		return nil, status.Error(codes.Internal, err.Error())
+	}
+
+	// Update rbdImageName as the VolName when dealing with version 1 volumes
+	rbdVol.RbdImageName = rbdVol.VolName
+
+	klog.V(4).Infof("deleting legacy volume %s", rbdVol.VolName)
+	if err := deleteImage(rbdVol, cr); err != nil {
+		// TODO: can we detect "already deleted" situations here and proceed?
+		klog.V(3).Infof("failed to delete legacy rbd image: %s/%s with error: %v", rbdVol.Pool, rbdVol.VolName, err)
+		return nil, status.Error(codes.Internal, err.Error())
+	}
+
+	if err := cs.MetadataStore.Delete(volumeID); err != nil {
+		return nil, status.Error(codes.Internal, err.Error())
+	}
+
+	return &csi.DeleteVolumeResponse{}, nil
+}
+
 // DeleteVolume deletes the volume in backend and removes the volume metadata
 // from store
 func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
@@ -247,6 +292,18 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
 
 	rbdVol := &rbdVolume{}
 	if err := genVolFromVolID(rbdVol, volumeID, cr); err != nil {
+		// If error is ErrInvalidVolID it could be a version 1.0.0 or lower volume, attempt
+		// to process it as such
+		if _, ok := err.(ErrInvalidVolID); ok {
+			if isLegacyVolumeID(volumeID) {
+				klog.V(2).Infof("attempting deletion of potential legacy volume (%s)", volumeID)
+				return cs.DeleteLegacyVolume(req, cr)
+			}
+
+			// Consider unknown volumeID as a successfully deleted volume
+			return &csi.DeleteVolumeResponse{}, nil
+		}
+
 		// if error is ErrKeyNotFound, then a previous attempt at deletion was complete
 		// or partially complete (image and imageOMap are garbage collected already), hence return
 		// success as deletion is complete
@@ -284,6 +341,12 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
+	if err := undoVolReservation(rbdVol, cr); err != nil {
+		klog.Errorf("failed to remove reservation for volume (%s) with backing image (%s) (%s)",
+			rbdVol.RequestName, rbdVol.RbdImageName, err)
+		return nil, status.Error(codes.Internal, err.Error())
+	}
+
 	return &csi.DeleteVolumeResponse{}, nil
 }
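updateMons, used by DeleteLegacyVolume above, is defined in one of the files not shown here. As a rough sketch of the behavior the commit message describes (keep the monitor information current, whether it was recorded directly or named by a secret key), it could look like the following; the Monitors and MonValueFromSecret field names and the standard-library errors import are assumptions:

// updateMons refreshes the monitor information on a legacy rbdVolume, using
// options (e.g. volume context) when present, and preferring current values
// from the secret when the stored metadata names a secret key for monitors.
// Sketch only: Monitors and MonValueFromSecret are assumed field names.
func updateMons(rbdVol *rbdVolume, options, secrets map[string]string) error {
	if options != nil {
		if mon, ok := options["monitors"]; ok {
			rbdVol.Monitors = mon
		}
		if monKey, ok := options["monValueFromSecret"]; ok {
			rbdVol.MonValueFromSecret = monKey
		}
	}

	// Keep monitors current: the secret is authoritative when a key is named.
	if rbdVol.MonValueFromSecret != "" && secrets != nil {
		if mon, ok := secrets[rbdVol.MonValueFromSecret]; ok && mon != "" {
			rbdVol.Monitors = mon
		}
	}

	if rbdVol.Monitors == "" {
		return errors.New("either monitors or monValueFromSecret must be set")
	}
	return nil
}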
10 changes: 10 additions & 0 deletions pkg/rbd/errors.go
@@ -47,3 +47,13 @@ type ErrVolNameConflict struct {
 func (e ErrVolNameConflict) Error() string {
 	return e.err.Error()
 }
+
+// ErrInvalidVolID is returned when a CSI passed VolumeID does not conform to any known volume ID
+// formats
+type ErrInvalidVolID struct {
+	err error
+}
+
+func (e ErrInvalidVolID) Error() string {
+	return e.err.Error()
+}
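For illustration, callers detect this error with a plain type assertion, as the DeleteVolume and NodePublishVolume hunks show; a minimal, hypothetical helper wrapping that pattern:

// isInvalidVolID is a hypothetical helper showing the assertion pattern:
// a type assertion on the concrete error type distinguishes malformed
// volume IDs from other failures.
func isInvalidVolID(err error) bool {
	_, ok := err.(ErrInvalidVolID)
	return ok
}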
51 changes: 42 additions & 9 deletions pkg/rbd/nodeserver.go
@@ -67,6 +67,22 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 
 	disableInUseChecks := false
 
+	isLegacyVolume := false
+	volName, err := getVolumeName(req)
+	if err != nil {
+		// error ErrInvalidVolID may mean this is an 1.0.0 version volume, check for name
+		// pattern match in addition to error to ensure this is a likely v1.0.0 volume
+		if _, ok := err.(ErrInvalidVolID); !ok || !isLegacyVolumeID(req.GetVolumeId()) {
+			return nil, status.Error(codes.InvalidArgument, err.Error())
+		}
+
+		volName, err = getLegacyVolumeName(req)
+		if err != nil {
+			return nil, status.Error(codes.InvalidArgument, err.Error())
+		}
+		isLegacyVolume = true
+	}
+
 	isBlock := req.GetVolumeCapability().GetBlock() != nil
 	// Check if that target path exists properly
 	notMnt, err := ns.createTargetPath(targetPath, isBlock)
@@ -88,12 +104,7 @@
 		}
 	}
 
-	volOptions, err := genVolFromVolumeOptions(req.GetVolumeContext(), disableInUseChecks)
-	if err != nil {
-		return nil, err
-	}
-
-	volName, err := ns.getVolumeName(req)
+	volOptions, err := genVolFromVolumeOptions(req.GetVolumeContext(), req.GetSecrets(), disableInUseChecks, isLegacyVolume)
 	if err != nil {
 		return nil, err
 	}
@@ -118,18 +129,40 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 	return &csi.NodePublishVolumeResponse{}, nil
 }
 
-func (ns *NodeServer) getVolumeName(req *csi.NodePublishVolumeRequest) (string, error) {
+func getVolumeName(req *csi.NodePublishVolumeRequest) (string, error) {
 	var vi util.CSIIdentifier
 
 	err := vi.DecomposeCSIID(req.GetVolumeId())
 	if err != nil {
-		klog.Errorf("error decoding volume ID (%s) (%s)", err, req.GetVolumeId())
-		return "", status.Error(codes.InvalidArgument, err.Error())
+		err = fmt.Errorf("error decoding volume ID (%s) (%s)", err, req.GetVolumeId())
+		return "", ErrInvalidVolID{err}
 	}
 
 	return volJournal.NamingPrefix() + vi.ObjectUUID, nil
 }
 
+func getLegacyVolumeName(req *csi.NodePublishVolumeRequest) (string, error) {
+	var volName string
+
+	isBlock := req.GetVolumeCapability().GetBlock() != nil
+	targetPath := req.GetTargetPath()
+
+	if isBlock {
+		// Get volName from targetPath
+		s := strings.Split(targetPath, "/")
+		volName = s[len(s)-1]
+	} else {
+		// Get volName from targetPath
+		if !strings.HasSuffix(targetPath, "/mount") {
+			return "", fmt.Errorf("rbd: malformed value of target path: %s", targetPath)
+		}
+		s := strings.Split(strings.TrimSuffix(targetPath, "/mount"), "/")
+		volName = s[len(s)-1]
+	}
+
+	return volName, nil
+}
+
 func (ns *NodeServer) mountVolume(req *csi.NodePublishVolumeRequest, devicePath string) error {
 	// Publish Path
 	fsType := req.GetVolumeCapability().GetMount().GetFsType()
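isLegacyVolumeID, referenced in both the controller and node servers, is defined in a file not shown above. A plausible sketch, assuming the 1.0.0 naming scheme was a fixed "csi-rbd-vol-" prefix followed by a canonical UUID (both details are assumptions about the legacy format):

import (
	"regexp"
	"strings"
)

// legacyUUIDRegexp matches a canonical lowercase UUID.
var legacyUUIDRegexp = regexp.MustCompile(`^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`)

// isLegacyVolumeID reports whether volumeID conforms to the assumed 1.0.0
// naming scheme: "csi-rbd-vol-" (12 chars) + UUID (36 chars).
func isLegacyVolumeID(volumeID string) bool {
	const prefix = "csi-rbd-vol-"
	if len(volumeID) != len(prefix)+36 || !strings.HasPrefix(volumeID, prefix) {
		return false
	}
	return legacyUUIDRegexp.MatchString(volumeID[len(prefix):])
}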
7 changes: 4 additions & 3 deletions pkg/rbd/rbd.go
@@ -74,9 +74,10 @@ func NewIdentityServer(d *csicommon.CSIDriver) *IdentityServer {
 }
 
 // NewControllerServer initialize a controller server for rbd CSI driver
-func NewControllerServer(d *csicommon.CSIDriver) *ControllerServer {
+func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *ControllerServer {
 	return &ControllerServer{
 		DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
+		MetadataStore:           cachePersister,
 	}
 }
@@ -98,7 +99,7 @@ func NewNodeServer(d *csicommon.CSIDriver, containerized bool) (*NodeServer, err
 
 // Run start a non-blocking grpc controller,node and identityserver for
 // rbd CSI driver which can serve multiple parallel requests
-func (r *Driver) Run(driverName, nodeID, endpoint, instanceID string, containerized bool) {
+func (r *Driver) Run(driverName, nodeID, endpoint, instanceID string, containerized bool, cachePersister util.CachePersister) {
 	var err error
 
 	klog.Infof("Driver: %v version: %v", driverName, version)
@@ -147,7 +148,7 @@ func (r *Driver) Run(driverName, nodeID, endpoint, instanceID string, containeri
 		klog.Fatalf("failed to start node server, err %v\n", err)
 	}
 
-	r.cs = NewControllerServer(r.cd)
+	r.cs = NewControllerServer(r.cd, cachePersister)
 
 	s := csicommon.NewNonBlockingGRPCServer()
 	s.Start(endpoint, r.ids, r.cs, r.ns)
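util.CachePersister, now threaded from main() through Driver.Run into the controller server, is a pre-existing interface rather than part of this diff. A sketch consistent with the calls made above; Get and Delete are the only methods these hunks exercise, and the Create method is an assumption:

// CachePersister abstracts where legacy volume metadata is persisted:
// local files on the node, or a Kubernetes ConfigMap.
// Sketch only: Create is an assumed method.
type CachePersister interface {
	// Create persists data under the given identifier.
	Create(identifier string, data interface{}) error
	// Get loads the entry for identifier into data, returning a
	// *CacheEntryNotFound error when no entry exists.
	Get(identifier string, data interface{}) error
	// Delete removes the entry for identifier.
	Delete(identifier string) error
}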