Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: optimize MVCCExportToSST range key processing #86521

Merged
merged 1 commit into from
Aug 22, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 57 additions & 66 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5762,7 +5762,6 @@ func MVCCExportToSST(
var curKey roachpb.Key // only used if exportAllRevisions
var resumeKey MVCCKey
var rangeKeys MVCCRangeKeyStack
var rangeKeysEnd roachpb.Key
var rangeKeysSize int64

// maxRangeKeysSizeIfTruncated calculates the worst-case size of the currently
Expand Down Expand Up @@ -5790,7 +5789,8 @@ func MVCCExportToSST(
// We could be truncated in the middle of a point key version series, which
// would require adding on a \0 byte via Key.Next(), so let's assume that.
maxSize := rangeKeysSize
if s := maxSize + int64(rangeKeys.Len()*(len(resumeKey)-len(rangeKeysEnd)+1)); s > maxSize {
endKeySize := len(rangeKeys.Bounds.EndKey)
if s := maxSize + int64(rangeKeys.Len()*(len(resumeKey)-endKeySize+1)); s > maxSize {
maxSize = s
}
return maxSize
Expand Down Expand Up @@ -5845,14 +5845,8 @@ func MVCCExportToSST(
// limit). If we return a resume key then we need to truncate the final
// range key stack (and thus the SST) to the resume key, so we can't flush
// them until we've moved past.
hasPoint, hasRange := iter.HasPointAndRange()

// First, flush any pending range tombstones when we reach their end key.
//
// TODO(erikgrinaker): Optimize the range key case here, to avoid these
// comparisons. Pebble should expose an API to cheaply detect range key
// changes.
if len(rangeKeysEnd) > 0 && bytes.Compare(unsafeKey.Key, rangeKeysEnd) >= 0 {
if iter.RangeKeyChanged() {
// Flush any pending range tombstones.
for _, v := range rangeKeys.Versions {
mvccValue, ok, err := tryDecodeSimpleMVCCValue(v.Value)
if !ok && err == nil {
Expand All @@ -5863,66 +5857,64 @@ func MVCCExportToSST(
"decoding mvcc value %s", v.Value)
}
// Export only the inner roachpb.Value, not the MVCCValue header.
mvccValue = MVCCValue{Value: mvccValue.Value}
if err := sstWriter.PutMVCCRangeKey(rangeKeys.AsRangeKey(v), mvccValue); err != nil {
rawValue := mvccValue.Value.RawBytes
if err := sstWriter.PutRawMVCCRangeKey(rangeKeys.AsRangeKey(v), rawValue); err != nil {
return roachpb.BulkOpSummary{}, MVCCKey{}, err
}
}
rows.BulkOpSummary.DataSize += rangeKeysSize
rangeKeys, rangeKeysEnd, rangeKeysSize = MVCCRangeKeyStack{}, nil, 0
}
rangeKeys.Clear()
rangeKeysSize = 0

// If we find any new range keys and we haven't buffered any range keys yet,
// buffer them.
if hasRange && !skipTombstones && rangeKeys.IsEmpty() {
rangeKeys = iter.RangeKeys()
rangeKeysEnd = append(rangeKeysEnd[:0], rangeKeys.Bounds.EndKey...)
if !opts.ExportAllRevisions {
rangeKeys.Versions = rangeKeys.Versions[:1]
}
// Buffer any new range keys.
hasPoint, hasRange := iter.HasPointAndRange()
if hasRange && !skipTombstones {
if opts.ExportAllRevisions {
iter.RangeKeys().CloneInto(&rangeKeys)
} else {
rks := iter.RangeKeys()
rks.Versions = rks.Versions[:1]
rks.CloneInto(&rangeKeys)
}

// TODO(erikgrinaker): We should consider a CloneInto() method on the
// MVCCRangeKeyStack that allows reusing a byte buffer. See also TODO in
// Clone() about using a single allocation for the entire clone (all byte
// slices).
rangeKeys = rangeKeys.Clone()
for _, v := range rangeKeys.Versions {
rangeKeysSize += int64(
len(rangeKeys.Bounds.Key) + len(rangeKeys.Bounds.EndKey) + len(v.Value))
}

for _, v := range rangeKeys.Versions {
rangeKeysSize += int64(
len(rangeKeys.Bounds.Key) + len(rangeKeys.Bounds.EndKey) + len(v.Value))
// Check if the range keys exceed a limit, using similar logic as point
// keys. We have to check both the size of the range keys as they are (in
// case we emit them as-is), and the size of the range keys if they were
// to be truncated at the start key due to a resume span (which could
// happen if the next point key exceeds the max size).
//
// TODO(erikgrinaker): The limit logic here is a bit of a mess, but we're
// complying with the existing point key logic for now. We should get rid
// of some of the options and clean this up.
curSize := rows.BulkOpSummary.DataSize
reachedTargetSize := opts.TargetSize > 0 && uint64(curSize) >= opts.TargetSize
newSize := curSize + maxRangeKeysSizeIfTruncated(rangeKeys.Bounds.Key)
reachedMaxSize := opts.MaxSize > 0 && newSize > int64(opts.MaxSize)
if paginated && (reachedTargetSize || reachedMaxSize) {
rangeKeys.Clear()
rangeKeysSize = 0
resumeKey = unsafeKey.Clone()
break
}
if reachedMaxSize {
return roachpb.BulkOpSummary{}, MVCCKey{}, &ExceedMaxSizeError{
reached: newSize, maxSize: opts.MaxSize}
}
}

// Check if the range keys exceed a limit, using similar logic as point
// keys. We have to check both the size of the range keys as they are (in
// case we emit them as-is), and the size of the range keys if they were
// to be truncated at the start key due to a resume span (which could
// happen if the next point key exceeds the max size).
//
// TODO(erikgrinaker): The limit logic here is a bit of a mess, but we're
// complying with the existing point key logic for now. We should get rid
// of some of the options and clean this up.
curSize := rows.BulkOpSummary.DataSize
reachedTargetSize := opts.TargetSize > 0 && uint64(curSize) >= opts.TargetSize
newSize := curSize + maxRangeKeysSizeIfTruncated(rangeKeys.Bounds.Key)
reachedMaxSize := opts.MaxSize > 0 && newSize > int64(opts.MaxSize)
if paginated && (reachedTargetSize || reachedMaxSize) {
rangeKeys, rangeKeysEnd, rangeKeysSize = MVCCRangeKeyStack{}, nil, 0
resumeKey = unsafeKey.Clone()
break
}
if reachedMaxSize {
return roachpb.BulkOpSummary{}, MVCCKey{}, &ExceedMaxSizeError{
reached: newSize, maxSize: opts.MaxSize}
// If we're on a bare range key, step forward. We can't use NextKey()
// because there may be a point key at the range key's start bound.
if !hasPoint {
iter.Next()
continue
}
}

// If we're on a bare range key, step forward. We can't use NextKey()
// because there may be a point key version at the range key's start bound.
if !hasPoint {
iter.Next()
continue
}

// Process point keys.
unsafeValue := iter.UnsafeValue()
skip := false
Expand Down Expand Up @@ -6020,15 +6012,14 @@ func MVCCExportToSST(
// which overlaps at [c-c\0).
if !rangeKeys.IsEmpty() {
// Calculate the new rangeKeysSize due to the new resume bounds.
if len(resumeKey.Key) > 0 && rangeKeysEnd.Compare(resumeKey.Key) > 0 {
oldEndLen := len(rangeKeysEnd)
rangeKeysEnd = resumeKey.Key
if len(resumeKey.Key) > 0 && rangeKeys.Bounds.EndKey.Compare(resumeKey.Key) > 0 {
oldEndLen := len(rangeKeys.Bounds.EndKey)
rangeKeys.Bounds.EndKey = resumeKey.Key
if resumeKey.Timestamp.IsSet() {
rangeKeysEnd = rangeKeysEnd.Next()
rangeKeys.Bounds.EndKey = rangeKeys.Bounds.EndKey.Next()
}
rangeKeysSize += int64(rangeKeys.Len() * (len(rangeKeysEnd) - oldEndLen))
rangeKeysSize += int64(rangeKeys.Len() * (len(rangeKeys.Bounds.EndKey) - oldEndLen))
}
rangeKeys.Bounds.EndKey = rangeKeysEnd
for _, v := range rangeKeys.Versions {
mvccValue, ok, err := tryDecodeSimpleMVCCValue(v.Value)
if !ok && err == nil {
Expand All @@ -6039,8 +6030,8 @@ func MVCCExportToSST(
"decoding mvcc value %s", v.Value)
}
// Export only the inner roachpb.Value, not the MVCCValue header.
mvccValue = MVCCValue{Value: mvccValue.Value}
if err := sstWriter.PutMVCCRangeKey(rangeKeys.AsRangeKey(v), mvccValue); err != nil {
rawValue := mvccValue.Value.RawBytes
if err := sstWriter.PutRawMVCCRangeKey(rangeKeys.AsRangeKey(v), rawValue); err != nil {
return roachpb.BulkOpSummary{}, MVCCKey{}, err
}
}
Expand Down