Bound total memory used by HiveSplitSource #9119
Conversation
See Travis failures
-CompletableFuture<List<ConnectorSplit>> future = queue.getBatchAsync(maxSize);
+CompletableFuture<List<ConnectorSplit>> future = queue.getBatchAsync(maxSize).thenApply(internalSplits -> {
+    ImmutableList.Builder<ConnectorSplit> result = ImmutableList.builder();
+    for (InternalHiveSplit internalSplit : internalSplits) {
Why not use a stream?
The next commit introduces a side effect into the for-loop.
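For illustration, a hypothetical sketch of the kind of side effect that commit adds inside the loop; the estimatedSplitSizeInBytes counter appears later in this PR, while the toConnectorSplit helper name is an assumption, not the exact code:

CompletableFuture<List<ConnectorSplit>> future = queue.getBatchAsync(maxSize).thenApply(internalSplits -> {
    ImmutableList.Builder<ConnectorSplit> result = ImmutableList.builder();
    for (InternalHiveSplit internalSplit : internalSplits) {
        // Side effect: release the memory accounted for this split as it leaves the buffer.
        estimatedSplitSizeInBytes.addAndGet(-internalSplit.getEstimatedSizeInBytes());
        result.add(toConnectorSplit(internalSplit)); // assumed conversion helper
    }
    return result.build();
});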
public final class HiveTypeName
{
    private static final int INSTANCE_SIZE = ClassLayout.parseClass(HivePartitionKey.class).instanceSize() +
            ClassLayout.parseClass(String.class).instanceSize() * 2;
Why times two? There is only one String in the class.
    this.value = requireNonNull(value, "value is null");
}

public String toString()
Missing @Override
@@ -264,7 +264,7 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
        .collect(joining(",")));
schema.setProperty(META_TABLE_COLUMN_TYPES, dataColumns.stream()
        .map(DataColumn::getHiveType)
-       .map(HiveType::getHiveTypeName)
+       .map(hiveType -> hiveType.getHiveTypeName().toString())
Just add
.map(Object::toString)
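Applied to the diff above, the suggested pipeline would read roughly like this (a sketch of the suggestion, not the committed code):

schema.setProperty(META_TABLE_COLUMN_TYPES, dataColumns.stream()
        .map(DataColumn::getHiveType)
        .map(HiveType::getHiveTypeName)
        .map(Object::toString)
        .collect(joining(",")));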
@@ -463,7 +462,7 @@ private static Properties createSchema(HiveStorageFormat format, List<String> co
        .collect(joining(",")));
schema.setProperty(META_TABLE_COLUMN_TYPES, columnTypes.stream()
        .map(type -> toHiveType(typeTranslator, type))
-       .map(HiveType::getHiveTypeName)
+       .map(hiveType -> hiveType.getHiveTypeName().toString())
.map(Object::toString)
@@ -51,6 +55,11 @@ public String getValue()
    return value;
}

public int getEstimatedSizeInBytes()
{
    return INSTANCE_SIZE + name.length() * Character.BYTES + value.length() * Character.BYTES;
This is long and hard to read, add parentheses
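For comparison, the parenthesized form suggested here would look something like this (a sketch, not the committed code):

return INSTANCE_SIZE + (name.length() * Character.BYTES) + (value.length() * Character.BYTES);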
I disagree on this one. I find it harder to read with parentheses.
I will address all the other comments. I noticed the build failures and fixed them myself; apparently, I forgot to push the fix.
@@ -62,6 +73,7 @@
    this.tableName = requireNonNull(tableName, "tableName is null");
    this.compactEffectivePredicate = requireNonNull(compactEffectivePredicate, "compactEffectivePredicate is null");
    this.queue = new AsyncQueue<>(maxOutstandingSplits, executor);
+   this.maxOutstandingSplitsBytes = Ints.checkedCast(maxOutstandingSplitsSize.toBytes());
Use toIntExact
Using Ints.saturatedCast() might be better here. It means that if a very large limit is specified, the effective memory limit will simply be lower than requested, rather than the cast failing.
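A small illustration of the difference between the casts under discussion, using standard Guava and JDK behavior (the variable names are made up):

long configuredLimit = 10L * 1024 * 1024 * 1024; // e.g. a 10GB limit, larger than Integer.MAX_VALUE

// Ints.checkedCast and Math.toIntExact reject values that do not fit in an int:
// Ints.checkedCast(configuredLimit); // throws IllegalArgumentException
// Math.toIntExact(configuredLimit);  // throws ArithmeticException

// Ints.saturatedCast clamps instead, so an oversized limit silently becomes
// Integer.MAX_VALUE (about 2GB) rather than failing at construction time:
int effectiveLimit = Ints.saturatedCast(configuredLimit); // == Integer.MAX_VALUE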
@@ -84,9 +96,16 @@ int getOutstandingSplitCount()
CompletableFuture<?> addToQueue(InternalHiveSplit split)
{
    if (throwable.get() == null) {
        if (estimatedSplitSizeInBytes.addAndGet(split.getEstimatedSizeInBytes()) > maxOutstandingSplitsBytes) {
This can overflow if the limit is close to Integer.MAX_VALUE, and/or if there are many threads incrementing at once.
We should probably make this a long, just to be safe
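A minimal sketch of the long-based variant being suggested, with assumed field names (an AtomicLong counter and a long limit, so concurrent addAndGet calls cannot overflow an int):

// Keep both the running total and the limit as longs instead of ints.
private final AtomicLong estimatedSplitSizeInBytes = new AtomicLong();
private final long maxOutstandingSplitsBytes; // set from maxOutstandingSplitsSize.toBytes(), no narrowing cast needed

CompletableFuture<?> addToQueue(InternalHiveSplit split)
{
    if (estimatedSplitSizeInBytes.addAndGet(split.getEstimatedSizeInBytes()) > maxOutstandingSplitsBytes) {
        // over the limit: this is where the "takes too much memory" error shown below would be thrown
    }
    return queue.offer(split); // assumed AsyncQueue method
}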
// This limit should never be hit given there is a limit of maxOutstandingSplits.
// If it's hit, it means individual splits are huge.
throw new PrestoException(GENERIC_INTERNAL_ERROR, format(
        "Split buffering for %s.%s takes too much memory (%s bytes limit). %s splits are buffered.",
format(
"Split buffering for %s.%s exceeded memory limit (%s). %s splits are buffered.",
..., succinctBytes(maxOutstandingSplitsBytes), ...)
Force-pushed from 924793b to 51fc023.
InternalHiveSplit avoids unnecessary duplicate information and is more friendly to memory accounting. It is used for buffering discovered splits inside the Hive connector.