Skip to content

Commit

Permalink
feat: Remove DumpFile operations
Browse files Browse the repository at this point in the history
  • Loading branch information
fappy1234567 committed Sep 18, 2024
1 parent 8fa319b commit e272e20
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 50 deletions.
6 changes: 6 additions & 0 deletions config/daemonconfig/daemonconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"reflect"
"strings"
"sync"

"github.com/pkg/errors"

Expand Down Expand Up @@ -122,6 +123,8 @@ type DeviceConfig struct {
} `json:"cache"`
}

var configMutex sync.Mutex

// For nydusd as FUSE daemon. Serialize Daemon info and persist to a json file
// We don't have to persist configuration file for fscache since its configuration
// is passed through HTTP API.
Expand All @@ -146,6 +149,9 @@ func DumpConfigString(c interface{}) (string, error) {
func SupplementDaemonConfig(c DaemonConfig, imageID, snapshotID string,
vpcRegistry bool, labels map[string]string, params map[string]string) error {

configMutex.Lock()
defer configMutex.Unlock()

image, err := registry.ParseImage(imageID)
if err != nil {
return errors.Wrapf(err, "parse image %s", imageID)
Expand Down
49 changes: 37 additions & 12 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ type Daemon struct {
state types.DaemonState
}

type NydusdSupplementInfo struct {
DaemonState ConfigState
ImageID string
SnapshotID string
Vpc bool
Labels map[string]string
Params map[string]string
}

func (d *Daemon) Lock() {
d.mu.Lock()
}
Expand Down Expand Up @@ -250,12 +259,7 @@ func (d *Daemon) sharedFusedevMount(rafs *rafs.Rafs) error {
return err
}

c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(rafs.SnapshotID))
if err != nil {
return errors.Wrapf(err, "Failed to reload instance configuration %s",
d.ConfigFile(rafs.SnapshotID))
}

c := d.Config
cfg, err := c.DumpString()
if err != nil {
return errors.Wrap(err, "dump instance configuration")
Expand All @@ -280,12 +284,7 @@ func (d *Daemon) sharedErofsMount(ra *rafs.Rafs) error {
return errors.Wrapf(err, "failed to create fscache work dir %s", ra.FscacheWorkDir())
}

c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(ra.SnapshotID))
if err != nil {
log.L.Errorf("Failed to reload daemon configuration %s, %s", d.ConfigFile(ra.SnapshotID), err)
return err
}

c := d.Config
cfgStr, err := c.DumpString()
if err != nil {
return err
Expand Down Expand Up @@ -650,3 +649,29 @@ func NewDaemon(opt ...NewDaemonOpt) (*Daemon, error) {

return d, nil
}

func (d *Daemon) MountByAPI() error {
rafs := d.RafsCache.Head()
if rafs == nil {
return errors.Wrapf(errdefs.ErrNotFound, "daemon %s no rafs instance associated", d.ID())
}
client, err := d.GetClient()
if err != nil {
return errors.Wrapf(err, "mount instance %s", rafs.SnapshotID)
}
bootstrap, err := rafs.BootstrapFile()
if err != nil {
return err
}
c := d.Config
cfg, err := c.DumpString()
if err != nil {
return errors.Wrap(err, "dump instance configuration")
}
err = client.Mount("/", bootstrap, cfg)
if err != nil {
return errors.Wrapf(err, "mount rafs instance MountByAPI()")
}
return nil

}
55 changes: 29 additions & 26 deletions pkg/filesystem/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,23 @@ func NewFileSystem(ctx context.Context, opt ...NewFSOpt) (*Filesystem, error) {
if err != nil {
return errors.Wrapf(err, "get filesystem manager for daemon %s", d.States.ID)
}

supplementInfo, err := fsManager.GetInfo(d.ID())
if err != nil {
return errors.Wrap(err, "GetInfo failed")
}

cfg := d.Config
imageID := supplementInfo.ImageID
snapshotID := supplementInfo.SnapshotID
labels := supplementInfo.Labels
params := supplementInfo.Params
err = daemonconfig.SupplementDaemonConfig(cfg, imageID, snapshotID, false, labels, params)
if err != nil {
return errors.Wrap(err, "supplement configuration")
}
d.Config = cfg

if err := fsManager.StartDaemon(d); err != nil {
return errors.Wrapf(err, "start daemon %s", d.ID())
}
Expand Down Expand Up @@ -232,7 +249,6 @@ func (fs *Filesystem) Mount(ctx context.Context, snapshotID string, labels map[s
// Instance already exists, how could this happen? Can containerd handle this case?
return nil
}

fsDriver := config.GetFsDriver()
if label.IsTarfsDataLayer(labels) {
fsDriver = config.FsDriverBlockdev
Expand Down Expand Up @@ -307,29 +323,20 @@ func (fs *Filesystem) Mount(ctx context.Context, snapshotID string, labels map[s
if err != nil {
return errors.Wrap(err, "supplement configuration")
}

// TODO: How to manage rafs configurations on-disk? separated json config file or DB record?
// In order to recover erofs mount, the configuration file has to be persisted.
var configSubDir string
if useSharedDaemon {
configSubDir = snapshotID
} else {
// Associate daemon config object when creating a new daemon object to avoid
// reading disk file again and again.
// For shared daemon, each rafs instance has its own configuration, so we don't
// attach a config interface to daemon in this case.
d.Config = cfg
supplementInfo := &daemon.NydusdSupplementInfo{
DaemonState: d.States,
ImageID: imageID,
SnapshotID: snapshotID,
Vpc: false,
Labels: labels,
Params: params,
}

err = cfg.DumpFile(d.ConfigFile(configSubDir))
if err != nil {
if errors.Is(err, errdefs.ErrAlreadyExists) {
log.L.Debugf("Configuration file %s already exits", d.ConfigFile(configSubDir))
} else {
return errors.Wrap(err, "dump daemon configuration file")
}
if errs := fsManager.AddSupplementInfo(supplementInfo); errs != nil {
return errors.Wrapf(err, "AddSupplementInfo failed %s", d.States.ID)
}

// TODO: How to manage rafs configurations on-disk? separated json config file or DB record?
// In order to recover erofs mount, the configuration file has to be persisted.
d.Config = cfg
d.AddRafsInstance(rafs)

// if publicKey is not empty we should verify bootstrap file of image
Expand Down Expand Up @@ -596,10 +603,6 @@ func (fs *Filesystem) initSharedDaemon(fsManager *manager.Manager) (err error) {
// it is loaded when requesting mount api
// Dump the configuration file since it is reloaded when recovering the nydusd
d.Config = *fsManager.DaemonConfig
err = d.Config.DumpFile(d.ConfigFile(""))
if err != nil && !errors.Is(err, errdefs.ErrAlreadyExists) {
return errors.Wrapf(err, "dump configuration file %s", d.ConfigFile(""))
}

if err := fsManager.StartDaemon(d); err != nil {
return errors.Wrap(err, "start shared daemon")
Expand Down
14 changes: 10 additions & 4 deletions pkg/manager/daemon_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ func (m *Manager) StartDaemon(d *daemon.Daemon) error {
if err := cmd.Start(); err != nil {
return err
}
fsDriver := config.GetFsDriver()
isSharedFusedev := fsDriver == config.FsDriverFusedev && config.GetDaemonMode() == config.DaemonModeShared
useSharedDaemon := fsDriver == config.FsDriverFscache || isSharedFusedev

if !useSharedDaemon {
errs := d.MountByAPI()
if errs != nil {
return errors.Wrapf(err, "failed to mount")
}
}

d.Lock()
defer d.Unlock()
Expand Down Expand Up @@ -155,10 +165,6 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
return nil, errors.Wrapf(err, "locate bootstrap %s", bootstrap)
}

cmdOpts = append(cmdOpts,
command.WithConfig(d.ConfigFile("")),
command.WithBootstrap(bootstrap),
)
if config.IsBackendSourceEnabled() {
configAPIPath := fmt.Sprintf(endpointGetBackend, d.States.ID)
cmdOpts = append(cmdOpts,
Expand Down
25 changes: 18 additions & 7 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,23 @@ func (m *Manager) AddDaemon(daemon *daemon.Daemon) error {
return nil
}

func (m *Manager) AddSupplementInfo(supplementInfo *daemon.NydusdSupplementInfo) error {
m.mu.Lock()
defer m.mu.Unlock()
if err := m.store.AddInfo(supplementInfo); err != nil {
return errors.Wrapf(err, "add supplementInfo %s", supplementInfo.DaemonState.ID)
}
return nil
}

func (m *Manager) GetInfo(daemonID string) (*daemon.NydusdSupplementInfo, error) {
info, err := m.store.GetInfo(daemonID)
if err != nil {
return nil, errors.Wrapf(err, "add supplementInfo %s", daemonID)
}
return info, nil
}

func (m *Manager) UpdateDaemon(daemon *daemon.Daemon) error {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down Expand Up @@ -322,13 +339,7 @@ func (m *Manager) recoverDaemons(ctx context.Context,
}

if d.States.FsDriver == config.FsDriverFusedev {
cfg, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(""))
if err != nil {
log.L.Errorf("Failed to reload daemon configuration %s, %s", d.ConfigFile(""), err)
return err
}

d.Config = cfg
d.Config = *m.DaemonConfig
}

state, err := d.GetState()
Expand Down
3 changes: 3 additions & 0 deletions pkg/manager/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Store interface {
WalkRafsInstances(ctx context.Context, cb func(*rafs.Rafs) error) error

NextInstanceSeq() (uint64, error)

AddInfo(supplementInfo *daemon.NydusdSupplementInfo) error
GetInfo(daemonID string) (*daemon.NydusdSupplementInfo, error)
}

var _ Store = &store.DaemonRafsStore{}
8 changes: 8 additions & 0 deletions pkg/store/daemonstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ func (s *DaemonRafsStore) AddDaemon(d *daemon.Daemon) error {
return s.db.SaveDaemon(context.TODO(), d)
}

func (s *DaemonRafsStore) AddInfo(supplementInfo *daemon.NydusdSupplementInfo) error {
return s.db.SaveInfo(context.TODO(), supplementInfo)
}

func (s *DaemonRafsStore) GetInfo(imageID string) (*daemon.NydusdSupplementInfo, error) {
return s.db.GetSupplementInfo(context.TODO(), imageID)
}

func (s *DaemonRafsStore) UpdateDaemon(d *daemon.Daemon) error {
return s.db.UpdateDaemon(context.TODO(), d)
}
Expand Down
50 changes: 49 additions & 1 deletion pkg/store/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ var (
daemonsBucket = []byte("daemons")
// RAFS filesystem instances.
// A RAFS filesystem may have associated daemon or not.
instancesBucket = []byte("instances")
instancesBucket = []byte("instances")
supplementInfoBucket = []byte("supplement_info")
)

// Database keeps infos that need to survive among snapshotter restart
Expand Down Expand Up @@ -87,6 +88,11 @@ func getInstancesBucket(tx *bolt.Tx) *bolt.Bucket {
return bucket.Bucket(instancesBucket)
}

func getSupplementInfoBucket(tx *bolt.Tx) *bolt.Bucket {
bucket := tx.Bucket(v1RootBucket)
return bucket.Bucket(supplementInfoBucket)
}

func updateObject(bucket *bolt.Bucket, key string, obj interface{}) error {
keyBytes := []byte(key)

Expand Down Expand Up @@ -163,6 +169,10 @@ func (db *Database) initDatabase() error {
return errors.Wrapf(err, "bucket %s", instancesBucket)
}

if _, err := bk.CreateBucketIfNotExists(supplementInfoBucket); err != nil {
return err
}

if val := bk.Get(versionKey); val == nil {
version = "v1.0"
} else {
Expand Down Expand Up @@ -210,6 +220,25 @@ func (db *Database) SaveDaemon(_ context.Context, d *daemon.Daemon) error {
})
}

func (db *Database) SaveInfo(_ context.Context, supplementInfo *daemon.NydusdSupplementInfo) error {
return db.db.Update(func(tx *bolt.Tx) error {
bucket := getSupplementInfoBucket(tx)
key := []byte(supplementInfo.DaemonState.ID)
if existing := bucket.Get(key); existing != nil {
log.L.Infof("Supplement info already exists for ID: %s", supplementInfo.DaemonState.ID)
return nil
}
value, err := json.Marshal(supplementInfo)
if err != nil {
return errors.Wrap(err, "failed to marshal supplement info")
}
if err := bucket.Put(key, value); err != nil {
return errors.Wrap(err, "failed to save supplement info")
}
return nil
})
}

func (db *Database) UpdateDaemon(_ context.Context, d *daemon.Daemon) error {
return db.db.Update(func(tx *bolt.Tx) error {
bucket := getDaemonsBucket(tx)
Expand Down Expand Up @@ -262,6 +291,25 @@ func (db *Database) WalkDaemons(_ context.Context, cb func(info *daemon.ConfigSt
})
}

func (db *Database) GetSupplementInfo(_ context.Context, daemonID string) (*daemon.NydusdSupplementInfo, error) {
var info daemon.NydusdSupplementInfo
err := db.db.View(func(tx *bolt.Tx) error {
bucket := getSupplementInfoBucket(tx)
if bucket == nil {
return errdefs.ErrNotFound
}
value := bucket.Get([]byte(daemonID))
if value == nil {
return errdefs.ErrNotFound
}
return json.Unmarshal(value, &info)
})
if err != nil {
return nil, err
}
return &info, nil
}

// WalkDaemons iterates all daemon records and invoke callback on each
func (db *Database) WalkRafsInstances(_ context.Context, cb func(r *rafs.Rafs) error) error {
return db.db.View(func(tx *bolt.Tx) error {
Expand Down

0 comments on commit e272e20

Please sign in to comment.