Skip to content

Commit

Permalink
feat: add locking for files on per-prefix-basis to allow for concurre…
Browse files Browse the repository at this point in the history
…nt syncing and querying
  • Loading branch information
FlorianLoch committed Feb 15, 2024
1 parent e27c671 commit e6ff32a
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 15 deletions.
15 changes: 3 additions & 12 deletions lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,7 @@ func Sync(options ...SyncOption) error {
maxRetries: 3,
}

storage := &fsStorage{
dataDir: config.dataDir,
doNotUseCompression: config.noCompression,
}
storage := newFSStorage(config.dataDir, config.noCompression)

pool := pond.New(config.minWorkers, 0, pond.MinWorkers(config.minWorkers))

Expand All @@ -80,10 +77,7 @@ func Export(w io.Writer, options ...ExportOption) error {
option(config)
}

storage := &fsStorage{
dataDir: config.dataDir,
doNotUseCompression: config.noCompression,
}
storage := newFSStorage(config.dataDir, config.noCompression)

return export(0, defaultLastRange+1, storage, w)
}
Expand All @@ -104,10 +98,7 @@ func NewRangeAPI(options ...QueryOption) *RangeAPI {
}

return &RangeAPI{
storage: &fsStorage{
dataDir: config.dataDir,
doNotUseCompression: config.noCompression,
},
storage: newFSStorage(config.dataDir, config.noCompression),
}
}

Expand Down
63 changes: 60 additions & 3 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,64 @@ type storage interface {

type fsStorage struct {
dataDir string
writeLock syncPkg.Mutex
doNotUseCompression bool
createDirsLock syncPkg.Mutex
lockMapLock syncPkg.Mutex
fileLocks map[string]*syncPkg.RWMutex // prefix -> lock
}

var _ storage = (*fsStorage)(nil)

func newFSStorage(dataDir string, doNotUseCompression bool) *fsStorage {
return &fsStorage{
dataDir: dataDir,
doNotUseCompression: doNotUseCompression,
fileLocks: make(map[string]*syncPkg.RWMutex),
}
}

type lockType int

const (
read lockType = iota
write
)

func (f *fsStorage) lockFile(key string, t lockType) func() {
defers := make([]func(), 0, 2)

f.lockMapLock.Lock()
fileLock, exists := f.fileLocks[key]
if !exists {
fileLock = &syncPkg.RWMutex{}
f.fileLocks[key] = fileLock
}
f.lockMapLock.Unlock()

if t == write {
fileLock.Lock()
defers = append(defers, fileLock.Unlock)
} else {
fileLock.RLock()
defers = append(defers, fileLock.RUnlock)
}

defers = append(defers, func() {
f.lockMapLock.Lock()
delete(f.fileLocks, key)
f.lockMapLock.Unlock()
})

return func() {
for i := len(defers) - 1; i >= 0; i-- {
defers[i]()
}
}
}

func (f *fsStorage) Save(key, etag string, data []byte) error {
defer f.lockFile(key, write)()

if err := f.createDirs(key); err != nil {
return fmt.Errorf("creating data directory: %w", err)
}
Expand Down Expand Up @@ -71,13 +122,16 @@ func (f *fsStorage) Save(key, etag string, data []byte) error {
func (f *fsStorage) createDirs(key string) error {
// We need to synchronize calls to Save because we don't want to create the same parent directory for several files
// at the same time.
f.writeLock.Lock()
defer f.writeLock.Unlock()
// This could be made smarter to lock on a per-path basis, but that is most likely not worth the complexity.
f.createDirsLock.Lock()
defer f.createDirsLock.Unlock()

return os.MkdirAll(f.subDir(key), dirMode)
}

func (f *fsStorage) LoadETag(key string) (string, error) {
defer f.lockFile(key, read)()

file, err := os.Open(f.filePath(key))
if err != nil {
return "", fmt.Errorf("opening file %q: %w", f.filePath(key), err)
Expand Down Expand Up @@ -106,6 +160,8 @@ func (f *fsStorage) LoadETag(key string) (string, error) {
}

func (f *fsStorage) LoadData(key string) (io.ReadCloser, error) {
unlockFile := f.lockFile(key, read)

file, err := os.Open(f.filePath(key))
if err != nil {
return nil, fmt.Errorf("opening file %q: %w", f.filePath(key), err)
Expand Down Expand Up @@ -141,6 +197,7 @@ func (f *fsStorage) LoadData(key string) (io.ReadCloser, error) {
return &closableReader{
Reader: bufReader,
closeFn: func() error {
defer unlockFile()
defer file.Close()
if dec != nil {
defer dec.Close()
Expand Down

0 comments on commit e6ff32a

Please sign in to comment.