diff --git a/config/daemonconfig/daemonconfig.go b/config/daemonconfig/daemonconfig.go index f515185a32..b096d3c7db 100644 --- a/config/daemonconfig/daemonconfig.go +++ b/config/daemonconfig/daemonconfig.go @@ -9,9 +9,10 @@ package daemonconfig import ( "encoding/json" - "os" + //"os" "reflect" "strings" + "sync" "github.com/pkg/errors" @@ -36,7 +37,7 @@ type DaemonConfig interface { StorageBackend() (StorageBackendType, *BackendConfig) UpdateMirrors(mirrorsConfigDir, registryHost string) error DumpString() (string, error) - DumpFile(path string) error + //DumpFile(path string) error } // Daemon configurations factory @@ -122,20 +123,30 @@ type DeviceConfig struct { } `json:"cache"` } +var configRWMutex sync.RWMutex + +type SupplementInfoInterface interface { + GetImageID() string + GetSnapshotID() string + IsVPCRegistry() bool + GetLabels() map[string]string + GetParams() map[string]string +} + // For nydusd as FUSE daemon. Serialize Daemon info and persist to a json file // We don't have to persist configuration file for fscache since its configuration // is passed through HTTP API. 
-func DumpConfigFile(c interface{}, path string) error { - if config.IsBackendSourceEnabled() { - c = serializeWithSecretFilter(c) - } - b, err := json.Marshal(c) - if err != nil { - return errors.Wrapf(err, "marshal config") - } - - return os.WriteFile(path, b, 0600) -} +// func DumpConfigFile(c interface{}, path string) error { +// if config.IsBackendSourceEnabled() { +// c = serializeWithSecretFilter(c) +// } +// b, err := json.Marshal(c) +// if err != nil { +// return errors.Wrapf(err, "marshal config") +// } + +// return os.WriteFile(path, b, 0600) +// } func DumpConfigString(c interface{}) (string, error) { b, err := json.Marshal(c) @@ -143,12 +154,14 @@ func DumpConfigString(c interface{}) (string, error) { } // Achieve a daemon configuration from template or snapshotter's configuration -func SupplementDaemonConfig(c DaemonConfig, imageID, snapshotID string, - vpcRegistry bool, labels map[string]string, params map[string]string) error { +func SupplementDaemonConfig(c DaemonConfig, info SupplementInfoInterface) error { + + configRWMutex.Lock() + defer configRWMutex.Unlock() - image, err := registry.ParseImage(imageID) + image, err := registry.ParseImage(info.GetImageID()) if err != nil { - return errors.Wrapf(err, "parse image %s", imageID) + return errors.Wrapf(err, "parse image %s", info.GetImageID()) } backendType, _ := c.StorageBackend() @@ -156,7 +169,7 @@ func SupplementDaemonConfig(c DaemonConfig, imageID, snapshotID string, switch backendType { case backendTypeRegistry: registryHost := image.Host - if vpcRegistry { + if info.IsVPCRegistry() { registryHost = registry.ConvertToVPCHost(registryHost) } else if registryHost == "docker.io" { // For docker.io images, we should use index.docker.io @@ -170,8 +183,8 @@ func SupplementDaemonConfig(c DaemonConfig, imageID, snapshotID string, // If no auth is provided, don't touch auth from provided nydusd configuration file. 
// We don't validate the original nydusd auth from configuration file since it can be empty // when repository is public. - keyChain := auth.GetRegistryKeyChain(registryHost, imageID, labels) - c.Supplement(registryHost, image.Repo, snapshotID, params) + keyChain := auth.GetRegistryKeyChain(registryHost, info.GetImageID(), info.GetLabels()) + c.Supplement(registryHost, image.Repo, info.GetSnapshotID(), info.GetParams()) c.FillAuth(keyChain) // Localfs and OSS backends don't need any update, diff --git a/config/daemonconfig/fscache.go b/config/daemonconfig/fscache.go index 7c9911cb62..dd2ccc9803 100644 --- a/config/daemonconfig/fscache.go +++ b/config/daemonconfig/fscache.go @@ -9,7 +9,8 @@ package daemonconfig import ( "encoding/json" "os" - "path" + + //"path" "github.com/containerd/log" "github.com/containerd/nydus-snapshotter/pkg/auth" @@ -122,10 +123,10 @@ func (c *FscacheDaemonConfig) DumpString() (string, error) { return DumpConfigString(c) } -func (c *FscacheDaemonConfig) DumpFile(f string) error { - if err := os.MkdirAll(path.Dir(f), 0755); err != nil { - return err - } +// func (c *FscacheDaemonConfig) DumpFile(f string) error { +// if err := os.MkdirAll(path.Dir(f), 0755); err != nil { +// return err +// } - return DumpConfigFile(c, f) -} +// return DumpConfigFile(c, f) +// } diff --git a/config/daemonconfig/fuse.go b/config/daemonconfig/fuse.go index 424c2e1c7c..c392bb80bb 100644 --- a/config/daemonconfig/fuse.go +++ b/config/daemonconfig/fuse.go @@ -9,7 +9,8 @@ package daemonconfig import ( "encoding/json" "os" - "path" + + //"path" "github.com/pkg/errors" @@ -92,12 +93,14 @@ func (c *FuseDaemonConfig) StorageBackend() (string, *BackendConfig) { } func (c *FuseDaemonConfig) DumpString() (string, error) { + configRWMutex.Lock() + defer configRWMutex.Unlock() return DumpConfigString(c) } -func (c *FuseDaemonConfig) DumpFile(f string) error { - if err := os.MkdirAll(path.Dir(f), 0755); err != nil { - return err - } - return DumpConfigFile(c, f) -} +// func 
(c *FuseDaemonConfig) DumpFile(f string) error { +// if err := os.MkdirAll(path.Dir(f), 0755); err != nil { +// return err +// } +// return DumpConfigFile(c, f) +// } diff --git a/misc/snapshotter/Dockerfile b/misc/snapshotter/Dockerfile index d3cef30762..66a2e0da98 100644 --- a/misc/snapshotter/Dockerfile +++ b/misc/snapshotter/Dockerfile @@ -17,7 +17,7 @@ FROM base AS kubectl-sourcer ARG TARGETARCH RUN apk add -q --no-cache curl && \ - curl -fsSL -o /usr/bin/kubectl https://storage.googleapis.com/kubernetes-release/release/"$(curl -L -s https://dl.k8s.io/release/stable.txt)"/bin/linux/"$TARGETARCH"/kubectl && \ + curl -fsSL -o /usr/bin/kubectl https://storage.googleapis.com/kubernetes-release/release/v1.30.0/bin/linux/"$TARGETARCH"/kubectl && \ chmod +x /usr/bin/kubectl FROM base diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 2c3a68603b..fe3fb8efb3 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -87,6 +87,21 @@ type Daemon struct { state types.DaemonState } +type NydusdSupplementInfo struct { + DaemonState ConfigState + ImageID string + SnapshotID string + Vpc bool + Labels map[string]string + Params map[string]string +} + +func (s *NydusdSupplementInfo) GetImageID() string { return s.ImageID } +func (s *NydusdSupplementInfo) GetSnapshotID() string { return s.SnapshotID } +func (s *NydusdSupplementInfo) IsVPCRegistry() bool { return s.Vpc } +func (s *NydusdSupplementInfo) GetLabels() map[string]string { return s.Labels } +func (s *NydusdSupplementInfo) GetParams() map[string]string { return s.Params } + func (d *Daemon) Lock() { d.mu.Lock() } @@ -250,12 +265,7 @@ func (d *Daemon) sharedFusedevMount(rafs *rafs.Rafs) error { return err } - c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(rafs.SnapshotID)) - if err != nil { - return errors.Wrapf(err, "Failed to reload instance configuration %s", - d.ConfigFile(rafs.SnapshotID)) - } - + c := d.Config cfg, err := c.DumpString() if err != nil { return 
errors.Wrap(err, "dump instance configuration") @@ -280,12 +290,7 @@ func (d *Daemon) sharedErofsMount(ra *rafs.Rafs) error { return errors.Wrapf(err, "failed to create fscache work dir %s", ra.FscacheWorkDir()) } - c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(ra.SnapshotID)) - if err != nil { - log.L.Errorf("Failed to reload daemon configuration %s, %s", d.ConfigFile(ra.SnapshotID), err) - return err - } - + c := d.Config cfgStr, err := c.DumpString() if err != nil { return err @@ -650,3 +655,29 @@ func NewDaemon(opt ...NewDaemonOpt) (*Daemon, error) { return d, nil } + +func (d *Daemon) MountByAPI() error { + rafs := d.RafsCache.Head() + if rafs == nil { + return errors.Wrapf(errdefs.ErrNotFound, "daemon %s no rafs instance associated", d.ID()) + } + client, err := d.GetClient() + if err != nil { + return errors.Wrapf(err, "mount instance %s", rafs.SnapshotID) + } + bootstrap, err := rafs.BootstrapFile() + if err != nil { + return err + } + c := d.Config + cfg, err := c.DumpString() + if err != nil { + return errors.Wrap(err, "dump instance configuration") + } + err = client.Mount("/", bootstrap, cfg) + if err != nil { + return errors.Wrapf(err, "mount rafs instance MountByAPI()") + } + return nil + +} diff --git a/pkg/filesystem/fs.go b/pkg/filesystem/fs.go index 88b57f6e1b..b6036d15e2 100644 --- a/pkg/filesystem/fs.go +++ b/pkg/filesystem/fs.go @@ -130,6 +130,19 @@ func NewFileSystem(ctx context.Context, opt ...NewFSOpt) (*Filesystem, error) { if err != nil { return errors.Wrapf(err, "get filesystem manager for daemon %s", d.States.ID) } + + supplementInfo, err := fsManager.GetInfo(d.ID()) + if err != nil { + return errors.Wrap(err, "GetInfo failed") + } + + cfg := d.Config + err = daemonconfig.SupplementDaemonConfig(cfg, supplementInfo) + if err != nil { + return errors.Wrap(err, "supplement configuration") + } + d.Config = cfg + if err := fsManager.StartDaemon(d); err != nil { return errors.Wrapf(err, "start daemon %s", d.ID()) } @@ 
-232,7 +245,6 @@ func (fs *Filesystem) Mount(ctx context.Context, snapshotID string, labels map[s // Instance already exists, how could this happen? Can containerd handle this case? return nil } - fsDriver := config.GetFsDriver() if label.IsTarfsDataLayer(labels) { fsDriver = config.FsDriverBlockdev @@ -302,34 +314,25 @@ func (fs *Filesystem) Mount(ctx context.Context, snapshotID string, labels map[s daemonconfig.WorkDir: workDir, daemonconfig.CacheDir: cacheDir, } + supplementInfo := &daemon.NydusdSupplementInfo{ + DaemonState: d.States, + ImageID: imageID, + SnapshotID: snapshotID, + Vpc: false, + Labels: labels, + Params: params, + } cfg := deepcopy.Copy(*fsManager.DaemonConfig).(daemonconfig.DaemonConfig) - err = daemonconfig.SupplementDaemonConfig(cfg, imageID, snapshotID, false, labels, params) + err = daemonconfig.SupplementDaemonConfig(cfg, supplementInfo) if err != nil { return errors.Wrap(err, "supplement configuration") } - + if errs := fsManager.AddSupplementInfo(supplementInfo); errs != nil { + return errors.Wrapf(errs, "AddSupplementInfo failed %s", d.States.ID) + } // TODO: How to manage rafs configurations on-disk? separated json config file or DB record? // In order to recover erofs mount, the configuration file has to be persisted. - var configSubDir string - if useSharedDaemon { - configSubDir = snapshotID - } else { - // Associate daemon config object when creating a new daemon object to avoid - // reading disk file again and again. - // For shared daemon, each rafs instance has its own configuration, so we don't - // attach a config interface to daemon in this case. 
- d.Config = cfg - } - - err = cfg.DumpFile(d.ConfigFile(configSubDir)) - if err != nil { - if errors.Is(err, errdefs.ErrAlreadyExists) { - log.L.Debugf("Configuration file %s already exits", d.ConfigFile(configSubDir)) - } else { - return errors.Wrap(err, "dump daemon configuration file") - } - } - + d.Config = cfg d.AddRafsInstance(rafs) // if publicKey is not empty we should verify bootstrap file of image @@ -596,10 +599,6 @@ func (fs *Filesystem) initSharedDaemon(fsManager *manager.Manager) (err error) { // it is loaded when requesting mount api // Dump the configuration file since it is reloaded when recovering the nydusd d.Config = *fsManager.DaemonConfig - err = d.Config.DumpFile(d.ConfigFile("")) - if err != nil && !errors.Is(err, errdefs.ErrAlreadyExists) { - return errors.Wrapf(err, "dump configuration file %s", d.ConfigFile("")) - } if err := fsManager.StartDaemon(d); err != nil { return errors.Wrap(err, "start shared daemon") diff --git a/pkg/manager/daemon_adaptor.go b/pkg/manager/daemon_adaptor.go index 44c61fbea4..4e20767502 100644 --- a/pkg/manager/daemon_adaptor.go +++ b/pkg/manager/daemon_adaptor.go @@ -44,6 +44,16 @@ func (m *Manager) StartDaemon(d *daemon.Daemon) error { if err := cmd.Start(); err != nil { return err } + fsDriver := config.GetFsDriver() + isSharedFusedev := fsDriver == config.FsDriverFusedev && config.GetDaemonMode() == config.DaemonModeShared + useSharedDaemon := fsDriver == config.FsDriverFscache || isSharedFusedev + + if !useSharedDaemon { + errs := d.MountByAPI() + if errs != nil { + return errors.Wrapf(errs, "failed to mount") + } + } d.Lock() defer d.Unlock() @@ -155,10 +165,6 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool) return nil, errors.Wrapf(err, "locate bootstrap %s", bootstrap) } - cmdOpts = append(cmdOpts, - command.WithConfig(d.ConfigFile("")), - command.WithBootstrap(bootstrap), - ) if config.IsBackendSourceEnabled() { configAPIPath := fmt.Sprintf(endpointGetBackend, 
d.States.ID) cmdOpts = append(cmdOpts, diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 5e4cf88951..95f374c6ec 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -197,6 +197,23 @@ func (m *Manager) AddDaemon(daemon *daemon.Daemon) error { return nil } +func (m *Manager) AddSupplementInfo(supplementInfo *daemon.NydusdSupplementInfo) error { + m.mu.Lock() + defer m.mu.Unlock() + if err := m.store.AddInfo(supplementInfo); err != nil { + return errors.Wrapf(err, "add supplementInfo %s", supplementInfo.DaemonState.ID) + } + return nil +} + +func (m *Manager) GetInfo(daemonID string) (*daemon.NydusdSupplementInfo, error) { + info, err := m.store.GetInfo(daemonID) + if err != nil { + return nil, errors.Wrapf(err, "get supplementInfo %s", daemonID) + } + return info, nil +} + func (m *Manager) UpdateDaemon(daemon *daemon.Daemon) error { m.mu.Lock() defer m.mu.Unlock() @@ -322,13 +339,7 @@ func (m *Manager) recoverDaemons(ctx context.Context, } if d.States.FsDriver == config.FsDriverFusedev { - cfg, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile("")) - if err != nil { - log.L.Errorf("Failed to reload daemon configuration %s, %s", d.ConfigFile(""), err) - return err - } - - d.Config = cfg + d.Config = *m.DaemonConfig } state, err := d.GetState() diff --git a/pkg/manager/store.go b/pkg/manager/store.go index 740ab57584..fd29f26a95 100644 --- a/pkg/manager/store.go +++ b/pkg/manager/store.go @@ -29,6 +29,9 @@ type Store interface { WalkRafsInstances(ctx context.Context, cb func(*rafs.Rafs) error) error NextInstanceSeq() (uint64, error) + + AddInfo(supplementInfo *daemon.NydusdSupplementInfo) error + GetInfo(daemonID string) (*daemon.NydusdSupplementInfo, error) } var _ Store = &store.DaemonRafsStore{} diff --git a/pkg/store/daemonstore.go b/pkg/store/daemonstore.go index e25f3c99c0..79bacaca15 100644 --- a/pkg/store/daemonstore.go +++ b/pkg/store/daemonstore.go @@ -31,6 +31,14 @@ func (s *DaemonRafsStore) AddDaemon(d 
*daemon.Daemon) error { return s.db.SaveDaemon(context.TODO(), d) } +func (s *DaemonRafsStore) AddInfo(supplementInfo *daemon.NydusdSupplementInfo) error { + return s.db.SaveInfo(context.TODO(), supplementInfo) +} + +func (s *DaemonRafsStore) GetInfo(imageID string) (*daemon.NydusdSupplementInfo, error) { + return s.db.GetSupplementInfo(context.TODO(), imageID) +} + func (s *DaemonRafsStore) UpdateDaemon(d *daemon.Daemon) error { return s.db.UpdateDaemon(context.TODO(), d) } diff --git a/pkg/store/database.go b/pkg/store/database.go index c227a4a59e..99f897500c 100644 --- a/pkg/store/database.go +++ b/pkg/store/database.go @@ -41,7 +41,8 @@ var ( daemonsBucket = []byte("daemons") // RAFS filesystem instances. // A RAFS filesystem may have associated daemon or not. - instancesBucket = []byte("instances") + instancesBucket = []byte("instances") + supplementInfoBucket = []byte("supplement_info") ) // Database keeps infos that need to survive among snapshotter restart @@ -87,6 +88,11 @@ func getInstancesBucket(tx *bolt.Tx) *bolt.Bucket { return bucket.Bucket(instancesBucket) } +func getSupplementInfoBucket(tx *bolt.Tx) *bolt.Bucket { + bucket := tx.Bucket(v1RootBucket) + return bucket.Bucket(supplementInfoBucket) +} + func updateObject(bucket *bolt.Bucket, key string, obj interface{}) error { keyBytes := []byte(key) @@ -163,6 +169,10 @@ func (db *Database) initDatabase() error { return errors.Wrapf(err, "bucket %s", instancesBucket) } + if _, err := bk.CreateBucketIfNotExists(supplementInfoBucket); err != nil { + return err + } + if val := bk.Get(versionKey); val == nil { version = "v1.0" } else { @@ -210,6 +220,25 @@ func (db *Database) SaveDaemon(_ context.Context, d *daemon.Daemon) error { }) } +func (db *Database) SaveInfo(_ context.Context, supplementInfo *daemon.NydusdSupplementInfo) error { + return db.db.Update(func(tx *bolt.Tx) error { + bucket := getSupplementInfoBucket(tx) + key := []byte(supplementInfo.DaemonState.ID) + if existing := bucket.Get(key); 
existing != nil { + log.L.Infof("Supplement info already exists for ID: %s", supplementInfo.DaemonState.ID) + return nil + } + value, err := json.Marshal(supplementInfo) + if err != nil { + return errors.Wrap(err, "failed to marshal supplement info") + } + if err := bucket.Put(key, value); err != nil { + return errors.Wrap(err, "failed to save supplement info") + } + return nil + }) +} + func (db *Database) UpdateDaemon(_ context.Context, d *daemon.Daemon) error { return db.db.Update(func(tx *bolt.Tx) error { bucket := getDaemonsBucket(tx) @@ -262,6 +291,25 @@ func (db *Database) WalkDaemons(_ context.Context, cb func(info *daemon.ConfigSt }) } +func (db *Database) GetSupplementInfo(_ context.Context, daemonID string) (*daemon.NydusdSupplementInfo, error) { + var info daemon.NydusdSupplementInfo + err := db.db.View(func(tx *bolt.Tx) error { + bucket := getSupplementInfoBucket(tx) + if bucket == nil { + return errdefs.ErrNotFound + } + value := bucket.Get([]byte(daemonID)) + if value == nil { + return errdefs.ErrNotFound + } + return json.Unmarshal(value, &info) + }) + if err != nil { + return nil, err + } + return &info, nil +} + // WalkDaemons iterates all daemon records and invoke callback on each func (db *Database) WalkRafsInstances(_ context.Context, cb func(r *rafs.Rafs) error) error { return db.db.View(func(tx *bolt.Tx) error { diff --git a/tests/helpers/helpers.sh b/tests/helpers/helpers.sh index efca0b2762..9b9d5b6dc4 100644 --- a/tests/helpers/helpers.sh +++ b/tests/helpers/helpers.sh @@ -99,7 +99,7 @@ install::kind(){ } install::kubectl(){ - local version="${1:-}" + local version="${1:-v1.30.0}" [ "$version" ] || version="$(http::get /dev/stdout https://dl.k8s.io/release/stable.txt)" local temp temp="$(fs::mktemp "install")"