Skip to content

Commit

Permalink
unify node/controller service enabling
Browse files Browse the repository at this point in the history
Every driver supports enable/disable node or controller service, with unified interface.

deprecate SERVICE_TYPE env
deprecate -run-as-controller flags
introduce -run-controller-service and -run-node-service,
allowing controller and node service to be enabled/disabled independently.
  • Loading branch information
huww98 committed Mar 7, 2024
1 parent 1dd6eef commit 53fd851
Show file tree
Hide file tree
Showing 14 changed files with 240 additions and 126 deletions.
63 changes: 21 additions & 42 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,32 +80,20 @@ const (
)

var (
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
nodeID = flag.String("nodeid", "", "node id")
runAsController = flag.Bool("run-as-controller", false, "Only run as controller service")
driver = flag.String("driver", TypePluginDISK, "CSI Driver")
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
nodeID = flag.String("nodeid", "", "node id")
runAsController = flag.Bool("run-as-controller", false, "Only run as controller service (deprecated)")
runControllerService = flag.Bool("run-controller-service", true, "activate CSI controller service")
runNodeService = flag.Bool("run-node-service", true, "activate CSI node service")
driver = flag.String("driver", TypePluginDISK, "CSI Driver")
// Deprecated: rootDir is instead by KUBELET_ROOT_DIR env.
rootDir = flag.String("rootdir", "/var/lib/kubelet/csi-plugins", "Kubernetes root directory")
)

type globalMetricConfig struct {
enableMetric bool
serviceType string
}

