Skip to content

Commit

Permalink
Adopt spill-friendly order of input columns in TopNRowNumber operator
Browse files Browse the repository at this point in the history
  • Loading branch information
mbasmanova committed Oct 20, 2023
1 parent 7acc337 commit 4f6d0f4
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 73 deletions.
4 changes: 4 additions & 0 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -2263,6 +2263,10 @@ class TopNRowNumberNode : public PlanNode {
return outputType_;
}

/// Returns the type of input rows: the output type of this node's single
/// upstream source.
const RowTypePtr& inputType() const {
return sources_[0]->outputType();
}

const std::vector<FieldAccessTypedExprPtr>& partitionKeys() const {
return partitionKeys_;
}
Expand Down
90 changes: 70 additions & 20 deletions velox/exec/TopNRowNumber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,60 @@

namespace facebook::velox::exec {

namespace {

// Returns indices of input columns reordered as: partition keys first,
// followed by sorting keys, followed by the remaining columns in their
// original relative order. This spill-friendly layout keeps the key columns
// at the front of each stored row.
//
// NOTE(review): assumes partition and sorting keys are distinct columns; a
// column appearing in both lists would be appended twice -- confirm the
// planner guarantees disjoint key sets.
std::vector<column_index_t> reorderInputChannels(
const RowTypePtr& inputType,
const std::vector<core::FieldAccessTypedExprPtr>& partitionKeys,
const std::vector<core::FieldAccessTypedExprPtr>& sortingKeys) {
const auto size = inputType->size();

std::vector<column_index_t> channels;
channels.reserve(size);

std::unordered_set<std::string> keyNames;
keyNames.reserve(partitionKeys.size() + sortingKeys.size());

// Append key channels and remember their names so the non-key pass below
// can skip them.
auto appendKeys = [&](const auto& keys) {
for (const auto& key : keys) {
channels.push_back(exprToChannel(key.get(), inputType));
keyNames.insert(key->name());
}
};

appendKeys(partitionKeys);
appendKeys(sortingKeys);

// Non-key columns keep their original relative order. Use column_index_t
// for the loop variable to avoid a signed/unsigned comparison with 'size'
// and an implicit narrowing on push_back.
for (column_index_t i = 0; i < size; ++i) {
if (keyNames.count(inputType->nameOf(i)) == 0) {
channels.push_back(i);
}
}

return channels;
}

// Returns a row type with the children of 'inputType' rearranged according
// to 'channels'. 'channels' must contain exactly inputType->size() entries.
RowTypePtr reorderInputType(
const RowTypePtr& inputType,
const std::vector<column_index_t>& channels) {
const auto size = inputType->size();

VELOX_CHECK_EQ(size, channels.size());

std::vector<std::string> names;
names.reserve(size);

std::vector<TypePtr> types;
types.reserve(size);

for (auto channel : channels) {
names.push_back(inputType->nameOf(channel));
types.push_back(inputType->childAt(channel));
}

return ROW(std::move(names), std::move(types));
}
} // namespace

TopNRowNumber::TopNRowNumber(
int32_t operatorId,
DriverCtx* driverCtx,
Expand All @@ -29,14 +83,18 @@ TopNRowNumber::TopNRowNumber(
"TopNRowNumber"),
limit_{node->limit()},
generateRowNumber_{node->generateRowNumber()},
inputType_{node->sources()[0]->outputType()},
inputChannels_{reorderInputChannels(
node->inputType(),
node->partitionKeys(),
node->sortingKeys())},
inputType_{reorderInputType(node->inputType(), inputChannels_)},
data_(std::make_unique<RowContainer>(inputType_->children(), pool())),
comparator_(
inputType_,
node->sortingKeys(),
node->sortingOrders(),
data_.get()),
decodedVectors_(inputType_->children().size()) {
decodedVectors_(inputType_->size()) {
const auto& keys = node->partitionKeys();
const auto numKeys = keys.size();

Expand All @@ -45,7 +103,7 @@ TopNRowNumber::TopNRowNumber(
true, sizeof(TopRows), false, 1, [](auto, auto) {}, [](auto) {}};

table_ = std::make_unique<HashTable<false>>(
createVectorHashers(inputType_, keys),
createVectorHashers(node->inputType(), keys),
std::vector<Accumulator>{accumulator},
std::vector<TypePtr>{},
false, // allowDuplicates
Expand All @@ -60,22 +118,16 @@ TopNRowNumber::TopNRowNumber(
singlePartition_ = std::make_unique<TopRows>(allocator_.get(), comparator_);
}

identityProjections_.reserve(inputType_->size());
for (auto i = 0; i < inputType_->size(); ++i) {
identityProjections_.emplace_back(i, i);
}

if (generateRowNumber_) {
resultProjections_.emplace_back(0, inputType_->size());
results_.resize(1);
}
}

void TopNRowNumber::addInput(RowVectorPtr input) {
const auto numInput = input->size();

for (auto i = 0; i < inputType_->size(); ++i) {
decodedVectors_[i].decode(*input->childAt(i));
for (auto i = 0; i < inputChannels_.size(); ++i) {
decodedVectors_[i].decode(*input->childAt(inputChannels_[i]));
}

if (table_) {
Expand All @@ -91,11 +143,11 @@ void TopNRowNumber::addInput(RowVectorPtr input) {
// if row should replace an existing row or be discarded.
for (auto i = 0; i < numInput; ++i) {
auto& partition = partitionAt(lookup_->hits[i]);
processInputRow(input, i, partition);
processInputRow(i, partition);
}
} else {
for (auto i = 0; i < numInput; ++i) {
processInputRow(input, i, *singlePartition_);
processInputRow(i, *singlePartition_);
}
}
}
Expand All @@ -107,10 +159,7 @@ void TopNRowNumber::initializeNewPartitions() {
}
}

void TopNRowNumber::processInputRow(
const RowVectorPtr& input,
vector_size_t index,
TopRows& partition) {
void TopNRowNumber::processInputRow(vector_size_t index, TopRows& partition) {
auto& topRows = partition.rows;

char* newRow = nullptr;
Expand All @@ -131,7 +180,7 @@ void TopNRowNumber::processInputRow(
newRow = data_->initializeRow(topRow, true /* reuse */);
}

for (auto col = 0; col < input->childrenSize(); ++col) {
for (auto col = 0; col < decodedVectors_.size(); ++col) {
data_->store(decodedVectors_[col], index, newRow, col);
}

Expand Down Expand Up @@ -268,8 +317,9 @@ RowVectorPtr TopNRowNumber::getOutput() {
}
output->resize(offset);

for (int i = 0; i < inputType_->size(); ++i) {
data_->extractColumn(outputRows_.data(), offset, i, output->childAt(i));
for (int i = 0; i < inputChannels_.size(); ++i) {
data_->extractColumn(
outputRows_.data(), offset, i, output->childAt(inputChannels_[i]));
}

return output;
Expand Down
44 changes: 24 additions & 20 deletions velox/exec/TopNRowNumber.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class TopNRowNumber : public Operator {
void close() override;

private:
/// A priority queue to keep track of top 'limit' rows for a given partition.
// A priority queue to keep track of top 'limit' rows for a given partition.
struct TopRows {
struct Compare {
RowComparator& comparator;
Expand All @@ -83,21 +83,18 @@ class TopNRowNumber : public Operator {
return *reinterpret_cast<TopRows*>(group + partitionOffset_);
}

/// Adds input row to a partition or discards the row.
void processInputRow(
const RowVectorPtr& input,
vector_size_t index,
TopRows& partition);
// Adds input row to a partition or discards the row.
void processInputRow(vector_size_t index, TopRows& partition);

/// Returns next partition to add to output or nullptr if there are no
/// partitions left.
// Returns next partition to add to output or nullptr if there are no
// partitions left.
TopRows* nextPartition();

/// Returns partition that was partially added to the previous output batch.
// Returns partition that was partially added to the previous output batch.
TopRows& currentPartition();

/// Appends partition rows to outputRows_ and optionally populates row
/// numbers.
// Appends partition rows to outputRows_ and optionally populates row
// numbers.
void appendPartitionRows(
TopRows& partition,
vector_size_t start,
Expand All @@ -107,21 +104,28 @@ class TopNRowNumber : public Operator {

const int32_t limit_;
const bool generateRowNumber_;

// Input columns in the order of: partition keys, sorting keys, the rest.
const std::vector<column_index_t> inputChannels_;

// Input column types in 'inputChannels_' order.
const RowTypePtr inputType_;

/// Hash table to keep track of partitions. Not used if there are no
/// partitioning keys. For each partition, stores an instance of TopRows
/// struct.
// Hash table to keep track of partitions. Not used if there are no
// partitioning keys. For each partition, stores an instance of TopRows
// struct.
std::unique_ptr<BaseHashTable> table_;
std::unique_ptr<HashLookup> lookup_;
int32_t partitionOffset_;

/// TopRows struct to keep track of top rows for a single partition, when
/// there are no partitioning keys.
// TopRows struct to keep track of top rows for a single partition, when
// there are no partitioning keys.
std::unique_ptr<HashStringAllocator> allocator_;
std::unique_ptr<TopRows> singlePartition_;

/// Stores row data. For each partition, only up to 'limit' rows are stored.
// Stores input data. For each partition, only up to 'limit_' rows are stored.
// Order of columns matches 'inputChannels_': partition keys, sorting keys,
// the rest.
std::unique_ptr<RowContainer> data_;

RowComparator comparator_;
Expand All @@ -130,12 +134,12 @@ class TopNRowNumber : public Operator {

bool finished_{false};

/// Maximum number of rows in the output batch.
// Maximum number of rows in the output batch.
vector_size_t outputBatchSize_;
std::vector<char*> outputRows_;

/// Number of partitions to fetch from a HashTable in a single listAllRows
/// call.
// Number of partitions to fetch from a HashTable in a single listAllRows
// call.
static const size_t kPartitionBatchSize = 100;

BaseHashTable::RowsIterator partitionIt_;
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/RowNumberTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ TEST_F(RowNumberTest, spill) {
test(1);
test(100);
test(1'000);
} // namespace facebook::velox::exec::test
}

TEST_F(RowNumberTest, basic) {
auto data = makeRowVector({
Expand Down
78 changes: 46 additions & 32 deletions velox/exec/tests/TopNRowNumberTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,41 +84,50 @@ TEST_F(TopNRowNumberTest, basic) {
}

TEST_F(TopNRowNumberTest, largeOutput) {
// Make 10 vectors. Use different types for partitioning key, sorting key and
// data. Use order of columns different from partitioning keys, followed by
// sorting keys, followed by data.
const vector_size_t size = 10'000;
auto data = makeRowVector({
// Partitioning key.
makeFlatVector<int64_t>(size, [](auto row) { return row % 7; }),
// Sorting key.
makeFlatVector<int64_t>(size, [](auto row) { return (size - row) * 10; }),
// Data.
makeFlatVector<int64_t>(size, [](auto row) { return row; }),
});
auto data = split(
makeRowVector(
{"d", "p", "s"},
{
// Data.
makeFlatVector<float>(size, [](auto row) { return row; }),
// Partitioning key.
makeFlatVector<int16_t>(size, [](auto row) { return row % 7; }),
// Sorting key.
makeFlatVector<int32_t>(
size, [](auto row) { return (size - row) * 10; }),
}),
10);

createDuckDbTable({data});
createDuckDbTable(data);

auto testLimit = [&](auto limit) {
SCOPED_TRACE(fmt::format("Limit: {}", limit));
auto plan = PlanBuilder()
.values({data})
.topNRowNumber({"c0"}, {"c1"}, limit, true)
.values(data)
.topNRowNumber({"p"}, {"s"}, limit, true)
.planNode();

AssertQueryBuilder(plan, duckDbQueryRunner_)
.config(core::QueryConfig::kPreferredOutputBatchBytes, "1024")
.assertResults(fmt::format(
"SELECT * FROM (SELECT *, row_number() over (partition by c0 order by c1) as rn FROM tmp) "
"SELECT * FROM (SELECT *, row_number() over (partition by p order by s) as rn FROM tmp) "
" WHERE rn <= {}",
limit));

// No partitioning keys.
plan = PlanBuilder()
.values({data})
.topNRowNumber({}, {"c1"}, limit, true)
.values(data)
.topNRowNumber({}, {"s"}, limit, true)
.planNode();

AssertQueryBuilder(plan, duckDbQueryRunner_)
.config(core::QueryConfig::kPreferredOutputBatchBytes, "1024")
.assertResults(fmt::format(
"SELECT * FROM (SELECT *, row_number() over (order by c1) as rn FROM tmp) "
"SELECT * FROM (SELECT *, row_number() over (order by s) as rn FROM tmp) "
" WHERE rn <= {}",
limit));
};
Expand All @@ -132,32 +141,37 @@ TEST_F(TopNRowNumberTest, largeOutput) {

TEST_F(TopNRowNumberTest, manyPartitions) {
const vector_size_t size = 10'000;
auto data = makeRowVector({
// Partitioning key.
makeFlatVector<int64_t>(
size, [](auto row) { return row / 2; }, nullEvery(7)),
// Sorting key.
makeFlatVector<int64_t>(
size,
[](auto row) { return (size - row) * 10; },
[](auto row) { return row == 123; }),
// Data.
makeFlatVector<int64_t>(
size, [](auto row) { return row; }, nullEvery(11)),
});
auto data = split(
makeRowVector(
{"d", "s", "p"},
{
// Data.
makeFlatVector<int64_t>(
size, [](auto row) { return row; }, nullEvery(11)),
// Sorting key.
makeFlatVector<int64_t>(
size,
[](auto row) { return (size - row) * 10; },
[](auto row) { return row == 123; }),
// Partitioning key.
makeFlatVector<int64_t>(
size, [](auto row) { return row / 2; }, nullEvery(7)),
}),
10);

createDuckDbTable({data});
createDuckDbTable(data);

auto testLimit = [&](auto limit) {
SCOPED_TRACE(fmt::format("Limit: {}", limit));
auto plan = PlanBuilder()
.values({data})
.topNRowNumber({"c0"}, {"c1"}, limit, true)
.values(data)
.topNRowNumber({"p"}, {"s"}, limit, true)
.planNode();

assertQuery(
plan,
fmt::format(
"SELECT * FROM (SELECT *, row_number() over (partition by c0 order by c1) as rn FROM tmp) "
"SELECT * FROM (SELECT *, row_number() over (partition by p order by s) as rn FROM tmp) "
" WHERE rn <= {}",
limit));
};
Expand Down

0 comments on commit 4f6d0f4

Please sign in to comment.