Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix io manager issue(especially mmap) and test function #132

Merged
merged 3 commits into from
Jul 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ type Options struct {
DataFileSize int64 //数据文件的大小
SyncWrite bool // 每次写数据是否持久化
IndexType IndexerType
FIOType FIOType
}

// IteratorOptions 索引迭代器配置项
Expand All @@ -25,6 +26,14 @@ type WriteBatchOptions struct {
SyncWrites bool
}

type FIOType = int8

const (
FileIOType = iota + 1 // Standard File IO
BufIOType // File IO with buffer
MmapIOType // Memory Mapping IO
)

type IndexerType = int8

const (
Expand All @@ -40,6 +49,7 @@ var DefaultOptions = Options{
DataFileSize: 256 * 1024 * 1024, // 256MB
SyncWrite: false,
IndexType: Btree,
FIOType: FileIOType,
}

var DefaultIteratorOptions = IteratorOptions{
Expand Down
16 changes: 8 additions & 8 deletions data/data_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,30 @@ type DataFile struct {
}

// OpenDataFile 打开新的数据文件
func OpenDataFile(dirPath string, fildID uint32) (*DataFile, error) {
func OpenDataFile(dirPath string, fildID uint32, fileSize int64, fioType int8) (*DataFile, error) {
fileName := GetDataFileName(dirPath, fildID)
return newDataFile(fileName, fildID)
return newDataFile(fileName, fildID, fileSize, fioType)
}

func GetDataFileName(dirPath string, fildID uint32) string {
return filepath.Join(dirPath, fmt.Sprintf("%09d", fildID)+DataFileSuffix)
}

// OpenHintFile 打开 Hint 索引文件
func OpenHintFile(dirPath string) (*DataFile, error) {
func OpenHintFile(dirPath string, fileSize int64, fioType int8) (*DataFile, error) {
fileName := filepath.Join(dirPath, HintFileSuffix)
return newDataFile(fileName, 0)
return newDataFile(fileName, 0, fileSize, fioType)
}

// OpenMergeFinaFile 打开标识 merge 完成的文件
func OpenMergeFinaFile(dirPath string) (*DataFile, error) {
func OpenMergeFinaFile(dirPath string, fileSize int64, fioType int8) (*DataFile, error) {
fileName := filepath.Join(dirPath, MergeFinaFileSuffix)
return newDataFile(fileName, 0)
return newDataFile(fileName, 0, fileSize, fioType)
}

func newDataFile(dirPath string, fildID uint32) (*DataFile, error) {
func newDataFile(dirPath string, fildID uint32, fileSize int64, fioType int8) (*DataFile, error) {
//初始化 IOManager 管理器接口
ioManager, err := fio.NewIOManager(dirPath)
ioManager, err := fio.NewIOManager(dirPath, fileSize, fioType)
if err != nil {
return nil, err
}
Expand Down
17 changes: 10 additions & 7 deletions data/data_file_test.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,30 @@
package data

import (
"github.com/ByteStorage/FlyDB/fio"
"github.com/stretchr/testify/assert"
"os"
"testing"
)

const DefaultFileSize = 256 * 1024 * 1024

func TestOpenDataFile(t *testing.T) {
dataFile1, err := OpenDataFile(os.TempDir(), 0)
dataFile1, err := OpenDataFile(os.TempDir(), 0, DefaultFileSize, fio.FileIOType)
assert.Nil(t, err)
assert.NotNil(t, dataFile1)

dataFile2, err := OpenDataFile(os.TempDir(), 1)
dataFile2, err := OpenDataFile(os.TempDir(), 1, DefaultFileSize, fio.FileIOType)
assert.Nil(t, err)
assert.NotNil(t, dataFile2)

dataFile3, err := OpenDataFile(os.TempDir(), 1)
dataFile3, err := OpenDataFile(os.TempDir(), 1, DefaultFileSize, fio.FileIOType)
assert.Nil(t, err)
assert.NotNil(t, dataFile3)
}

func TestDataFile_Write(t *testing.T) {
dataFile, err := OpenDataFile(os.TempDir(), 12312)
dataFile, err := OpenDataFile(os.TempDir(), 12312, DefaultFileSize, fio.FileIOType)
assert.Nil(t, err)
assert.NotNil(t, dataFile)

Expand All @@ -36,7 +39,7 @@ func TestDataFile_Write(t *testing.T) {
}

func TestDataFile_Close(t *testing.T) {
dataFile, err := OpenDataFile(os.TempDir(), 1111111)
dataFile, err := OpenDataFile(os.TempDir(), 1111111, DefaultFileSize, fio.FileIOType)
assert.Nil(t, err)
assert.NotNil(t, dataFile)

Expand All @@ -45,7 +48,7 @@ func TestDataFile_Close(t *testing.T) {
}

func TestDataFile_Sync(t *testing.T) {
dataFile, err := OpenDataFile(os.TempDir(), 2222222)
dataFile, err := OpenDataFile(os.TempDir(), 2222222, DefaultFileSize, fio.FileIOType)
assert.Nil(t, err)
assert.NotNil(t, dataFile)

Expand All @@ -54,7 +57,7 @@ func TestDataFile_Sync(t *testing.T) {
}

func TestDataFile_ReadLogRecord(t *testing.T) {
dataFile, err := OpenDataFile(os.TempDir(), 123)
dataFile, err := OpenDataFile(os.TempDir(), 123, DefaultFileSize, fio.FileIOType)
assert.Nil(t, err)
assert.NotNil(t, dataFile)

Expand Down
4 changes: 2 additions & 2 deletions engine/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (db *DB) setActiveDataFile() error {
}

// Open a new data file
dataFile, err := data.OpenDataFile(db.options.DirPath, initialFileID)
dataFile, err := data.OpenDataFile(db.options.DirPath, initialFileID, db.options.DataFileSize, db.options.FIOType)
if err != nil {
return err
}
Expand Down Expand Up @@ -368,7 +368,7 @@ func (db *DB) loadDataFiles() error {

// Walk through each file id and open the corresponding data file
for i, fid := range fileIds {
dataFile, err := data.OpenDataFile(db.options.DirPath, uint32(fid))
dataFile, err := data.OpenDataFile(db.options.DirPath, uint32(fid), db.options.DataFileSize, db.options.FIOType)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions engine/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (db *DB) Merge() error {
}

// 打开 hint 文件存储索引
hintFile, err := data.OpenHintFile(mergePath)
hintFile, err := data.OpenHintFile(mergePath, db.options.DataFileSize, db.options.FIOType)
if err != nil {
return err
}
Expand Down Expand Up @@ -131,7 +131,7 @@ func (db *DB) Merge() error {
}

// 写标识 merge 完成的文件
mergeFinaFile, err := data.OpenMergeFinaFile(mergePath)
mergeFinaFile, err := data.OpenMergeFinaFile(mergePath, db.options.DataFileSize, db.options.FIOType)
if err != nil {
return err
}
Expand Down Expand Up @@ -223,7 +223,7 @@ func (db *DB) loadMergeFiles() error {

// 获取最近没有参与 merge 的文件 id
func (db *DB) getRecentlyNonMergeFileId(dirPath string) (uint32, error) {
mergeFinaFile, err := data.OpenMergeFinaFile(dirPath)
mergeFinaFile, err := data.OpenMergeFinaFile(dirPath, db.options.DataFileSize, db.options.FIOType)
if err != nil {
return 0, err
}
Expand All @@ -247,7 +247,7 @@ func (db *DB) loadIndexFromHintFile() error {
}

// 打开 hint 文件
hintFile, err := data.OpenHintFile(db.options.DirPath)
hintFile, err := data.OpenHintFile(db.options.DirPath, db.options.DataFileSize, db.options.FIOType)
if err != nil {
return err
}
Expand Down
4 changes: 0 additions & 4 deletions fio/file_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,3 @@ func (fio *FileIO) Size() (int64, error) {
}
return stat.Size(), nil
}

func NewIOManager(filename string) (IOManager, error) {
return NewFileIOManager(filename)
}
21 changes: 21 additions & 0 deletions fio/io_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@ package fio

const DataFilePerm = 0644 //0644 表示创建了一个文件,文件所有者可以读写,其他人只能读

const DefaultFileSize = 256 * 1024 * 1024

const (
FileIOType = iota + 1 // Standard File IO
BufIOType // File IO with buffer
MmapIOType // Memory Mapping IO
)

// IOManager 抽象 IO 管理接口, 可以接入不同的 IO 类型, 目前支持标准文件 IO
type IOManager interface {
// Read 从文件的给定位置读取对应的数据
Expand All @@ -19,3 +27,16 @@ type IOManager interface {
// Size get file size
Size() (int64, error)
}

// NewIOManager get IOManager based on type
func NewIOManager(filename string, fileSize int64, fioType int8) (IOManager, error) {
switch fioType {
case FileIOType:
return NewFileIOManager(filename)
case BufIOType:
return NewBufIOManager(filename)
case MmapIOType:
return NewMMapIOManager(filename, fileSize)
}
return NewFileIOManager(filename)
}
36 changes: 20 additions & 16 deletions fio/mmap_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,18 @@ import (
"unsafe"
)

// DefaultMemMapSize 最大映射内存大小
const DefaultMemMapSize = 256 * 1024 * 1024

// MMapIO 标准系统文件IO
type MMapIO struct {
fd *os.File // 系统文件描述符
data []byte // 与文件对应的映射区
dirty bool // 是否更改过
offset int64 // 写入位置
fd *os.File // system file descriptor
data []byte // the mapping area corresponding to the file
dirty bool // has changed
offset int64 // next write location
fileSize int64 // max file size
}

// NewMMapIOManager 初始化标准文件 IO
func NewMMapIOManager(fileName string) (*MMapIO, error) {
mmapIO := &MMapIO{}
// NewMMapIOManager Initialize Mmap IO
func NewMMapIOManager(fileName string, fileSize int64) (*MMapIO, error) {
mmapIO := &MMapIO{fileSize: fileSize}

fd, err := os.OpenFile(
fileName,
os.O_CREATE|os.O_RDWR|os.O_APPEND,
Expand All @@ -31,13 +29,13 @@ func NewMMapIOManager(fileName string) (*MMapIO, error) {
}
info, _ := fd.Stat()

// 将文件扩容到映射区大小, 保存时会裁剪
if err := fd.Truncate(DefaultMemMapSize); err != nil {
// Expand files to maximum file size, crop when saving
if err := fd.Truncate(fileSize); err != nil {
return nil, err
}

// 构建映射
b, err := syscall.Mmap(int(fd.Fd()), 0, DefaultMemMapSize, syscall.PROT_WRITE|syscall.PROT_READ, syscall.MAP_SHARED)
// Building mappings between memory and disk files
b, err := syscall.Mmap(int(fd.Fd()), 0, int(fileSize), syscall.PROT_WRITE|syscall.PROT_READ, syscall.MAP_SHARED)
if err != nil {
return nil, err
}
Expand All @@ -48,14 +46,16 @@ func NewMMapIOManager(fileName string) (*MMapIO, error) {
return mmapIO, nil
}

// Read Copy data from the mapping area to byte slice
func (mio *MMapIO) Read(b []byte, offset int64) (int, error) {
return copy(b, mio.data[offset:]), nil
}

// Write Copy data from byte slice to the mapping area
func (mio *MMapIO) Write(b []byte) (int, error) {
oldOffset := mio.offset
newOffset := mio.offset + int64(len(b))
if newOffset > DefaultMemMapSize {
if newOffset > mio.fileSize {
return 0, errors.New("exceed file max content length")
}

Expand All @@ -64,6 +64,7 @@ func (mio *MMapIO) Write(b []byte) (int, error) {
return copy(mio.data[oldOffset:], b), nil
}

// Sync Synchronize data from memory to disk
func (mio *MMapIO) Sync() error {
if !mio.dirty {
return nil
Expand All @@ -78,6 +79,7 @@ func (mio *MMapIO) Sync() error {
return nil
}

// Close file
func (mio *MMapIO) Close() (err error) {
if err = mio.fd.Truncate(mio.offset); err != nil {
return err
Expand All @@ -91,10 +93,12 @@ func (mio *MMapIO) Close() (err error) {
return mio.fd.Close()
}

// Size return the size of current file
func (mio *MMapIO) Size() (int64, error) {
return mio.offset, nil
}

// UnMap Unmapping between memory and files
func (mio *MMapIO) UnMap() error {
if mio.data == nil {
return nil
Expand Down
14 changes: 7 additions & 7 deletions fio/mmap_io_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

func TestNewMMapIOManager(t *testing.T) {
path := filepath.Join("/tmp", "a.data")
mio, err := NewMMapIOManager(path)
mio, err := NewMMapIOManager(path, DefaultFileSize)
defer destoryFile(path)

assert.Nil(t, err)
Expand All @@ -17,7 +17,7 @@ func TestNewMMapIOManager(t *testing.T) {

func TestMMapIO_Write(t *testing.T) {
path := filepath.Join("/tmp", "a.data")
mio, err := NewMMapIOManager(path)
mio, err := NewMMapIOManager(path, DefaultFileSize)
defer destoryFile(path)
assert.Nil(t, err)
assert.NotNil(t, mio)
Expand All @@ -36,7 +36,7 @@ func TestMMapIO_Write(t *testing.T) {

func TestMMapIO_Read(t *testing.T) {
path := filepath.Join("/tmp", "a.data")
mio, err := NewMMapIOManager(path)
mio, err := NewMMapIOManager(path, DefaultFileSize)
defer destoryFile(path)

assert.Nil(t, err)
Expand All @@ -60,7 +60,7 @@ func TestMMapIO_Read(t *testing.T) {

func TestMMapIO_Sync(t *testing.T) {
path := filepath.Join("/tmp", "a.data")
mio, err := NewMMapIOManager(path)
mio, err := NewMMapIOManager(path, DefaultFileSize)
defer destoryFile(path)

assert.Nil(t, err)
Expand All @@ -73,7 +73,7 @@ func TestMMapIO_Sync(t *testing.T) {

func TestMMapIO_Close(t *testing.T) {
path := filepath.Join("/tmp", "a.data")
mio, err := NewMMapIOManager(path)
mio, err := NewMMapIOManager(path, DefaultFileSize)
defer destoryFile(path)

assert.Nil(t, err)
Expand All @@ -88,7 +88,7 @@ func TestMMapIO_Close(t *testing.T) {

func TestMMapIO_Write_Speed(t *testing.T) {
path := filepath.Join("/tmp", "a.data")
mio, err := NewMMapIOManager(path)
mio, err := NewMMapIOManager(path, DefaultFileSize)
assert.Nil(t, err)
assert.NotNil(t, mio)

Expand All @@ -104,7 +104,7 @@ func TestMMapIO_Write_Speed(t *testing.T) {

func TestMMapIO_Read_Speed(t *testing.T) {
path := filepath.Join("/tmp", "a.data")
mio, err := NewMMapIOManager(path)
mio, err := NewMMapIOManager(path, DefaultFileSize)
defer destoryFile(path)
assert.Nil(t, err)
assert.NotNil(t, mio)
Expand Down
Loading