Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make performance of TPCH q15 stable (#4570) #4710

Merged
5 changes: 5 additions & 0 deletions dbms/src/Common/MemoryTracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ void disableThreshold()
MEMORY_TRACER_SUBMIT_THRESHOLD = 0;
}

/// Returns the current value of `local_delta` so that code outside this file can read it.
/// NOTE(review): presumably `local_delta` is the calling thread's memory delta that has not
/// yet been submitted to the tracker — confirm against its declaration.
Int64 getLocalDeltaMemory()
{
return local_delta;
}

void alloc(Int64 size)
{
checkSubmitAndUpdateLocalDelta(local_delta + size);
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Common/MemoryTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ extern thread_local MemoryTracker * current_memory_tracker;
namespace CurrentMemoryTracker
{
void disableThreshold();
Int64 getLocalDeltaMemory();
void alloc(Int64 size);
void realloc(Int64 old_size, Int64 new_size);
void free(Int64 size);
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t
parent.file_provider,
parent.threads_data[thread_num].key_columns,
parent.threads_data[thread_num].aggregate_columns,
parent.threads_data[thread_num].local_delta_memory,
parent.no_more_keys);

parent.threads_data[thread_num].src_rows += block.rows();
Expand Down Expand Up @@ -247,6 +248,7 @@ void ParallelAggregatingBlockInputStream::execute()
file_provider,
threads_data[0].key_columns,
threads_data[0].aggregate_columns,
threads_data[0].local_delta_memory,
no_more_keys);
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream
{
size_t src_rows = 0;
size_t src_bytes = 0;
Int64 local_delta_memory = 0;

ColumnRawPtrs key_columns;
Aggregator::AggregateColumns aggregate_columns;
Expand Down
71 changes: 43 additions & 28 deletions dbms/src/Interpreters/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ void AggregatedDataVariants::convertToTwoLevel()

switch (type)
{
#define M(NAME) \
case Type::NAME: \
NAME##_two_level = std::make_unique<decltype(NAME##_two_level)::element_type>(*NAME); \
NAME.reset(); \
type = Type::NAME##_two_level; \
#define M(NAME) \
case Type::NAME: \
NAME##_two_level = std::make_unique<decltype(NAME##_two_level)::element_type>(*(NAME)); \
(NAME).reset(); \
type = Type::NAME##_two_level; \
break;

APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
Expand Down Expand Up @@ -452,7 +452,7 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
AggregatedDataWithoutKey & res,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena) const
Arena * arena)
{
/// Adding values
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
Expand Down Expand Up @@ -510,7 +510,14 @@ void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns
}
}

bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & result, const FileProviderPtr & file_provider, ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
bool Aggregator::executeOnBlock(
const Block & block,
AggregatedDataVariants & result,
const FileProviderPtr & file_provider,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns,
Int64 & local_delta_memory,
bool & no_more_keys)
{
if (isCancelled())
return true;
Expand Down Expand Up @@ -578,17 +585,23 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
else if (result.type == AggregatedDataVariants::Type::NAME) \
executeImpl(*result.NAME, result.aggregates_pool, num_rows, key_columns, result.collators, aggregate_functions_instructions.data(), no_more_keys, overflow_row_ptr);

if (false)
if (false) // NOLINT
{
} // NOLINT
}
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
}

size_t result_size = result.sizeWithoutOverflowRow();
Int64 current_memory_usage = 0;
if (current_memory_tracker)
{
current_memory_usage = current_memory_tracker->get();
auto updated_local_delta_memory = CurrentMemoryTracker::getLocalDeltaMemory();
auto local_delta_memory_diff = updated_local_delta_memory - local_delta_memory;
current_memory_usage += (local_memory_usage.fetch_add(local_delta_memory_diff) + local_delta_memory_diff);
local_delta_memory = updated_local_delta_memory;
}

auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation; /// Here all the results in the sum are taken into account, from different threads.

Expand Down Expand Up @@ -641,7 +654,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co
else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
writeToTemporaryFileImpl(data_variants, *data_variants.NAME, block_out);

if (false)
if (false) // NOLINT
{
}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
Expand Down Expand Up @@ -790,14 +803,14 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria
src_rows += block.rows();
src_bytes += block.bytes();

if (!executeOnBlock(block, result, file_provider, key_columns, aggregate_columns, no_more_keys))
if (!executeOnBlock(block, result, file_provider, key_columns, aggregate_columns, params.local_delta_memory, no_more_keys))
break;
}

/// If there was no data and we aggregate without keys, we must return a single row with the result of empty aggregation.
/// To do this, we pass a block with zero rows to aggregate.
if (result.empty() && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set)
executeOnBlock(stream->getHeader(), result, file_provider, key_columns, aggregate_columns, no_more_keys);
executeOnBlock(stream->getHeader(), result, file_provider, key_columns, aggregate_columns, params.local_delta_memory, no_more_keys);

