diff --git a/config/daemonconfig/daemonconfig.go b/config/daemonconfig/daemonconfig.go index f515185a32..e554acd973 100644 --- a/config/daemonconfig/daemonconfig.go +++ b/config/daemonconfig/daemonconfig.go @@ -12,6 +12,7 @@ import ( "os" "reflect" "strings" + "sync" "github.com/pkg/errors" @@ -122,6 +123,8 @@ type DeviceConfig struct { } `json:"cache"` } +var configRWMutex sync.RWMutex + // For nydusd as FUSE daemon. Serialize Daemon info and persist to a json file // We don't have to persist configuration file for fscache since its configuration // is passed through HTTP API. @@ -146,6 +149,9 @@ func DumpConfigString(c interface{}) (string, error) { func SupplementDaemonConfig(c DaemonConfig, imageID, snapshotID string, vpcRegistry bool, labels map[string]string, params map[string]string) error { + configRWMutex.Lock() + defer configRWMutex.Unlock() + image, err := registry.ParseImage(imageID) if err != nil { return errors.Wrapf(err, "parse image %s", imageID) diff --git a/config/daemonconfig/fuse.go b/config/daemonconfig/fuse.go index 424c2e1c7c..c8c258c7e7 100644 --- a/config/daemonconfig/fuse.go +++ b/config/daemonconfig/fuse.go @@ -92,6 +92,8 @@ func (c *FuseDaemonConfig) StorageBackend() (string, *BackendConfig) { } func (c *FuseDaemonConfig) DumpString() (string, error) { + configRWMutex.Lock() + defer configRWMutex.Unlock() return DumpConfigString(c) } diff --git a/misc/snapshotter/Dockerfile b/misc/snapshotter/Dockerfile index d3cef30762..66a2e0da98 100644 --- a/misc/snapshotter/Dockerfile +++ b/misc/snapshotter/Dockerfile @@ -17,7 +17,7 @@ FROM base AS kubectl-sourcer ARG TARGETARCH RUN apk add -q --no-cache curl && \ - curl -fsSL -o /usr/bin/kubectl https://storage.googleapis.com/kubernetes-release/release/"$(curl -L -s https://dl.k8s.io/release/stable.txt)"/bin/linux/"$TARGETARCH"/kubectl && \ + curl -fsSL -o /usr/bin/kubectl https://storage.googleapis.com/kubernetes-release/release/v1.30.0/bin/linux/"$TARGETARCH"/kubectl && \ chmod +x /usr/bin/kubectl FROM base diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 2c3a68603b..d9bb6609a0 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -87,6 +87,15 @@ type Daemon struct { state types.DaemonState } +type NydusdSupplementInfo struct { + DaemonState ConfigState + ImageID string + SnapshotID string + Vpc bool + Labels map[string]string + Params map[string]string +} + func (d *Daemon) Lock() { d.mu.Lock() } @@ -250,12 +259,7 @@ func (d *Daemon) sharedFusedevMount(rafs *rafs.Rafs) error { return err } - c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(rafs.SnapshotID)) - if err != nil { - return errors.Wrapf(err, "Failed to reload instance configuration %s", - d.ConfigFile(rafs.SnapshotID)) - } - + c := d.Config cfg, err := c.DumpString() if err != nil { return errors.Wrap(err, "dump instance configuration") @@ -280,12 +284,7 @@ func (d *Daemon) sharedErofsMount(ra *rafs.Rafs) error { return errors.Wrapf(err, "failed to create fscache work dir %s", ra.FscacheWorkDir()) } - c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(ra.SnapshotID)) - if err != nil { - log.L.Errorf("Failed to reload daemon configuration %s, %s", d.ConfigFile(ra.SnapshotID), err) - return err - } - + c := d.Config cfgStr, err := c.DumpString() if err != nil { return err @@ -650,3 +649,29 @@ func NewDaemon(opt ...NewDaemonOpt) (*Daemon, error) { return d, nil } + +func (d *Daemon) MountByAPI() error { + rafs := d.RafsCache.Head() + if rafs == nil { + return errors.Wrapf(errdefs.ErrNotFound, "daemon %s no rafs instance associated", d.ID()) + } + client, err := d.GetClient() + if err != nil { + return errors.Wrapf(err, "mount instance %s", rafs.SnapshotID) + } + bootstrap, err := rafs.BootstrapFile() + if err != nil { + return err + } + c := d.Config + cfg, err := c.DumpString() + if err != nil { + return errors.Wrap(err, "dump instance configuration") + } + err = client.Mount("/", bootstrap, cfg) + if err != nil { + return errors.Wrapf(err, "mount rafs instance MountByAPI()") + } + return nil + +} diff --git a/pkg/filesystem/fs.go b/pkg/filesystem/fs.go index 88b57f6e1b..9d14c1ea49 100644 --- a/pkg/filesystem/fs.go +++ b/pkg/filesystem/fs.go @@ -130,6 +130,23 @@ func NewFileSystem(ctx context.Context, opt ...NewFSOpt) (*Filesystem, error) { if err != nil { return errors.Wrapf(err, "get filesystem manager for daemon %s", d.States.ID) } + + supplementInfo, err := fsManager.GetInfo(d.ID()) + if err != nil { + return errors.Wrap(err, "GetInfo failed") + } + + cfg := d.Config + imageID := supplementInfo.ImageID + snapshotID := supplementInfo.SnapshotID + labels := supplementInfo.Labels + params := supplementInfo.Params + err = daemonconfig.SupplementDaemonConfig(cfg, imageID, snapshotID, false, labels, params) + if err != nil { + return errors.Wrap(err, "supplement configuration") + } + d.Config = cfg + if err := fsManager.StartDaemon(d); err != nil { return errors.Wrapf(err, "start daemon %s", d.ID()) } @@ -232,7 +249,6 @@ func (fs *Filesystem) Mount(ctx context.Context, snapshotID string, labels map[s // Instance already exists, how could this happen? Can containerd handle this case? return nil } - fsDriver := config.GetFsDriver() if label.IsTarfsDataLayer(labels) { fsDriver = config.FsDriverBlockdev @@ -307,29 +323,20 @@ func (fs *Filesystem) Mount(ctx context.Context, snapshotID string, labels map[s if err != nil { return errors.Wrap(err, "supplement configuration") } - - // TODO: How to manage rafs configurations on-disk? separated json config file or DB record? - // In order to recover erofs mount, the configuration file has to be persisted. - var configSubDir string - if useSharedDaemon { - configSubDir = snapshotID - } else { - // Associate daemon config object when creating a new daemon object to avoid - // reading disk file again and again. - // For shared daemon, each rafs instance has its own configuration, so we don't - // attach a config interface to daemon in this case. - d.Config = cfg + supplementInfo := &daemon.NydusdSupplementInfo{ + DaemonState: d.States, + ImageID: imageID, + SnapshotID: snapshotID, + Vpc: false, + Labels: labels, + Params: params, } - - err = cfg.DumpFile(d.ConfigFile(configSubDir)) - if err != nil { - if errors.Is(err, errdefs.ErrAlreadyExists) { - log.L.Debugf("Configuration file %s already exits", d.ConfigFile(configSubDir)) - } else { - return errors.Wrap(err, "dump daemon configuration file") - } + if errs := fsManager.AddSupplementInfo(supplementInfo); errs != nil { + return errors.Wrapf(err, "AddSupplementInfo failed %s", d.States.ID) } - + // TODO: How to manage rafs configurations on-disk? separated json config file or DB record? + // In order to recover erofs mount, the configuration file has to be persisted. + d.Config = cfg d.AddRafsInstance(rafs) // if publicKey is not empty we should verify bootstrap file of image @@ -596,10 +603,6 @@ func (fs *Filesystem) initSharedDaemon(fsManager *manager.Manager) (err error) { // it is loaded when requesting mount api // Dump the configuration file since it is reloaded when recovering the nydusd d.Config = *fsManager.DaemonConfig - err = d.Config.DumpFile(d.ConfigFile("")) - if err != nil && !errors.Is(err, errdefs.ErrAlreadyExists) { - return errors.Wrapf(err, "dump configuration file %s", d.ConfigFile("")) - } if err := fsManager.StartDaemon(d); err != nil { return errors.Wrap(err, "start shared daemon") diff --git a/pkg/manager/daemon_adaptor.go b/pkg/manager/daemon_adaptor.go index 44c61fbea4..4e20767502 100644 --- a/pkg/manager/daemon_adaptor.go +++ b/pkg/manager/daemon_adaptor.go @@ -44,6 +44,16 @@ func (m *Manager) StartDaemon(d *daemon.Daemon) error { if err := cmd.Start(); err != nil { return err } + fsDriver := config.GetFsDriver() + isSharedFusedev := fsDriver == config.FsDriverFusedev && config.GetDaemonMode() == config.DaemonModeShared + useSharedDaemon := fsDriver == config.FsDriverFscache || isSharedFusedev + + if !useSharedDaemon { + errs := d.MountByAPI() + if errs != nil { + return errors.Wrapf(err, "failed to mount") + } + } d.Lock() defer d.Unlock() @@ -155,10 +165,6 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool) return nil, errors.Wrapf(err, "locate bootstrap %s", bootstrap) } - cmdOpts = append(cmdOpts, - command.WithConfig(d.ConfigFile("")), - command.WithBootstrap(bootstrap), - ) if config.IsBackendSourceEnabled() { configAPIPath := fmt.Sprintf(endpointGetBackend, d.States.ID) cmdOpts = append(cmdOpts, diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 5e4cf88951..95f374c6ec 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -197,6 +197,23 @@ func (m *Manager) AddDaemon(daemon *daemon.Daemon) error { return nil } +func (m *Manager) AddSupplementInfo(supplementInfo *daemon.NydusdSupplementInfo) error { + m.mu.Lock() + defer m.mu.Unlock() + if err := m.store.AddInfo(supplementInfo); err != nil { + return errors.Wrapf(err, "add supplementInfo %s", supplementInfo.DaemonState.ID) + } + return nil +} + +func (m *Manager) GetInfo(daemonID string) (*daemon.NydusdSupplementInfo, error) { + info, err := m.store.GetInfo(daemonID) + if err != nil { + return nil, errors.Wrapf(err, "add supplementInfo %s", daemonID) + } + return info, nil +} + func (m *Manager) UpdateDaemon(daemon *daemon.Daemon) error { m.mu.Lock() defer m.mu.Unlock() @@ -322,13 +339,7 @@ func (m *Manager) recoverDaemons(ctx context.Context, } if d.States.FsDriver == config.FsDriverFusedev { - cfg, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile("")) - if err != nil { - log.L.Errorf("Failed to reload daemon configuration %s, %s", d.ConfigFile(""), err) - return err - } - - d.Config = cfg + d.Config = *m.DaemonConfig } state, err := d.GetState() diff --git a/pkg/manager/store.go b/pkg/manager/store.go index 740ab57584..fd29f26a95 100644 --- a/pkg/manager/store.go +++ b/pkg/manager/store.go @@ -29,6 +29,9 @@ type Store interface { WalkRafsInstances(ctx context.Context, cb func(*rafs.Rafs) error) error NextInstanceSeq() (uint64, error) + + AddInfo(supplementInfo *daemon.NydusdSupplementInfo) error + GetInfo(daemonID string) (*daemon.NydusdSupplementInfo, error) } var _ Store = &store.DaemonRafsStore{} diff --git a/pkg/store/daemonstore.go b/pkg/store/daemonstore.go index e25f3c99c0..79bacaca15 100644 --- a/pkg/store/daemonstore.go +++ b/pkg/store/daemonstore.go @@ -31,6 +31,14 @@ func (s *DaemonRafsStore) AddDaemon(d *daemon.Daemon) error { return s.db.SaveDaemon(context.TODO(), d) } +func (s *DaemonRafsStore) AddInfo(supplementInfo *daemon.NydusdSupplementInfo) error { + return s.db.SaveInfo(context.TODO(), supplementInfo) +} + +func (s *DaemonRafsStore) GetInfo(imageID string) (*daemon.NydusdSupplementInfo, error) { + return s.db.GetSupplementInfo(context.TODO(), imageID) +} + func (s *DaemonRafsStore) UpdateDaemon(d *daemon.Daemon) error { return s.db.UpdateDaemon(context.TODO(), d) } diff --git a/pkg/store/database.go b/pkg/store/database.go index c227a4a59e..99f897500c 100644 --- a/pkg/store/database.go +++ b/pkg/store/database.go @@ -41,7 +41,8 @@ var ( daemonsBucket = []byte("daemons") // RAFS filesystem instances. // A RAFS filesystem may have associated daemon or not. - instancesBucket = []byte("instances") + instancesBucket = []byte("instances") + supplementInfoBucket = []byte("supplement_info") ) // Database keeps infos that need to survive among snapshotter restart @@ -87,6 +88,11 @@ func getInstancesBucket(tx *bolt.Tx) *bolt.Bucket { return bucket.Bucket(instancesBucket) } +func getSupplementInfoBucket(tx *bolt.Tx) *bolt.Bucket { + bucket := tx.Bucket(v1RootBucket) + return bucket.Bucket(supplementInfoBucket) +} + func updateObject(bucket *bolt.Bucket, key string, obj interface{}) error { keyBytes := []byte(key) @@ -163,6 +169,10 @@ func (db *Database) initDatabase() error { return errors.Wrapf(err, "bucket %s", instancesBucket) } + if _, err := bk.CreateBucketIfNotExists(supplementInfoBucket); err != nil { + return err + } + if val := bk.Get(versionKey); val == nil { version = "v1.0" } else { @@ -210,6 +220,25 @@ func (db *Database) SaveDaemon(_ context.Context, d *daemon.Daemon) error { }) } +func (db *Database) SaveInfo(_ context.Context, supplementInfo *daemon.NydusdSupplementInfo) error { + return db.db.Update(func(tx *bolt.Tx) error { + bucket := getSupplementInfoBucket(tx) + key := []byte(supplementInfo.DaemonState.ID) + if existing := bucket.Get(key); existing != nil { + log.L.Infof("Supplement info already exists for ID: %s", supplementInfo.DaemonState.ID) + return nil + } + value, err := json.Marshal(supplementInfo) + if err != nil { + return errors.Wrap(err, "failed to marshal supplement info") + } + if err := bucket.Put(key, value); err != nil { + return errors.Wrap(err, "failed to save supplement info") + } + return nil + }) +} + func (db *Database) UpdateDaemon(_ context.Context, d *daemon.Daemon) error { return db.db.Update(func(tx *bolt.Tx) error { bucket := getDaemonsBucket(tx) @@ -262,6 +291,25 @@ func (db *Database) WalkDaemons(_ context.Context, cb func(info *daemon.ConfigSt }) } +func (db *Database) GetSupplementInfo(_ context.Context, daemonID string) (*daemon.NydusdSupplementInfo, error) { + var info daemon.NydusdSupplementInfo + err := db.db.View(func(tx *bolt.Tx) error { + bucket := getSupplementInfoBucket(tx) + if bucket == nil { + return errdefs.ErrNotFound + } + value := bucket.Get([]byte(daemonID)) + if value == nil { + return errdefs.ErrNotFound + } + return json.Unmarshal(value, &info) + }) + if err != nil { + return nil, err + } + return &info, nil +} + // WalkDaemons iterates all daemon records and invoke callback on each func (db *Database) WalkRafsInstances(_ context.Context, cb func(r *rafs.Rafs) error) error { return db.db.View(func(tx *bolt.Tx) error { diff --git a/tests/helpers/helpers.sh b/tests/helpers/helpers.sh index efca0b2762..9b9d5b6dc4 100644 --- a/tests/helpers/helpers.sh +++ b/tests/helpers/helpers.sh @@ -99,7 +99,7 @@ install::kind(){ } install::kubectl(){ - local version="${1:-}" + local version="${1:-v1.30.0}" [ "$version" ] || version="$(http::get /dev/stdout https://dl.k8s.io/release/stable.txt)" local temp temp="$(fs::mktemp "install")"