Skip to content

Commit

Permalink
unify node/controller service enabling
Browse files Browse the repository at this point in the history
Every driver supports enable/disable node or controller service, with unified interface.

deprecate SERVICE_TYPE env
deprecate -run-as-controller flags
introduce -run-controller-service and -run-node-service,
allowing controller and node service to be enabled/disabled independently.
  • Loading branch information
huww98 committed Mar 7, 2024
1 parent 1dd6eef commit d8be587
Show file tree
Hide file tree
Showing 12 changed files with 214 additions and 116 deletions.
59 changes: 19 additions & 40 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,32 +80,20 @@ const (
)

var (
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
nodeID = flag.String("nodeid", "", "node id")
runAsController = flag.Bool("run-as-controller", false, "Only run as controller service")
driver = flag.String("driver", TypePluginDISK, "CSI Driver")
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
nodeID = flag.String("nodeid", "", "node id")
runAsController = flag.Bool("run-as-controller", false, "Only run as controller service (deprecated)")
runControllerService = flag.Bool("run-controller-service", true, "activate CSI controller service")
runNodeService = flag.Bool("run-node-service", true, "activate CSI node service")
driver = flag.String("driver", TypePluginDISK, "CSI Driver")
// Deprecated: rootDir is instead by KUBELET_ROOT_DIR env.
rootDir = flag.String("rootdir", "/var/lib/kubelet/csi-plugins", "Kubernetes root directory")
)

type globalMetricConfig struct {
enableMetric bool
serviceType string
}

