Make performance of TPCH q15 stable (#4570) (#4707)
close #4451
ti-chi-bot authored Apr 22, 2022
1 parent cf99103 commit 0053753
Showing 6 changed files with 65 additions and 32 deletions.
5 changes: 5 additions & 0 deletions dbms/src/Common/MemoryTracker.cpp
@@ -185,6 +185,11 @@ void disableThreshold()
MEMORY_TRACER_SUBMIT_THRESHOLD = 0;
}

Int64 getLocalDeltaMemory()
{
return local_delta;
}

void alloc(Int64 size)
{
checkSubmitAndUpdateLocalDelta(local_delta + size);
1 change: 1 addition & 0 deletions dbms/src/Common/MemoryTracker.h
@@ -96,6 +96,7 @@ extern thread_local MemoryTracker * current_memory_tracker;
namespace CurrentMemoryTracker
{
void disableThreshold();
Int64 getLocalDeltaMemory();
void alloc(Int64 size);
void realloc(Int64 old_size, Int64 new_size);
void free(Int64 size);
2 changes: 2 additions & 0 deletions dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
@@ -140,6 +140,7 @@ void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t
parent.file_provider,
parent.threads_data[thread_num].key_columns,
parent.threads_data[thread_num].aggregate_columns,
parent.threads_data[thread_num].local_delta_memory,
parent.no_more_keys);

parent.threads_data[thread_num].src_rows += block.rows();
@@ -243,6 +244,7 @@ void ParallelAggregatingBlockInputStream::execute()
file_provider,
threads_data[0].key_columns,
threads_data[0].aggregate_columns,
threads_data[0].local_delta_memory,
no_more_keys);
}

1 change: 1 addition & 0 deletions dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h
@@ -88,6 +88,7 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream
{
size_t src_rows = 0;
size_t src_bytes = 0;
Int64 local_delta_memory = 0;

ColumnRawPtrs key_columns;
Aggregator::AggregateColumns aggregate_columns;
71 changes: 43 additions & 28 deletions dbms/src/Interpreters/Aggregator.cpp
@@ -74,11 +74,11 @@ void AggregatedDataVariants::convertToTwoLevel()

switch (type)
{
#define M(NAME) \
case Type::NAME: \
NAME##_two_level = std::make_unique<decltype(NAME##_two_level)::element_type>(*NAME); \
NAME.reset(); \
type = Type::NAME##_two_level; \
#define M(NAME) \
case Type::NAME: \
NAME##_two_level = std::make_unique<decltype(NAME##_two_level)::element_type>(*(NAME)); \
(NAME).reset(); \
type = Type::NAME##_two_level; \
break;

APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
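The hunk above is behavior-neutral: the macro body is re-aligned and its NAME parameter is parenthesized, the usual defensive style for function-like macros (and what a lint such as clang-tidy's bugprone-macro-parentheses flags). A minimal, hypothetical illustration of the failure mode the parentheses guard against in the general case:

#include <cstdio>

// Hypothetical macros, not from the commit: without parentheses an argument
// containing an operator is spliced into the expansion verbatim and can
// change the meaning of the expression.
#define SQUARE_UNSAFE(x) (x * x)
#define SQUARE_SAFE(x) ((x) * (x))

int main()
{
    std::printf("%d\n", SQUARE_UNSAFE(1 + 2)); // expands to (1 + 2 * 1 + 2) == 5
    std::printf("%d\n", SQUARE_SAFE(1 + 2));   // expands to ((1 + 2) * (1 + 2)) == 9
    return 0;
}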
@@ -453,7 +453,7 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
AggregatedDataWithoutKey & res,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena) const
Arena * arena)
{
/// Adding values
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
@@ -511,7 +511,14 @@ void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns
}
}

bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & result, const FileProviderPtr & file_provider, ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
bool Aggregator::executeOnBlock(
const Block & block,
AggregatedDataVariants & result,
const FileProviderPtr & file_provider,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns,
Int64 & local_delta_memory,
bool & no_more_keys)
{
if (isCancelled())
return true;
@@ -579,17 +586,23 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
else if (result.type == AggregatedDataVariants::Type::NAME) \
executeImpl(*result.NAME, result.aggregates_pool, num_rows, key_columns, result.collators, aggregate_functions_instructions.data(), no_more_keys, overflow_row_ptr);

if (false)
if (false) // NOLINT
{
} // NOLINT
}
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
}

size_t result_size = result.sizeWithoutOverflowRow();
Int64 current_memory_usage = 0;
if (current_memory_tracker)
{
current_memory_usage = current_memory_tracker->get();
auto updated_local_delta_memory = CurrentMemoryTracker::getLocalDeltaMemory();
auto local_delta_memory_diff = updated_local_delta_memory - local_delta_memory;
current_memory_usage += (local_memory_usage.fetch_add(local_delta_memory_diff) + local_delta_memory_diff);
local_delta_memory = updated_local_delta_memory;
}

auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation; /// Here all the results in the sum are taken into account, from different threads.

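This hunk is the functional core of the commit. Besides the tracker's own counter, executeOnBlock now folds each thread's not-yet-submitted allocation delta (exposed by the new CurrentMemoryTracker::getLocalDeltaMemory()) into a shared atomic, so every thread measures memory growth against the same up-to-date total instead of a value that can lag by each thread's unsubmitted delta. A minimal sketch of the same accounting pattern, with hypothetical names and assuming a thread-local delta counter like the one in MemoryTracker.cpp:

#include <atomic>
#include <cstdint>

thread_local int64_t local_delta = 0;        // bytes allocated by this thread but not yet
                                             // submitted to the shared memory tracker
std::atomic<int64_t> shared_local_usage{0};  // sum of all threads' unsubmitted deltas

// Fold the change since the caller's previous call into the shared total and
// return that total; remembered_delta plays the role of the per-thread
// local_delta_memory that executeOnBlock now receives by reference.
int64_t foldInLocalDelta(int64_t & remembered_delta)
{
    const int64_t updated = local_delta;             // cf. getLocalDeltaMemory()
    const int64_t diff = updated - remembered_delta;
    remembered_delta = updated;
    // fetch_add returns the value *before* the addition, so old + diff is the
    // total after this thread's contribution has been applied.
    return shared_local_usage.fetch_add(diff) + diff;
}

In the diff, this combined figure feeds result_size_bytes = current_memory_usage - memory_usage_before_aggregation, which presumably is what makes the aggregator's memory-based decisions, and hence TPC-H q15 runtimes, consistent from run to run.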
@@ -642,7 +655,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co
else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
writeToTemporaryFileImpl(data_variants, *data_variants.NAME, block_out);

if (false)
if (false) // NOLINT
{
}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
@@ -804,14 +817,14 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria
src_rows += block.rows();
src_bytes += block.bytes();

if (!executeOnBlock(block, result, file_provider, key_columns, aggregate_columns, no_more_keys))
if (!executeOnBlock(block, result, file_provider, key_columns, aggregate_columns, params.local_delta_memory, no_more_keys))
break;
}

/// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation.
/// To do this, we pass a block with zero rows to aggregate.
if (result.empty() && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set)
executeOnBlock(stream->getHeader(), result, file_provider, key_columns, aggregate_columns, no_more_keys);
executeOnBlock(stream->getHeader(), result, file_provider, key_columns, aggregate_columns, params.local_delta_memory, no_more_keys);

double elapsed_seconds = watch.elapsedSeconds();
size_t rows = result.sizeWithoutOverflowRow();
@@ -1099,9 +1112,9 @@ Block Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_v
#define M(NAME) \
else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
convertToBlockImpl(*data_variants.NAME, data_variants.NAME->data, key_columns, aggregate_columns, final_aggregate_columns, data_variants.aggregates_pool, final_);
if (false)
if (false) // NOLINT
{
} // NOLINT
}
APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
#undef M
else throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@@ -1116,9 +1129,9 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevel(AggregatedDataVariants & dat
#define M(NAME) \
else if (data_variants.type == AggregatedDataVariants::Type::NAME) return prepareBlocksAndFillTwoLevelImpl(data_variants, *data_variants.NAME, final, thread_pool);

if (false)
if (false) // NOLINT
{
} // NOLINT
}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
else throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@@ -1522,7 +1535,7 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream
#define M(NAME) \
else if (first->type == AggregatedDataVariants::Type::NAME) \
aggregator.mergeSingleLevelDataImpl<decltype(first->NAME)::element_type>(data);
if (false)
if (false) // NOLINT
{
}
APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
@@ -1622,7 +1635,7 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream
size_t thread_number = static_cast<size_t>(bucket_num) % threads;
Arena * arena = merged_data.aggregates_pools.at(thread_number).get();

if (false) {}
if (false) {} // NOLINT
#define M(NAME) \
else if (method == AggregatedDataVariants::Type::NAME) \
{ \
@@ -1982,7 +1995,7 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(block, aggregates_pool, *result.NAME, result.NAME->data.impls[bucket], nullptr, false);

if (false)
if (false) // NOLINT
{
}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
@@ -2006,7 +2019,9 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
result.aggregates_pools.push_back(std::make_shared<Arena>());
Arena * aggregates_pool = result.aggregates_pools.back().get();

auto task = std::bind(merge_bucket, bucket, aggregates_pool);
auto task = [&merge_bucket, bucket, aggregates_pool] {
return merge_bucket(bucket, aggregates_pool);
};

if (thread_pool)
thread_pool->schedule(wrapInvocable(true, task));
@@ -2227,9 +2242,9 @@ std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
type \
= AggregatedDataVariants::Type::NAME##_two_level;

if (false)
if (false) // NOLINT
{
} // NOLINT
}
APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
#undef M
else throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@@ -2243,9 +2258,9 @@ std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
num_buckets \
= data.NAME->data.NUM_BUCKETS;

if (false)
if (false) // NOLINT
{
} // NOLINT
}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
else throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@@ -2256,9 +2271,9 @@ std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
else if (data.type == AggregatedDataVariants::Type::NAME) \
convertBlockToTwoLevelImpl(*data.NAME, data.aggregates_pool, key_columns, block, splitted_blocks);

if (false)
if (false) // NOLINT
{
} // NOLINT
}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
else throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@@ -2315,9 +2330,9 @@ void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result)
else if (result.type == AggregatedDataVariants::Type::NAME) \
destroyImpl<decltype(result.NAME)::element_type>(result.NAME->data);

if (false)
if (false) // NOLINT
{
} // NOLINT
}
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
else if (result.type != AggregatedDataVariants::Type::without_key) throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
17 changes: 13 additions & 4 deletions dbms/src/Interpreters/Aggregator.h
@@ -693,6 +693,7 @@ class Aggregator
AggregateDescriptions aggregates;
size_t keys_size;
size_t aggregates_size;
Int64 local_delta_memory = 0;

/// The settings of approximate calculation of GROUP BY.
const bool overflow_row; /// Do we need to put into AggregatedDataVariants::without_key aggregates for keys that are not in max_rows_to_group_by.
@@ -786,8 +787,14 @@ class Aggregator
using AggregateFunctionsPlainPtrs = std::vector<IAggregateFunction *>;

/// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break').
bool executeOnBlock(const Block & block, AggregatedDataVariants & result, const FileProviderPtr & file_provider, ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool & no_more_keys);
bool executeOnBlock(
const Block & block,
AggregatedDataVariants & result,
const FileProviderPtr & file_provider,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
Int64 & local_delta_memory,
bool & no_more_keys);

/** Convert the aggregation data structure into a block.
* If overflow_row = true, then aggregates for rows that are not included in max_rows_to_group_by are put in the first block.
@@ -893,6 +900,8 @@ class Aggregator
/// How many RAM were used to process the query before processing the first block.
Int64 memory_usage_before_aggregation = 0;

std::atomic<Int64> local_memory_usage = 0;

std::mutex mutex;

const LogWithPrefixPtr log;
@@ -939,11 +948,11 @@ class Aggregator
AggregateDataPtr overflow_row) const;

/// For case when there are no keys (all aggregate into one row).
void executeWithoutKeyImpl(
static void executeWithoutKeyImpl(
AggregatedDataWithoutKey & res,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena) const;
Arena * arena);

template <typename Method>
void writeToTemporaryFileImpl(