// Nas CSI Plugin
func main() {
flag.Parse()
serviceType := os.Getenv(utils.ServiceType)

if len(serviceType) == 0 || serviceType == "" {
serviceType = utils.PluginService
}
serviceType := utils.GetServiceType(*runAsController, *runControllerService, *runNodeService)

// When serviceType is neither plugin nor provisioner, the program will exits.
if serviceType != utils.PluginService && serviceType != utils.ProvisionerService {
log.Fatalf("Service type is unknown:%s", serviceType)
}
// enable pprof analyse
pprofPort := os.Getenv("PPROF_PORT")
if pprofPort != "" {
Expand Down Expand Up @@ -181,32 +169,32 @@ func main() {
case TypePluginOSS:
go func(endPoint string) {
defer wg.Done()
driver := oss.NewDriver(*nodeID, endPoint, meta, *runAsController)
driver := oss.NewDriver(*nodeID, endPoint, meta, serviceType)
driver.Run()
}(endPointName)
case TypePluginDISK:
go func(endPoint string) {
defer wg.Done()
driver := disk.NewDriver(meta, endPoint, *runAsController)
driver := disk.NewDriver(meta, endPoint, serviceType)
driver.Run()
}(endPointName)

case TypePluginCPFS:
go func(endPoint string) {
defer wg.Done()
driver := cpfs.NewDriver(*nodeID, endPoint)
driver := cpfs.NewDriver(*nodeID, endPoint, serviceType)
driver.Run()
}(endPointName)
case TypePluginDBFS:
go func(endPoint string) {
defer wg.Done()
driver := dbfs.NewDriver(*nodeID, endPoint)
driver := dbfs.NewDriver(*nodeID, endPoint, serviceType)
driver.Run()
}(endPointName)
case TypePluginENS:
go func(endpoint string) {
defer wg.Done()
driver := ens.NewDriver(*nodeID, endpoint)
driver := ens.NewDriver(*nodeID, endpoint, serviceType)
driver.Run()
}(endPointName)
case ExtenderAgent:
Expand All @@ -218,7 +206,7 @@ func main() {
case TypePluginPOV:
go func(endPoint string) {
defer wg.Done()
driver := pov.NewDriver(*nodeID, endPoint, *runAsController)
driver := pov.NewDriver(*nodeID, endPoint, serviceType)
driver.Run()
}(endPointName)
default:
Expand All @@ -227,34 +215,25 @@ func main() {
}
servicePort := os.Getenv("SERVICE_PORT")

if len(servicePort) == 0 || servicePort == "" {
switch serviceType {
case utils.PluginService:
servicePort = PluginServicePort
case utils.ProvisionerService:
if servicePort == "" {
servicePort = PluginServicePort
if serviceType&utils.Controller != 0 {
servicePort = ProvisionerServicePort
default:
}
}

metricConfig := &globalMetricConfig{
true,
"plugin",
}

enableMetric := os.Getenv("ENABLE_METRIC")
version.SetPrometheusVersion()
if enableMetric == "false" {
metricConfig.enableMetric = false
enableMetric := true
if os.Getenv("ENABLE_METRIC") == "false" {
enableMetric = false
}
metricConfig.serviceType = serviceType

log.Info("CSI is running status.")
csiMux := http.NewServeMux()
csiMux.HandleFunc("/healthz", healthHandler)
log.Infof("Metric listening on address: /healthz")
if metricConfig.enableMetric {
metricHandler := metric.NewMetricHandler(metricConfig.serviceType, driverNames)
if enableMetric && serviceType&utils.Node != 0 {
metricHandler := metric.NewMetricHandler(driverNames)
csiMux.Handle("/metrics", metricHandler)
log.Infof("Metric listening on address: /metrics")
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/cpfs/cpfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type CPFS struct {
}

// NewDriver create a cpfs driver object
func NewDriver(nodeID, endpoint string) *CPFS {
func NewDriver(nodeID, endpoint string, serviceType utils.ServiceType) *CPFS {
log.Infof("Driver: %v version: %v", driverName, version.VERSION)

d := &CPFS{}
Expand All @@ -58,7 +58,12 @@ func NewDriver(nodeID, endpoint string) *CPFS {
})

d.driver = csiDriver
d.controllerServer = NewControllerServer(d.driver)
if serviceType&utils.Controller != 0 {
d.controllerServer = NewControllerServer(d.driver)
}
if serviceType&utils.Node != 0 {
d.nodeServer = newNodeServer(d)
}

return d
}
Expand All @@ -75,5 +80,5 @@ func newNodeServer(d *CPFS) *nodeServer {

// Run start a new NodeServer
func (d *CPFS) Run() {
common.RunCSIServer(d.endpoint, csicommon.NewDefaultIdentityServer(d.driver), d.controllerServer, newNodeServer(d))
common.RunCSIServer(d.endpoint, csicommon.NewDefaultIdentityServer(d.driver), d.controllerServer, d.nodeServer)
}
11 changes: 8 additions & 3 deletions pkg/dbfs/dbfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type DBFS struct {
}

// NewDriver create the identity/node/controller server and dbfs driver
func NewDriver(nodeID, endpoint string) *DBFS {
func NewDriver(nodeID, endpoint string, serviceType utils.ServiceType) *DBFS {
log.Infof("Driver: %v version: %v", driverName, version.VERSION)

d := &DBFS{}
Expand All @@ -81,7 +81,12 @@ func NewDriver(nodeID, endpoint string) *DBFS {
if region == "" {
region, _ = utils.GetMetaData(RegionTag)
}
d.controllerServer = NewControllerServer(d.driver, c, region)
if serviceType&utils.Controller != 0 {
d.controllerServer = NewControllerServer(d.driver, c, region)
}
if serviceType&utils.Node != 0 {
d.nodeServer = newNodeServer(d)
}
GlobalConfigVar.DbfsClient = c

// Global Configs Set
Expand All @@ -91,7 +96,7 @@ func NewDriver(nodeID, endpoint string) *DBFS {

// Run start a new NodeServer
func (d *DBFS) Run() {
common.RunCSIServer(d.endpoint, NewIdentityServer(d.driver), d.controllerServer, newNodeServer(d))
common.RunCSIServer(d.endpoint, NewIdentityServer(d.driver), d.controllerServer, d.nodeServer)
}

// GlobalConfigSet set global config
Expand Down
3 changes: 1 addition & 2 deletions pkg/disk/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ func NewControllerServer(d *csicommon.CSIDriver, client *crd.Clientset) csi.Cont
SnapshotRequestInterval = interval
}

serviceType := os.Getenv(utils.ServiceType)
if serviceType == utils.ProvisionerService && installCRD {
if installCRD {
checkInstallCRD(client)
checkInstallDefaultVolumeSnapshotClass(GlobalConfigVar.SnapClient)
}
Expand Down
34 changes: 15 additions & 19 deletions pkg/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ type GlobalConfig struct {
FilesystemLosePercent float64
ClusterID string
DiskPartitionEnable bool
ControllerService bool
BdfHealthCheck bool
DiskMultiTenantEnable bool
CheckBDFHotPlugin bool
Expand All @@ -99,14 +98,20 @@ func initDriver() {
}

// NewDriver create the identity/node/controller server and disk driver
func NewDriver(m metadata.MetadataProvider, endpoint string, runAsController bool) *DISK {
func NewDriver(m metadata.MetadataProvider, endpoint string, serviceType utils.ServiceType) *DISK {
initDriver()
tmpdisk := &DISK{}
tmpdisk.endpoint = endpoint

// Config Global vars
cfg := GlobalConfigSet(m)

if serviceType&utils.Node != 0 {
GlobalConfigVar.NodeID = metadata.MustGet(m, metadata.InstanceID)
} else {
GlobalConfigVar.NodeID = "not-retrieved" // make csi-common happy
}

csiDriver := csicommon.NewCSIDriver(driverName, version.VERSION, GlobalConfigVar.NodeID)
tmpdisk.driver = csiDriver
tmpdisk.driver.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{
Expand Down Expand Up @@ -135,9 +140,10 @@ func NewDriver(m metadata.MetadataProvider, endpoint string, runAsController boo

// Create GRPC servers
tmpdisk.idServer = NewIdentityServer(tmpdisk.driver)
tmpdisk.controllerServer = NewControllerServer(tmpdisk.driver, apiExtentionClient)

if !runAsController {
if serviceType&utils.Controller != 0 {
tmpdisk.controllerServer = NewControllerServer(tmpdisk.driver, apiExtentionClient)
}
if serviceType&utils.Node != 0 {
tmpdisk.nodeServer = NewNodeServer(tmpdisk.driver, m)
}

Expand Down Expand Up @@ -229,19 +235,9 @@ func GlobalConfigSet(m metadata.MetadataProvider) *restclient.Config {
}
clustID := os.Getenv("CLUSTER_ID")

controllerServerType := false
nodeID := ""
if os.Getenv(utils.ServiceType) == utils.ProvisionerService {
controllerServerType = true
nodeID = "controller" // make csi-common happy
} else {
nodeID = metadata.MustGet(m, metadata.InstanceID)
}

// Global Config Set
GlobalConfigVar = GlobalConfig{
Region: metadata.MustGet(m, metadata.RegionID),
NodeID: nodeID,
ADControllerEnable: csiCfg.GetBool("disk-adcontroller-enable", "DISK_AD_CONTROLLER", false),
DiskTagEnable: csiCfg.GetBool("disk-tag-by-plugin", "DISK_TAGED_BY_PLUGIN", false),
DiskBdfEnable: csiCfg.GetBool("disk-bdf-enable", "DISK_BDF_ENABLE", false),
Expand All @@ -255,7 +251,6 @@ func GlobalConfigSet(m metadata.MetadataProvider) *restclient.Config {
FilesystemLosePercent: fileSystemLosePercent,
ClusterID: clustID,
DiskPartitionEnable: csiCfg.GetBool("disk-partition-enable", "DISK_PARTITION_ENABLE", true),
ControllerService: controllerServerType,
BdfHealthCheck: csiCfg.GetBool("bdf-health-check", "BDF_HEALTH_CHECK", true),
DiskMultiTenantEnable: csiCfg.GetBool("disk-multi-tenant-enable", "DISK_MULTI_TENANT_ENABLE", false),
NodeMultiZoneEnable: csiCfg.GetBool("node-multi-zone-enable", "NODE_MULTI_ZONE_ENABLE", false),
Expand All @@ -282,11 +277,12 @@ func GlobalConfigSet(m metadata.MetadataProvider) *restclient.Config {
GlobalConfigVar.ClusterID,
)

if controllerServerType && !csiCfg.GetBool("disk-serial-attach", "DISK_SERIAL_ATTACH", false) {
// if ADController is not enabled, we need SERIAL_ATTACH to recognize old disk
if !GlobalConfigVar.ADControllerEnable || csiCfg.GetBool("disk-serial-attach", "DISK_SERIAL_ATTACH", false) {
GlobalConfigVar.AttachDetachSlots = NewSerialAttachDetachSlots()
} else {
log.Infof("Disk parallel attach/detach enabled, please set DISK_SERIAL_ATTACH if you see a lot of InvalidOperation.Conflict error.")
GlobalConfigVar.AttachDetachSlots = NewParallelAttachDetachSlots()
} else {
GlobalConfigVar.AttachDetachSlots = NewSerialAttachDetachSlots()
}

return cfg
Expand Down
2 changes: 1 addition & 1 deletion pkg/disk/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func NewNodeServer(d *csicommon.CSIDriver, m metadata.MetadataProvider) csi.Node
go checkVfhpOnlineReconcile()
}

if !GlobalConfigVar.ControllerService && IsVFNode() && GlobalConfigVar.BdfHealthCheck {
if IsVFNode() && GlobalConfigVar.BdfHealthCheck {
go BdfHealthCheck()
}

Expand Down
14 changes: 4 additions & 10 deletions pkg/ens/ens.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type ENS struct {

func initDriver() {}

func NewDriver(nodeID, endpoint string) *ENS {
func NewDriver(nodeID, endpoint string, serviceType utils.ServiceType) *ENS {

initDriver()
tmpENS := &ENS{}
Expand All @@ -79,9 +79,10 @@ func NewDriver(nodeID, endpoint string) *ENS {
tmpENS.driver.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER})

tmpENS.idServer = NewIdentityServer(tmpENS.driver)
if GlobalConfigVar.ControllerService {
if serviceType&utils.Controller != 0 {
tmpENS.controllerServer = NewControllerServer(tmpENS.driver)
} else {
}
if serviceType&utils.Node != 0 {
tmpENS.nodeServer = NewNodeServer(tmpENS.driver)
}
return tmpENS
Expand Down Expand Up @@ -155,11 +156,6 @@ func NewGlobalConfig() {
detachBeforeAttach = true
}

controllerServerType := false
if os.Getenv(utils.ServiceType) == utils.ProvisionerService {
controllerServerType = true
}

GlobalConfigVar = GlobalConfig{
KClient: kubeClient,
InstanceID: instanceID,
Expand All @@ -168,7 +164,6 @@ func NewGlobalConfig() {
RegionID: regionID,
EnableAttachDetachController: attachDetachController,
DetachBeforeAttach: detachBeforeAttach,
ControllerService: controllerServerType,
}
}

Expand All @@ -177,7 +172,6 @@ type GlobalConfig struct {
InstanceID string
ClusterID string
DetachBeforeAttach bool
ControllerService bool

EnableDiskPartition string
EnableAttachDetachController string
Expand Down
Loading

0 comments on commit 53fd851

Please sign in to comment.