// Nas CSI Plugin
func main() {
flag.Parse()
serviceType := os.Getenv(utils.ServiceType)

if len(serviceType) == 0 || serviceType == "" {
serviceType = utils.PluginService
}
serviceType := utils.GetServiceType(*runAsController, *runControllerService, *runNodeService)

// When serviceType is neither plugin nor provisioner, the program will exits.
if serviceType != utils.PluginService && serviceType != utils.ProvisionerService {
log.Fatalf("Service type is unknown:%s", serviceType)
}
// enable pprof analyse
pprofPort := os.Getenv("PPROF_PORT")
if pprofPort != "" {
Expand Down Expand Up @@ -181,13 +169,13 @@ func main() {
case TypePluginOSS:
go func(endPoint string) {
defer wg.Done()
driver := oss.NewDriver(*nodeID, endPoint, meta, *runAsController)
driver := oss.NewDriver(*nodeID, endPoint, meta, serviceType)
driver.Run()
}(endPointName)
case TypePluginDISK:
go func(endPoint string) {
defer wg.Done()
driver := disk.NewDriver(meta, endPoint, *runAsController)
driver := disk.NewDriver(meta, endPoint, serviceType)
driver.Run()
}(endPointName)

Expand All @@ -206,7 +194,7 @@ func main() {
case TypePluginENS:
go func(endpoint string) {
defer wg.Done()
driver := ens.NewDriver(*nodeID, endpoint)
driver := ens.NewDriver(*nodeID, endpoint, serviceType)
driver.Run()
}(endPointName)
case ExtenderAgent:
Expand All @@ -218,7 +206,7 @@ func main() {
case TypePluginPOV:
go func(endPoint string) {
defer wg.Done()
driver := pov.NewDriver(*nodeID, endPoint, *runAsController)
driver := pov.NewDriver(*nodeID, endPoint, serviceType)
driver.Run()
}(endPointName)
default:
Expand All @@ -227,34 +215,25 @@ func main() {
}
servicePort := os.Getenv("SERVICE_PORT")

if len(servicePort) == 0 || servicePort == "" {
switch serviceType {
case utils.PluginService:
servicePort = PluginServicePort
case utils.ProvisionerService:
if servicePort == "" {
servicePort = PluginServicePort
if serviceType&utils.Controller != 0 {
servicePort = ProvisionerServicePort
default:
}
}

metricConfig := &globalMetricConfig{
true,
"plugin",
}

enableMetric := os.Getenv("ENABLE_METRIC")
version.SetPrometheusVersion()
if enableMetric == "false" {
metricConfig.enableMetric = false
enableMetric := true
if os.Getenv("ENABLE_METRIC") == "false" {
enableMetric = false
}
metricConfig.serviceType = serviceType

log.Info("CSI is running status.")
csiMux := http.NewServeMux()
csiMux.HandleFunc("/healthz", healthHandler)
log.Infof("Metric listening on address: /healthz")
if metricConfig.enableMetric {
metricHandler := metric.NewMetricHandler(metricConfig.serviceType, driverNames)
if enableMetric && serviceType&utils.Node != 0 {
metricHandler := metric.NewMetricHandler(driverNames)
csiMux.Handle("/metrics", metricHandler)
log.Infof("Metric listening on address: /metrics")
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/disk/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ func NewControllerServer(d *csicommon.CSIDriver, client *crd.Clientset) csi.Cont
SnapshotRequestInterval = interval
}

serviceType := os.Getenv(utils.ServiceType)
if serviceType == utils.ProvisionerService && installCRD {
if installCRD {
checkInstallCRD(client)
checkInstallDefaultVolumeSnapshotClass(GlobalConfigVar.SnapClient)
}
Expand Down
34 changes: 15 additions & 19 deletions pkg/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ type GlobalConfig struct {
FilesystemLosePercent float64
ClusterID string
DiskPartitionEnable bool
ControllerService bool
BdfHealthCheck bool
DiskMultiTenantEnable bool
CheckBDFHotPlugin bool
Expand All @@ -99,14 +98,20 @@ func initDriver() {
}

// NewDriver create the identity/node/controller server and disk driver
func NewDriver(m metadata.MetadataProvider, endpoint string, runAsController bool) *DISK {
func NewDriver(m metadata.MetadataProvider, endpoint string, serviceType utils.ServiceType) *DISK {
initDriver()
tmpdisk := &DISK{}
tmpdisk.endpoint = endpoint

// Config Global vars
cfg := GlobalConfigSet(m)

if serviceType&utils.Node != 0 {
GlobalConfigVar.NodeID = metadata.MustGet(m, metadata.InstanceID)
} else {
GlobalConfigVar.NodeID = "not-retrieved" // make csi-common happy
}

csiDriver := csicommon.NewCSIDriver(driverName, version.VERSION, GlobalConfigVar.NodeID)
tmpdisk.driver = csiDriver
tmpdisk.driver.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{
Expand Down Expand Up @@ -135,9 +140,10 @@ func NewDriver(m metadata.MetadataProvider, endpoint string, runAsController boo

// Create GRPC servers
tmpdisk.idServer = NewIdentityServer(tmpdisk.driver)
tmpdisk.controllerServer = NewControllerServer(tmpdisk.driver, apiExtentionClient)

if !runAsController {
if serviceType&utils.Controller != 0 {
tmpdisk.controllerServer = NewControllerServer(tmpdisk.driver, apiExtentionClient)
}
if serviceType&utils.Node != 0 {
tmpdisk.nodeServer = NewNodeServer(tmpdisk.driver, m)
}

Expand Down Expand Up @@ -229,19 +235,9 @@ func GlobalConfigSet(m metadata.MetadataProvider) *restclient.Config {
}
clustID := os.Getenv("CLUSTER_ID")

controllerServerType := false
nodeID := ""
if os.Getenv(utils.ServiceType) == utils.ProvisionerService {
controllerServerType = true
nodeID = "controller" // make csi-common happy
} else {
nodeID = metadata.MustGet(m, metadata.InstanceID)
}

// Global Config Set
GlobalConfigVar = GlobalConfig{
Region: metadata.MustGet(m, metadata.RegionID),
NodeID: nodeID,
ADControllerEnable: csiCfg.GetBool("disk-adcontroller-enable", "DISK_AD_CONTROLLER", false),
DiskTagEnable: csiCfg.GetBool("disk-tag-by-plugin", "DISK_TAGED_BY_PLUGIN", false),
DiskBdfEnable: csiCfg.GetBool("disk-bdf-enable", "DISK_BDF_ENABLE", false),
Expand All @@ -255,7 +251,6 @@ func GlobalConfigSet(m metadata.MetadataProvider) *restclient.Config {
FilesystemLosePercent: fileSystemLosePercent,
ClusterID: clustID,
DiskPartitionEnable: csiCfg.GetBool("disk-partition-enable", "DISK_PARTITION_ENABLE", true),
ControllerService: controllerServerType,
BdfHealthCheck: csiCfg.GetBool("bdf-health-check", "BDF_HEALTH_CHECK", true),
DiskMultiTenantEnable: csiCfg.GetBool("disk-multi-tenant-enable", "DISK_MULTI_TENANT_ENABLE", false),
NodeMultiZoneEnable: csiCfg.GetBool("node-multi-zone-enable", "NODE_MULTI_ZONE_ENABLE", false),
Expand All @@ -282,11 +277,12 @@ func GlobalConfigSet(m metadata.MetadataProvider) *restclient.Config {
GlobalConfigVar.ClusterID,
)

if controllerServerType && !csiCfg.GetBool("disk-serial-attach", "DISK_SERIAL_ATTACH", false) {
// if ADController is not enabled, we need SERIAL_ATTACH to recognize old disk
if !GlobalConfigVar.ADControllerEnable || csiCfg.GetBool("disk-serial-attach", "DISK_SERIAL_ATTACH", false) {
GlobalConfigVar.AttachDetachSlots = NewSerialAttachDetachSlots()
} else {
log.Infof("Disk parallel attach/detach enabled, please set DISK_SERIAL_ATTACH if you see a lot of InvalidOperation.Conflict error.")
GlobalConfigVar.AttachDetachSlots = NewParallelAttachDetachSlots()
} else {
GlobalConfigVar.AttachDetachSlots = NewSerialAttachDetachSlots()
}

return cfg
Expand Down
2 changes: 1 addition & 1 deletion pkg/disk/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func NewNodeServer(d *csicommon.CSIDriver, m metadata.MetadataProvider) csi.Node
go checkVfhpOnlineReconcile()
}

if !GlobalConfigVar.ControllerService && IsVFNode() && GlobalConfigVar.BdfHealthCheck {
if IsVFNode() && GlobalConfigVar.BdfHealthCheck {
go BdfHealthCheck()
}

Expand Down
14 changes: 4 additions & 10 deletions pkg/ens/ens.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type ENS struct {

func initDriver() {}

func NewDriver(nodeID, endpoint string) *ENS {
func NewDriver(nodeID, endpoint string, serviceType utils.ServiceType) *ENS {

initDriver()
tmpENS := &ENS{}
Expand All @@ -79,9 +79,10 @@ func NewDriver(nodeID, endpoint string) *ENS {
tmpENS.driver.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER})

tmpENS.idServer = NewIdentityServer(tmpENS.driver)
if GlobalConfigVar.ControllerService {
if serviceType&utils.Controller != 0 {
tmpENS.controllerServer = NewControllerServer(tmpENS.driver)
} else {
}
if serviceType&utils.Node != 0 {
tmpENS.nodeServer = NewNodeServer(tmpENS.driver)
}
return tmpENS
Expand Down Expand Up @@ -155,11 +156,6 @@ func NewGlobalConfig() {
detachBeforeAttach = true
}

controllerServerType := false
if os.Getenv(utils.ServiceType) == utils.ProvisionerService {
controllerServerType = true
}

GlobalConfigVar = GlobalConfig{
KClient: kubeClient,
InstanceID: instanceID,
Expand All @@ -168,7 +164,6 @@ func NewGlobalConfig() {
RegionID: regionID,
EnableAttachDetachController: attachDetachController,
DetachBeforeAttach: detachBeforeAttach,
ControllerService: controllerServerType,
}
}

Expand All @@ -177,7 +172,6 @@ type GlobalConfig struct {
InstanceID string
ClusterID string
DetachBeforeAttach bool
ControllerService bool

EnableDiskPartition string
EnableAttachDetachController string
Expand Down
38 changes: 18 additions & 20 deletions pkg/metric/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,34 +45,32 @@ type CSICollector struct {
}

// newCSICollector method returns the CSICollector object
func newCSICollector(metricType string, driverNames []string) error {
func newCSICollector(driverNames []string) error {
if csiCollectorInstance != nil {
return nil
}
collectors := make(map[string]Collector)
if metricType == pluginService {
enabledDrivers := map[string]struct{}{}
for _, d := range driverNames {
enabledDrivers[d] = struct{}{}
}
for _, reg := range registry {
enabled := len(reg.RelatedDrivers) == 0
for _, d := range reg.RelatedDrivers {
if _, ok := enabledDrivers[d]; ok {
enabled = true
break
}
enabledDrivers := map[string]struct{}{}
for _, d := range driverNames {
enabledDrivers[d] = struct{}{}
}
for _, reg := range registry {
enabled := len(reg.RelatedDrivers) == 0
for _, d := range reg.RelatedDrivers {
if _, ok := enabledDrivers[d]; ok {
enabled = true
break
}
if enabled {
collector, err := reg.Factory()
if err != nil {
return err
}
collectors[reg.Name] = collector
}
if enabled {
collector, err := reg.Factory()
if err != nil {
return err
}
collectors[reg.Name] = collector
}

}

csiCollectorInstance = &CSICollector{Collectors: collectors}

return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

// NewMetricHandler method returns a promHttp object
func NewMetricHandler(serviceType string, driverNames []string) *Handler {
func NewMetricHandler(driverNames []string) *Handler {
//csi collector singleton
err := newCSICollector(serviceType, driverNames)
err := newCSICollector(driverNames)
if err != nil {
logrus.Errorf("Couldn't create collector: %s", err)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/nas/nas.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type NAS struct {
}

// NewDriver create the identity/node/controller server and disk driver
func NewDriver(nodeID, endpoint, serviceType string) *NAS {
func NewDriver(nodeID, endpoint string, serviceType utils.ServiceType) *NAS {
log.Infof("Driver: %v version: %v", driverName, version.VERSION)

d := &NAS{}
Expand Down Expand Up @@ -121,7 +121,7 @@ func (d *NAS) Run() {
}

// GlobalConfigSet set global config
func GlobalConfigSet(serviceType string) *restclient.Config {
func GlobalConfigSet(serviceType utils.ServiceType) *restclient.Config {
// Global Configs Set
cfg, err := clientcmd.BuildConfigFromFlags(options.MasterURL, options.Kubeconfig)
if err != nil {
Expand Down Expand Up @@ -174,7 +174,7 @@ func GlobalConfigSet(serviceType string) *restclient.Config {
//start go write cluster nodeIP to /etc/hosts
//format{["192.168.1.1:8800", "192.168.1.2:8801", "192.168.1.3:8802"]}
//get service endpoint->format json->write /etc/hosts/dadi-endpoint.json
if serviceType == utils.PluginService {
if serviceType&utils.Node != 0 {
go dadi.Run(kubeClient)
}
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/oss/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type OSS struct {
}

// NewDriver init oss type of csi driver
func NewDriver(nodeID, endpoint string, m metadata.MetadataProvider, runAsController bool) *OSS {
func NewDriver(nodeID, endpoint string, m metadata.MetadataProvider, serviceType utils.ServiceType) *OSS {
log.Infof("Driver: %v version: %v", driverName, version.VERSION)

d := &OSS{}
Expand All @@ -71,8 +71,10 @@ func NewDriver(nodeID, endpoint string, m metadata.MetadataProvider, runAsContro

d.driver = csiDriver

d.controllerServer = newControllerServer(d.driver)
if !runAsController {
if serviceType&utils.Controller != 0 {
d.controllerServer = newControllerServer(d.driver)
}
if serviceType&utils.Node != 0 {
d.nodeServer = newNodeServer(d.driver, m)
}
return d
Expand Down
Loading

0 comments on commit d8be587

Please sign in to comment.