double elapsed_seconds = watch.elapsedSeconds();
size_t rows = result.sizeWithoutOverflowRow();
Expand Down Expand Up @@ -1079,9 +1092,9 @@ Block Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_v
#define M(NAME) \
else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
convertToBlockImpl(*data_variants.NAME, data_variants.NAME->data, key_columns, aggregate_columns, final_aggregate_columns, data_variants.aggregates_pool, final_);
if (false)
if (false) // NOLINT
{
} // NOLINT
}
APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
#undef M
else throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
Expand All @@ -1096,9 +1109,9 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevel(AggregatedDataVariants & dat
#define M(NAME) \
else if (data_variants.type == AggregatedDataVariants::Type::NAME) return prepareBlocksAndFillTwoLevelImpl(data_variants, *data_variants.NAME, final, thread_pool);

if (false)
if (false) // NOLINT
{
} // NOLINT
}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
else throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
Expand Down Expand Up @@ -1497,7 +1510,7 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream
#define M(NAME) \
else if (first->type == AggregatedDataVariants::Type::NAME) \
aggregator.mergeSingleLevelDataImpl<decltype(first->NAME)::element_type>(data);
if (false)
if (false) // NOLINT
{
}
APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
Expand Down Expand Up @@ -1598,7 +1611,7 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream
size_t thread_number = static_cast<size_t>(bucket_num) % threads;
Arena * arena = merged_data.aggregates_pools.at(thread_number).get();

if (false) {}
if (false) {} // NOLINT
#define M(NAME) \
else if (method == AggregatedDataVariants::Type::NAME) \
{ \
Expand Down Expand Up @@ -1958,7 +1971,7 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(block, aggregates_pool, *result.NAME, result.NAME->data.impls[bucket], nullptr, false);

if (false)
if (false) // NOLINT
{
}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
Expand All @@ -1982,7 +1995,9 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
result.aggregates_pools.push_back(std::make_shared<Arena>());
Arena * aggregates_pool = result.aggregates_pools.back().get();

auto task = std::bind(merge_bucket, bucket, aggregates_pool);
auto task = [&merge_bucket, bucket, aggregates_pool] {
return merge_bucket(bucket, aggregates_pool);
};

if (thread_pool)
thread_pool->schedule(ThreadFactory().newJob(task));
Expand Down Expand Up @@ -2198,9 +2213,9 @@ std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
type \
= AggregatedDataVariants::Type::NAME##_two_level;

if (false)
if (false) // NOLINT
{
} // NOLINT
}
APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
#undef M
else throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
Expand All @@ -2214,9 +2229,9 @@ std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
num_buckets \
= data.NAME->data.NUM_BUCKETS;

if (false)
if (false) // NOLINT
{
} // NOLINT
}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
else throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
Expand All @@ -2227,9 +2242,9 @@ std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
else if (data.type == AggregatedDataVariants::Type::NAME) \
convertBlockToTwoLevelImpl(*data.NAME, data.aggregates_pool, key_columns, block, splitted_blocks);

if (false)
if (false) // NOLINT
{
} // NOLINT
}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
else throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
Expand Down Expand Up @@ -2286,9 +2301,9 @@ void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result)
else if (result.type == AggregatedDataVariants::Type::NAME) \
destroyImpl<decltype(result.NAME)::element_type>(result.NAME->data);

if (false)
if (false) // NOLINT
{
} // NOLINT
}
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
else if (result.type != AggregatedDataVariants::Type::without_key) throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
Expand Down
17 changes: 13 additions & 4 deletions dbms/src/Interpreters/Aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ class Aggregator
AggregateDescriptions aggregates;
size_t keys_size;
size_t aggregates_size;
Int64 local_delta_memory = 0;

/// The settings of approximate calculation of GROUP BY.
const bool overflow_row; /// Do we need to put into AggregatedDataVariants::without_key aggregates for keys that are not in max_rows_to_group_by.
Expand Down Expand Up @@ -786,8 +787,14 @@ class Aggregator
using AggregateFunctionsPlainPtrs = std::vector<IAggregateFunction *>;

/// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break').
bool executeOnBlock(const Block & block, AggregatedDataVariants & result, const FileProviderPtr & file_provider, ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool & no_more_keys);
/// Returns true immediately, before touching the block, when isCancelled() reports cancellation.
bool executeOnBlock(
const Block & block,
AggregatedDataVariants & result,
const FileProviderPtr & file_provider,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
Int64 & local_delta_memory, /// In/out: the last CurrentMemoryTracker::getLocalDeltaMemory() value seen by this caller; when a memory tracker is set, the difference since then is added to the shared usage counter and this is updated to the new value.
bool & no_more_keys);

/** Convert the aggregation data structure into a block.
* If overflow_row = true, then aggregates for rows that are not included in max_rows_to_group_by are put in the first block.
Expand Down Expand Up @@ -893,6 +900,8 @@ class Aggregator
/// How much RAM was used to process the query before processing the first block.
Int64 memory_usage_before_aggregation = 0;

std::atomic<Int64> local_memory_usage = 0;

std::mutex mutex;

const LogWithPrefixPtr log;
Expand Down Expand Up @@ -939,11 +948,11 @@ class Aggregator
AggregateDataPtr overflow_row) const;

/// For case when there are no keys (all aggregate into one row).
void executeWithoutKeyImpl(
static void executeWithoutKeyImpl(
AggregatedDataWithoutKey & res,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena) const;
Arena * arena);

template <typename Method>
void writeToTemporaryFileImpl(
Expand Down