diff --git a/fs/fs.go b/fs/fs.go index c78882fc78..5e57a98390 100644 --- a/fs/fs.go +++ b/fs/fs.go @@ -19,8 +19,9 @@ package fs import ( - "bufio" + "bytes" "fmt" + "io" "os" "os/exec" "path" @@ -57,6 +58,9 @@ const ( // A pool for restricting the number of consecutive `du` and `find` tasks running. var pool = make(chan struct{}, maxConcurrentOps) +// bufSize control size of read buffer in getDiskStatsMap() +var bufSize = 5000 + func init() { for i := 0; i < maxConcurrentOps; i++ { releaseToken() @@ -459,67 +463,100 @@ func getDiskStatsMap(diskStatsFile string) (map[string]DiskStats, error) { } defer file.Close() - scanner := bufio.NewScanner(file) - for scanner.Scan() { - line := scanner.Text() - words := strings.Fields(line) - if !partitionRegex.MatchString(words[2]) { - continue + leftover := []byte{} + const newline = 10 + for { + buf := make([]byte, bufSize) + count, err := file.Read(buf) + buf = append(leftover, buf...) + klog.V(10).Infof("read %d bytes in getDiskStatsMap()", count) + if err == io.EOF { + break + } + lines := bytes.Split(buf, []byte{newline}) + var skipLastLine bool + if buf[len(buf)-1] != newline { + skipLastLine = true } - // 8 50 sdd2 40 0 280 223 7 0 22 108 0 330 330 - deviceName := path.Join("/dev", words[2]) - var err error - devInfo := make([]uint64, 2) - for i := 0; i < len(devInfo); i++ { - devInfo[i], err = strconv.ParseUint(words[i], 10, 64) + for i, bLine := range lines { + if len(bLine) == 0 { + klog.V(10).Infof("skipping empty line in getDiskStatsMap() iteration %d", i) + continue + } + if skipLastLine && i == len(lines)-1 { + klog.V(10).Infof("skipping leftover line in getDiskStatsMap() iteration %d", i) + continue + } + line := string(bLine) + words := strings.Fields(line) + + if !partitionRegex.MatchString(words[2]) { + continue + } + // 8 50 sdd2 40 0 280 223 7 0 22 108 0 330 330 + deviceName := path.Join("/dev", words[2]) + + var err error + devInfo := make([]uint64, 2) + for i := 0; i < len(devInfo); i++ { + devInfo[i], err = strconv.ParseUint(words[i], 10, 64) + if err != nil { + return nil, err + } + } + + wordLength := len(words) + offset := 3 + var stats = make([]uint64, wordLength-offset) + if len(stats) < 11 { + return nil, fmt.Errorf("could not parse all 11 columns of /proc/diskstats") + } + for i := offset; i < wordLength; i++ { + stats[i-offset], err = strconv.ParseUint(words[i], 10, 64) + if err != nil { + return nil, err + } + } + + major64, err := strconv.ParseUint(words[0], 10, 64) if err != nil { return nil, err } - } - wordLength := len(words) - offset := 3 - var stats = make([]uint64, wordLength-offset) - if len(stats) < 11 { - return nil, fmt.Errorf("could not parse all 11 columns of /proc/diskstats") - } - for i := offset; i < wordLength; i++ { - stats[i-offset], err = strconv.ParseUint(words[i], 10, 64) + minor64, err := strconv.ParseUint(words[1], 10, 64) if err != nil { return nil, err } + + diskStats := DiskStats{ + MajorNum: devInfo[0], + MinorNum: devInfo[1], + ReadsCompleted: stats[0], + ReadsMerged: stats[1], + SectorsRead: stats[2], + ReadTime: stats[3], + WritesCompleted: stats[4], + WritesMerged: stats[5], + SectorsWritten: stats[6], + WriteTime: stats[7], + IoInProgress: stats[8], + IoTime: stats[9], + WeightedIoTime: stats[10], + Major: major64, + Minor: minor64, + } + diskStatsMap[deviceName] = diskStats } - major64, err := strconv.ParseUint(words[0], 10, 64) - if err != nil { - return nil, err + if buf[len(buf)-1] != newline { + leftover = lines[len(lines)-1] + klog.V(10).Info("saving leftover line %q", leftover) + } else { + leftover = []byte{} } - minor64, err := strconv.ParseUint(words[1], 10, 64) - if err != nil { - return nil, err - } - - diskStats := DiskStats{ - MajorNum: devInfo[0], - MinorNum: devInfo[1], - ReadsCompleted: stats[0], - ReadsMerged: stats[1], - SectorsRead: stats[2], - ReadTime: stats[3], - WritesCompleted: stats[4], - WritesMerged: stats[5], - SectorsWritten: stats[6], - WriteTime: stats[7], - IoInProgress: stats[8], - IoTime: stats[9], - WeightedIoTime: stats[10], - Major: major64, - Minor: minor64, - } - diskStatsMap[deviceName] = diskStats } return diskStatsMap, nil } diff --git a/fs/fs_test.go b/fs/fs_test.go index 51a1246e33..b509b7e4d3 100644 --- a/fs/fs_test.go +++ b/fs/fs_test.go @@ -40,6 +40,9 @@ func TestMountInfoFromDir(t *testing.T) { } func TestGetDiskStatsMap(t *testing.T) { + origBufSize := bufSize + bufSize = 100 + defer func() { bufSize = origBufSize }() diskStatsMap, err := getDiskStatsMap("test_resources/diskstats") if err != nil { t.Errorf("Error calling getDiskStatsMap %s", err) @@ -92,6 +95,9 @@ func TestGetDiskStatsMap(t *testing.T) { } func TestGetDiskStatsMapMajorMinorNum(t *testing.T) { + origBufSize := bufSize + bufSize = 100 + defer func() { bufSize = origBufSize }() diskStatsMap, err := getDiskStatsMap("test_resources/diskstats") if err != nil { t.Errorf("Error calling getDiskStatsMap %s", err) @@ -108,6 +114,9 @@ func TestGetDiskStatsMapMajorMinorNum(t *testing.T) { } func TestFileNotExist(t *testing.T) { + origBufSize := bufSize + bufSize = 100 + defer func() { bufSize = origBufSize }() _, err := getDiskStatsMap("/file_does_not_exist") if err != nil { t.Fatalf("getDiskStatsMap must not error for absent file: %s", err)