
Add bloom_filter_agg Spark aggregate function #4028

Closed

Conversation

Contributor

@jinchengchenghh commented Feb 13, 2023

This function is used in Spark Runtime Filters: apache/spark#35789

https://docs.google.com/document/d/16IEuyLeQlubQkH8YuVuXWKo2-grVIoDJqQpHZrE7q04/edit#heading=h.4v65wq7vzy4q

Velox's BloomFilter implementation differs from Spark's, so the serialized BloomFilter format is different as well.

Velox limits the size of a contiguous memory buffer, so the BloomFilter capacity is lower than in Spark when numBits is large. See #4713 (comment).

Spark allows changing the defaults while Velox does not.

See also #3342

Fixes #3694

100, [](vector_size_t row) { return row % 9; })})};

auto expected = {makeRowVector({makeFlatVector<StringView>(
1, [](vector_size_t row) { return "\u0004"; })})};
@XinShuoWang commented Feb 16, 2023

From my understanding:

  1. The expected result is at least 36 bytes, not just "\u0004".
  2. You can use return StringView(pointer_to_data, data_size); instead of return "\u0004"; to avoid the risk of truncation (see the sketch after this list). See also "Enhance BloomFilter to serialize and memory track" #3861 (comment).
  3. Please reconfirm the correctness of the expected result.
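
A minimal sketch of the fix suggested in (2), with illustrative names (getSerializedData is hypothetical):

  // A C-string literal stops at the first embedded '\0' and would truncate
  // binary data; StringView(pointer, size) preserves every byte.
  std::string data = getSerializedData();
  StringView view(data.data(), data.size());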

Contributor Author

Thanks, I tried to fix it as you suggested, but it failed; see #4028 (comment).
Can you give me more suggestions?

@jinchengchenghh
Contributor Author

The Spark fuzzer test raises the exception below; can you help fix it? @duanmeng

/mnt/DP_disk1/code/velox/build/velox/exec/tests# ./spark_aggregation_fuzzer_test
E0414 09:32:32.838361 1385171 Exceptions.h:68] Line: /mnt/DP_disk1/code/velox/velox/functions/sparksql/aggregates/BloomFilterAggAggregate.cpp:237, Function:setConstantArgument, Expression: vec.isConstantMapping() originalEstimatedNumItems argument must be constant for all input rows, Source: RUNTIME, ErrorCode: INVALID_STATE
terminate called after throwing an instance of 'facebook::velox::VeloxRuntimeError'
  what():  Exception: VeloxRuntimeError
Error Source: RUNTIME
Error Code: INVALID_STATE
Reason: originalEstimatedNumItems argument must be constant for all input rows
Retriable: False
Expression: vec.isConstantMapping()
Function: setConstantArgument
File: /mnt/DP_disk1/code/velox/velox/functions/sparksql/aggregates/BloomFilterAggAggregate.cpp
Line: 237
Stack trace:
# 0
# 1

@jinchengchenghh
Contributor Author

Can you help review this one? @mbasmanova Thanks!

Contributor

@mbasmanova left a comment

@jinchengchenghh

#4633 is fixed. Would you re-enable the fuzzer test?

I'm curious how this function is used in Spark to reduce the amount of shuffle data. Is there something I can read about this?

@@ -35,9 +35,12 @@ target_link_libraries(
set_property(TARGET velox_functions_spark PROPERTY JOB_POOL_COMPILE
high_memory_pool)

if(${VELOX_ENABLE_AGGREGATES})
Contributor

Why is this change needed? Let's remove it. Feel free to open a separate PR with this change if necessary.

Contributor

@mbasmanova left a comment

@jinchengchenghh Some comments.

bloomFilter_.insert(folly::hasher<int64_t>()(value));
}

BloomFilter<StlAllocator<uint64_t>> bloomFilter_;
Contributor

naming: struct members do not have trailing underscore

explicit BloomFilterAccumulator(HashStringAllocator* allocator)
: bloomFilter_{StlAllocator<uint64_t>(allocator)} {}

int32_t serializedSize() {
Contributor

make this method const

return bloomFilter_.serializedSize();
}

void serialize(char* output) {
Contributor

make this method const

const std::vector<VectorPtr>& args,
bool /*mayPushdown*/) override {
decodeArguments(rows, args);
VELOX_USER_CHECK(!decodedRaw_.mayHaveNulls());
Contributor

Why is this check needed? Would you add a test case where some of the input data is null?

Contributor Author

The Spark bloom filter aggregate suite only tests empty input: https://github.com/apache/spark/blob/branch-3.3/sql/core/src/test/scala/org/apache/spark/sql/BloomFilterAggregateQuerySuite.scala#L196
I added this test:

  test("Test that bloom_filter_agg errors null") {
    spark.sql(
      """
        |SELECT bloom_filter_agg(null)"""
        .stripMargin)
  }

It throws an exception:

[DATATYPE_MISMATCH.BLOOM_FILTER_WRONG_TYPE] Cannot resolve "bloom_filter_agg(NULL, 1000000, 8388608)" due to data type mismatch: Input to function `bloom_filter_agg` should have been "BINARY" followed by value with "BIGINT", but it's ["VOID", "BIGINT", "BIGINT"].; line 2 pos 7;

In Spark, its first argument is xxhash64(table_col), so it won't be null.

The Velox BloomFilter accepts uint64_t while xxhash64() returns int64_t, so we need to use folly to hash the value a second time.
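
A minimal sketch of that double hashing, matching the insert call quoted earlier (toBloomFilterKey is an illustrative name):

  #include <folly/Hash.h>

  // Spark's xxhash64 yields an int64_t; the Velox bloom filter keys on
  // uint64_t, so the value is hashed once more with folly::hasher.
  uint64_t toBloomFilterKey(int64_t xxhash64Value) {
    return folly::hasher<int64_t>()(xxhash64Value);
  }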

Contributor

In Spark, its first argument is xxhash64(table_col), so it won't be null.

I think it is totally possible that table_col is null for some or all rows.

SELECT bloom_filter_agg(null)

Try changing this to something like this:

SELECT bloom_filter_agg(cast(null as varbinary))

Contributor Author

I changed the test to:

  test("Test that bloom_filter_agg errors null") {
    spark.sql(
      """
        |SELECT bloom_filter_agg(cast(null as binary))"""
        .stripMargin)
  }

It throws a different exception:

[DATATYPE_MISMATCH.BLOOM_FILTER_WRONG_TYPE] Cannot resolve "bloom_filter_agg(CAST(NULL AS BINARY), 1000000, 8388608)" due to data type mismatch: Input to function `bloom_filter_agg` should have been "BINARY" followed by value with "BIGINT", but it's ["BINARY", "BIGINT", "BIGINT"].; line 2 pos 7;
'Aggregate [unresolvedalias(bloom_filter_agg(cast(null as binary), 1000000, 8388608, 0, 0), None)]
+- OneRowRelation

This is a Spark internal function invoked by the planner. When table_col is null, the call becomes bloom_filter_agg(xxhash64(null)), and xxhash64(null) is 42.

Contributor

This is a Spark internal function invoked by the planner. When table_col is null, the call becomes bloom_filter_agg(xxhash64(null)), and xxhash64(null) is 42.

Interesting. So the input to bloom_filter_agg is not a value, but a hash of the value. Let's clarify this in the documentation. What's the type of the input? Is it VARBINARY or BIGINT?

Contributor Author

    val rowCount = filterCreationSidePlan.stats.rowCount
    val bloomFilterAgg = new BloomFilterAggregate(
      new XxHash64(Seq(filterCreationSideExp)), rowCount.get.longValue)

It is BIGINT

accumulator->serialize(buffer.data());
serialized = StringView::makeInline(buffer);
} else {
Buffer* buffer = flatResult->getBufferWithSpace(size);
Contributor

It would be more efficient to compute total bytes needed for the whole result and call getBufferWithSpace once.
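
A sketch of the suggested pattern (accumulatorAt is an assumed helper returning each group's accumulator):

  // Sizing pass: add up the serialized size of every group, then reserve
  // the string buffer once instead of once per group.
  int32_t totalBytes = 0;
  for (int32_t i = 0; i < numGroups; ++i) {
    totalBytes += accumulatorAt(i)->serializedSize();
  }
  Buffer* buffer = flatResult->getBufferWithSpace(totalBytes);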

}

private:
const int64_t DEFAULT_ESPECTED_NUM_ITEMS = 1000000;
Contributor

naming: kDefaultExpe...

Use 1'000'000 for readability

private:
const int64_t DEFAULT_ESPECTED_NUM_ITEMS = 1000000;
const int64_t MAX_NUM_ITEMS = 4000000;
// Spark MAX_NUM_BITS is 67108864, but velox has memory limit sizeClassSizes
Contributor

Should the documentation match?

@mbasmanova changed the title from "Add bloom_filter_agg SparkSql function" to "Add bloom_filter_agg Spark aggregate function" on Apr 18, 2023
@duanmeng
Collaborator

@jinchengchenghh

#4633 is fixed. Would you re-enable the fuzzer test?

I'm curious how this function is used in Spark to reduce the amount of shuffle data. Is there something I can read about this?

@mbasmanova Spark Runtime Filters, apache/spark#35789, https://docs.google.com/document/d/16IEuyLeQlubQkH8YuVuXWKo2-grVIoDJqQpHZrE7q04/edit#heading=h.4v65wq7vzy4q

@jinchengchenghh
Contributor Author

@mbasmanova In short, when a big table A joins a small table B with a filter, Spark generates a bloom filter for table B after the filter, broadcasts this bloom filter to the A side, and uses might_contain to reduce the shuffle data of table A.

@jinchengchenghh
Contributor Author

Spark aggregate fuzzer tests passed.

@jinchengchenghh
Contributor Author

I received a core dump, but I don't think it is caused by my PR:

I20230419 03:23:37.675078 303327 ExpressionVerifier.cpp:88] Executing expression: round(bitwise_and(remainder(unaryminus(bitwise_or(add(multiply("c0",round(14,pmod(520542265,bitwise_and("c1","c2")))),unaryminus(add(subtract(round("c3"),bitwise_or(58,44)),"c4"))),69)),unaryminus(bitwise_and("c5",add(round(round(bitwise_or(26,add("c6","c7")),add("c8","c9")),"c10"),add(bitwise_and(26,"c11"),"c12"))))),bitwise_and("c13",abs(round(add("c14",bitwise_and(43,64)),shiftright(subtract(shiftleft(subtract(1529826847,length("c15")),"c16"),length(upper(sha2(VPl+[xY#8IL#&+*X@/TzpX2i2Vi'\9_`|nird}Lqx-^cFtq.O[!'Rs,"c17")))),851317482))))))
I20230419 03:23:37.675148 303327 ExpressionVerifier.cpp:31] 18 vectors as input:
I20230419 03:23:37.675158 303327 ExpressionVerifier.cpp:33] 	[DICTIONARY TINYINT: 100 elements, 8 nulls], [CONSTANT TINYINT: 100 elements, 48]
I20230419 03:23:37.675171 303327 ExpressionVerifier.cpp:33] 	[DICTIONARY INTEGER: 100 elements, 9 nulls], [FLAT INTEGER: 100 elements, 15 nulls]
I20230419 03:23:37.675184 303327 ExpressionVerifier.cpp:33] 	[FLAT INTEGER: 100 elements, 14 nulls]
I20230419 03:23:37.675194 303327 ExpressionVerifier.cpp:33] 	[FLAT TINYINT: 100 elements, 13 nulls]
I20230419 03:23:37.675204 303327 ExpressionVerifier.cpp:33] 	[DICTIONARY TINYINT: 100 elements, 11 nulls], [FLAT TINYINT: 100 elements, 9 nulls]
I20230419 03:23:37.675215 303327 ExpressionVerifier.cpp:33] 	[DICTIONARY TINYINT: 100 elements, 14 nulls], [FLAT TINYINT: 100 elements, 14 nulls]
I20230419 03:23:37.675226 303327 ExpressionVerifier.cpp:33] 	[DICTIONARY TINYINT: 100 elements, 7 nulls], [DICTIONARY TINYINT: 100 elements, 10 nulls], [FLAT TINYINT: 100 elements, 12 nulls]
I20230419 03:23:37.675240 303327 ExpressionVerifier.cpp:33] 	[DICTIONARY TINYINT: 100 elements, 10 nulls], [DICTIONARY TINYINT: 100 elements, 5 nulls], [DICTIONARY TINYINT: 100 elements, 10 nulls], [CONSTANT TINYINT: 100 elements, 51]
I20230419 03:23:37.675254 303327 ExpressionVerifier.cpp:33] 	[DICTIONARY INTEGER: 100 elements, 12 nulls], [DICTIONARY INTEGER: 100 elements, 7 nulls], [DICTIONARY INTEGER: 100 elements, 12 nulls], [DICTIONARY INTEGER: 100 elements, 11 nulls], [FLAT INTEGER: 100 elements, 9 nulls]
E20230419 03:23:37.675282 303327 Exceptions.h:68] Line: ../.././velox/vector/SimpleVector.h:150, Function:toString, Expression: index < length_ (34 vs. 29) Vector index should be less than length., Source: RUNTIME, ErrorCode: INVALID_STATE
terminate called after throwing an instance of 'facebook::velox::VeloxRuntimeError'
  what():  Exception: VeloxRuntimeError
Error Source: RUNTIME
Error Code: INVALID_STATE
Reason: (34 vs. 29) Vector index should be less than length.
Retriable: False
Expression: index < length_
Function: toString
File: ../.././velox/vector/SimpleVector.h
Line: 150
Stack trace:
# 0  _ZN8facebook5velox7process10StackTraceC1Ei
# 1  _ZN8facebook5velox14VeloxException5State4makeIZNS1_C4EPKcmS5_St17basic_string_viewIcSt11char_traitsIcEES9_S9_S9_bNS1_4TypeES9_EUlRT_E_EESt10shared_ptrIKS2_ESA_SB_
# 2  _ZN8facebook5velox14VeloxExceptionC1EPKcmS3_St17basic_string_viewIcSt11char_traitsIcEES7_S7_S7_bNS1_4TypeES7_
# 3  _ZN8facebook5velox17VeloxRuntimeErrorC2EPKcmS3_St17basic_string_viewIcSt11char_traitsIcEES7_S7_S7_bS7_
# 4  _ZN8facebook5velox6detail14veloxCheckFailINS0_17VeloxRuntimeErrorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEEEvRKNS1_18VeloxCheckFailArgsET0_
# 5  _ZNK8facebook5velox12SimpleVectorIiE8toStringB5cxx11Ei
# 6  _ZNK8facebook5velox14ConstantVectorIiE8toStringB5cxx11Ei
# 7  _ZNK8facebook5velox14ConstantVectorIiE15toSummaryStringB5cxx11Ev
# 8  _ZNK8facebook5velox10BaseVector8toStringB5cxx11Eb
# 9  _ZN8facebook5velox4test12_GLOBAL__N_112logRowVectorERKSt10shared_ptrINS0_9RowVectorEE
# 10 _ZN8facebook5velox4test18ExpressionVerifier6verifyERKSt10shared_ptrIKNS0_4core10ITypedExprEERKS3_INS0_9RowVectorEEOS3_INS0_10BaseVectorEEbSt6vectorIjSaIjEE
# 11 _ZN8facebook5velox4test16ExpressionFuzzer2goEv
# 12 _ZN8facebook5velox4test16expressionFuzzerESt13unordered_mapINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESt6vectorIPKNS0_4exec17FunctionSignatureESaISD_EESt4hashIS8_ESt8equal_toIS8_ESaISt4pairIKS8_SF_EEEm
# 13 _ZN12FuzzerRunner3runERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEmRKSt13unordered_setIS5_St4hashIS5_ESt8equal_toIS5_ESaIS5_EES7_
# 14 main
# 15 __libc_start_main
# 16 _start

*** Aborted at 1681874617 (Unix time, try 'date -d @1681874617') ***
*** Signal 6 (SIGABRT) (0x4a0df) received by PID 303327 (pthread TID 0x7f1d3f1052c0) (linux TID 303327) (maybe from PID 303327, UID 0) (code: -6), stack trace: ***
(error retrieving stack trace)
/bin/bash: line 8: 303327 Aborted                 (core dumped) _build/debug/velox/expression/tests/spark_expression_fuzzer_test --seed ${RANDOM} --duration_sec 60 --logtostderr=1 --minloglevel=0 --repro_persist_path=/tmp/spark_fuzzer_repro

Exited with code exit status 134

@mbasmanova
Contributor

I received a core dump, but I don't think it is caused by my PR

Looks like this is tracked in #4652

Contributor

@mbasmanova left a comment

@jinchengchenghh Some follow-up comments.

return;
}
rows.applyToSelected([&](vector_size_t row) {
accumulator->init(capacity_);
Contributor

nit: this can be done once before the loop
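
A sketch of the suggested hoisting (insert() stands in for the accumulator's add method; the other names appear in the PR):

  // Initialize the accumulator once, outside the per-row loop.
  accumulator->init(capacity_);
  rows.applyToSelected([&](vector_size_t row) {
    accumulator->insert(
        folly::hasher<int64_t>()(decodedRaw_.valueAt<int64_t>(row)));
  });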

}

auto size = accumulator->serializedSize();
StringView serialized;
Contributor

This variable is not used. Let's remove.

Contributor Author

It is used in the following flatResult->setNoCopy(i, serialized);

for (vector_size_t i = 0; i < numGroups; ++i) {
auto group = groups[i];
auto accumulator = value<BloomFilterAccumulator>(group);
if (UNLIKELY(!accumulator->bloomFilter.isSet())) {
Contributor

Any particular reason this cannot be part of accumulator->serializedSize()? It could return zero in this case.

Contributor Author

accumulator->serializedSize() never returns 0; it is:

  uint32_t serializedSize() const {
    return 1 /* version */
        + 4 /* number of bits */
        + bits_.size() * 8;
  }

capacity_ = numBits_ / 16;
}

int32_t getPreAllocatedBufferSize(char** groups, int32_t numGroups) const {
Contributor

nit: perhaps, getPreAllocatedBufferSize -> getTotalSize

for (vector_size_t i = 0; i < numGroups; ++i) {
auto group = groups[i];
auto accumulator = value<BloomFilterAccumulator>(group);
if (UNLIKELY(!accumulator->bloomFilter.isSet())) {
Contributor

nit: should we add a method for this for consistency?

accumulator->initialized()
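
A sketch of that helper, derived from the quoted condition:

  bool initialized() const {
    return bloomFilter.isSet();
  }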

}

static void
setConstantArgument(const char* name, int64_t& val, int64_t newVal) {
Contributor

do not abbreviate: currentValue and newValue

}

static void
setConstantArgument(const char* name, int64_t& val, int64_t newVal) {
Contributor

This function is small and used only once. Consider folding this logic into the caller for readability.

// Reusable instance of DecodedVector for decoding input vectors.
DecodedVector decodedRaw_;
DecodedVector decodedIntermediate_;
int64_t originalEstimatedNumItems_ = kMissingArgument;
Contributor

Do we need both originalEstimatedNumItems_ and estimatedNumItems_ member variables? Looks like just one would be sufficient.

Contributor Author

Yes, because originalEstimatedNumItems_ holds the constant value from the vector; it is compared against the max value to derive estimatedNumItems_, which is what the function uses.

Contributor

Are you saying that estimatedNumItems_ can be lower than originalEstimatedNumItems_ if the input value is too large?

Contributor Author

Yes

auto vectors = {makeRowVector({makeAllNullFlatVector<int64_t>(2)})};
auto expectedFake = {makeRowVector(
{makeNullableFlatVector<StringView>({std::nullopt}, VARBINARY())})};
EXPECT_THROW(
Contributor

Use VELOX_ASSERT_THROW

@@ -11,6 +11,22 @@ General Aggregate Functions

Returns the bitwise XOR of all non-null input values, or null if none.

.. spark:function:: bloom_filter_agg(x, estimatedNumItems, numBits) -> varbinary

Insert ``x`` into BloomFilter, and returns the serialized BloomFilter.
Contributor

nit: perhaps,

Creates bloom filter from values of 'x' and returns it serialized into VARBINARY.

``estimatedNumItems`` provides an estimate of the number of unique values of ``x``. Value is capped at 716,800.

``numBits`` specifies max capacity of the bloom filter, which allows to trade accuracy for memory.

Contributor Author

I changed "the number of unique values" to "the number of values"; this is Spark's intended meaning.

Contributor

@jinchengchenghh I'm not sure I understand why would we want to specify the estimate of the total number of input values. Would you clarify?

Contributor Author

new BloomFilterImpl(optimalNumOfHashFunctions(expectedNumItems, numBits), numBits);
  /**
   * Computes the optimal k (number of hashes per item inserted in Bloom filter), given the
   * expected insertions and total number of bits in the Bloom filter.
   *
   * See http://en.wikipedia.org/wiki/File:Bloom_filter_fp_probability.svg for the formula.
   *
   * @param n expected insertions (must be positive)
   * @param m total number of bits in Bloom filter (must be positive)
   */
  private static int optimalNumOfHashFunctions(long n, long m) {
    // (m / n) * log(2), but avoid truncation due to division!
    return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
  }

The Spark BloomFilter implementation uses this value to compute the number of hash functions, which has a theoretically optimal value. The Velox implementation does not use a variable number of hash functions; it uses a constant 4.

Contributor

@jinchengchenghh Are you saying Velox's implementation doesn't use the estimatedNumItems argument? If so, should we remove it? Otherwise, should we document that this argument is not used and remove the logic for capping its value and initializing the estimatedNumItems_ member variable?

Contributor Author

As the final solution, I may implement Spark's native BloomFilter in Velox; it will have better performance and we can switch to it then.

Also, if numBits is not specified, we use estimatedNumItems to estimate numBits. The BloomFilter implementation doesn't use the estimatedNumItems argument, but bloom_filter_agg does.

Contributor

Got it. Let's clarify all this in the documentation. It is not obvious.

Contributor Author

Do we also need to document the differences from Spark, or just clarify the usage in Velox?

Contributor

Let's document the difference with Spark as well. By default, the assumption is that Velox functions match semantics of the original engine.

Contributor Author

Ok, updated

@@ -11,6 +11,36 @@ General Aggregate Functions

Returns the bitwise XOR of all non-null input values, or null if none.

.. spark:function:: bloom_filter_agg(x, estimatedNumItems, numBits) -> varbinary

Creates bloom filter from values of hashed value 'x' and returns it serialized into VARBINARY.
Contributor

from values of hashed value 'x'

It is a bit cryptic. Perhaps, rename x to hash and say something like

.. spark:function:: bloom_filter_agg(hash, estimatedNumItems, numBits) -> varbinary

    Creates bloom filter from input hashes and returns it serialized into VARBINARY. The caller is expected to apply xxhash64 function to input data before calling bloom_filter_agg.

For example, 

     bloom_filter_agg(xxhash64(x), 1000000, 1024)

.. spark:function:: bloom_filter_agg(x, estimatedNumItems, numBits) -> varbinary

Creates bloom filter from values of hashed value 'x' and returns it serialized into VARBINARY.
``estimatedNumItems`` and ``numBits`` decides the number of hash functions and bloom filter capacity in Spark.
Contributor

typos:

In Spark implementation, ``estimatedNumItems`` and ``numBits`` are used to decide the number of hash functions and bloom filter capacity.

In Velox implementation, ``estimatedNumItems`` is not used.

Current bloom filter implementation is different with Spark, if specified ``numBits``, ``estimatedNumItems``
will not be used.

``x`` should be xxhash64(``y``).
Contributor

This can be removed.

will not be used.

``x`` should be xxhash64(``y``).
``estimatedNumItems`` provides an estimate of the number of values of ``y``, which takes no effect here.
Contributor

Let's remove since it is not used.

``x`` should be xxhash64(``y``).
``estimatedNumItems`` provides an estimate of the number of values of ``y``, which takes no effect here.
``numBits`` specifies max capacity of the bloom filter, which allows to trade accuracy for memory.
Value of numBits in Spark is capped at 67,108,864, actually is capped at 716,800 in case of class memory limit .
Contributor

typos:

In Spark, the value of ``numBits`` is automatically capped at 67,108,864.

In Velox, the value of ``numBits`` is automatically capped at 716,800.

@@ -11,6 +11,36 @@ General Aggregate Functions

Returns the bitwise XOR of all non-null input values, or null if none.

.. spark:function:: bloom_filter_agg(x, estimatedNumItems, numBits) -> varbinary
Contributor

Let's also mention that x / hash cannot be null.

bool /*mayPushdown*/) override {
decodeArguments(rows, args);
VELOX_USER_CHECK(
!decodedRaw_.mayHaveNulls(), "First argument value should not be null");
Contributor

First argument value

Users may not understand what this refers to. How about,

First argument of bloom_filter_agg cannot be null

However, !decodedRaw_.mayHaveNulls() is a very strong check. It may return false even if there are no nulls.

Contributor Author

I'm a bit confused by "It may return false even if there are no nulls".
In my understanding, it returns false only when there are definitely no nulls, and may return true whether or not nulls are present, so a false result guarantees there are no nulls.

Other code follows this rule:
https://github.com/facebookincubator/velox/blob/main/velox/connectors/hive/HiveConnector.cpp#L633
The scalar function documentation agrees:
https://facebookincubator.github.io/velox/develop/scalar-functions.html

bool mayHaveNulls(): constant-time check on the underlying vector nullity. When it returns false, there are definitely no nulls; a true does not guarantee null existence.
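
A sketch of an exact per-row check, in contrast to the conservative constant-time shortcut (rows and decodedRaw_ are assumed in scope, as in the PR):

  // Visit each selected row and reject actual nulls instead of relying on
  // mayHaveNulls(), which may report true even when no nulls exist.
  rows.applyToSelected([&](vector_size_t row) {
    VELOX_USER_CHECK(
        !decodedRaw_.isNullAt(row),
        "First argument of bloom_filter_agg cannot be null");
  });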

auto size = accumulator->serializedSize();
StringView serialized;
if (StringView::isInline(size)) {
std::string buffer(size, '\0');
Contributor

Can you check this comment?

} else {
char* ptr = buffer->asMutable<char>() + buffer->size();
accumulator->serialize(ptr);
buffer->setSize(buffer->size() + size);
Contributor

Same.

private:
const int64_t kDefaultExpectedNumItems = 1'000'000;
const int64_t kMaxNumItems = 4'000'000;
// Spark kMaxNumBits is 67108864, but velox has memory limit sizeClassSizes
Contributor

Same.

Contributor

@mbasmanova left a comment

@jinchengchenghh Thank you for iterating on this PR. There are a lot of tricky details to get right. Some follow-up comments.

Creates bloom filter from input hashes and returns it serialized into VARBINARY.
The caller is expected to apply xxhash64 function to input data before calling bloom_filter_agg.
For example,
bloom_filter_agg(xxhash64(x), 100, 1024)
Contributor

Would you generate the docs locally and verify that they are formatted nicely? It seems to me we need some new lines or something around the example.

Contributor Author

Sorry about the documentation; I didn't know how to verify it before.
Now that I know how to convert it to HTML and preview it in VS Code, I will check the HTML format. Thanks for your kind review.

``hash`` cannot be null.
``numBits`` specifies max capacity of the bloom filter, which allows to trade accuracy for memory.
In Spark, the value of``numBits`` is automatically capped at 67,108,864.
In Velxo, the value of``numBits`` is automatically capped at 716,800.
Contributor

typo: Velxo -> Velox

is automatically capped at 716,800.

Let's update the PR description to explain where this limitation comes from. CC: @xiaoxmeng

In Spark, the value of``numBits`` is automatically capped at 67,108,864.
In Velxo, the value of``numBits`` is automatically capped at 716,800.

``x``, ``estimatedNumItems`` and ``numBits`` must be ``BIGINT``.
Contributor

x -> hash


.. spark:function:: bloom_filter_agg(hash, estimatedNumItems) -> varbinary

As ``bloom_filter_agg``.
Contributor

This description needs to be revised.

A version of ``bloom_filter_agg`` that uses ``numBits`` computed as ``estimatedNumItems`` * 8.

``estimatedNumItems`` provides an estimate of the number of values of <TBD: fill in; y seems wrong>.

Value of ``estimatedNumItems`` is capped at 4,000,000.

Does 4M cap come from Spark? If so, let's clarify

Value of ``estimatedNumItems`` is capped at 4,000,000 to match Spark's implementation.

Contributor Author

Contributor

It looks like defaults are configurable in Spark. Should these be configurable in Velox as well?

  val RUNTIME_BLOOM_FILTER_EXPECTED_NUM_ITEMS =
    buildConf("spark.sql.optimizer.runtime.bloomFilter.expectedNumItems")
      .doc("The default number of expected items for the runtime bloomfilter")
      .version("3.3.0")
      .longConf
      .createWithDefault(1000000L)

  val RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS =
    buildConf("spark.sql.optimizer.runtime.bloomFilter.maxNumItems")
      .doc("The max allowed number of expected items for the runtime bloom filter")
      .version("3.3.0")
      .longConf
      .createWithDefault(4000000L)


  val RUNTIME_BLOOM_FILTER_NUM_BITS =
    buildConf("spark.sql.optimizer.runtime.bloomFilter.numBits")
      .doc("The default number of bits to use for the runtime bloom filter")
      .version("3.3.0")
      .longConf
      .createWithDefault(8388608L)

  val RUNTIME_BLOOM_FILTER_MAX_NUM_BITS =
    buildConf("spark.sql.optimizer.runtime.bloomFilter.maxNumBits")
      .doc("The max number of bits to use for the runtime bloom filter")
      .version("3.3.0")
      .longConf
      .createWithDefault(67108864L)

Contributor Author

Currently we cannot access configuration when implementing aggregate functions, because the queryContext config is not available here.
If we need this feature, we should store the config in GroupingSet when initializing it in HashAggregation.cpp; right now only Spiller::Config exists in GroupingSet.
We would also need to add a new config or context argument to functions such as addRawInput, which changes the signatures of all aggregate functions.
Alternatively, we could store the config in Aggregate, but I don't recommend that: it requires fewer code changes, but Aggregate is created by the FunctionRegistry factory, so we cannot pass the config through a constructor.

If you think this is needed, I can implement it in another PR.

Contributor

Thank you for pointing this out. Let's create a GitHub issue to explain this use case and discuss how best to implement it. For this PR, let's just mention in the documentation that Spark allows for changing the defaults, but Velox does not.


.. spark:function:: bloom_filter_agg(hash) -> varbinary

As ``bloom_filter_agg``.
Contributor

Same.

A version of ``bloom_filter_agg`` that uses 8,000,000 as ``numBits``.

Would you confirm that this matches Spark's implementation?

flatResult->resize(numGroups);

int32_t totalSize = getTotalSize(groups, numGroups);
Buffer* buffer = flatResult->getBufferWithSpace(totalSize);
Contributor

This works, but is quite a bit of code and easy to get wrong. For example, by forgetting to call buffer->setSize or forgetting to add buffer_.size() when initializing bufferPtr.

Consider introducing a new method:

char* rawBuffer = flatResult->getRawStringBufferWithSpace(totalSize);

This method would return the pointer to the first "writable" byte and update the size of the 'buffer' to include totalSize.
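
A sketch of the extraction loop using the proposed helper (accumulatorAt is an assumed accessor):

  // The helper reserves totalSize bytes up front and returns the first
  // writable byte; each group then serializes into its own slice.
  char* rawBuffer = flatResult->getRawStringBufferWithSpace(totalSize);
  for (vector_size_t i = 0; i < numGroups; ++i) {
    auto size = accumulatorAt(i)->serializedSize();
    accumulatorAt(i)->serialize(rawBuffer);
    flatResult->setNoCopy(i, StringView(rawBuffer, size));
    rawBuffer += size;
  }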

if (args.size() > 1) {
DecodedVector decodedEstimatedNumItems(*args[1], rows);
setConstantArgument(
"originalEstimatedNumItems",
Contributor

Please check this comment.

VELOX_CHECK_EQ(args.size(), 3);
DecodedVector decodedNumBits(*args[2], rows);
setConstantArgument(
"originalNumBits", originalNumBits_, decodedNumBits);
Contributor

Same.

for (vector_size_t i = 0; i < numGroups; ++i) {
auto group = groups[i];
auto accumulator = value<BloomFilterAccumulator>(group);
if (UNLIKELY(!accumulator->initialized())) {
Contributor

Would this happen if we run a masked aggregation and all rows for a given group are masked out? Would you add a test case to verify this code path?

Contributor Author

Can you explain a bit more about BloomFilterAggAggregateTest.emptyInput?
The current tests cover this path, and the Gluten unit tests hit it too:

[==========] Running 4 tests from 1 test suite.
[----------] Global test environment set-up.
[----------] 4 tests from BloomFilterAggAggregateTest
[ RUN      ] BloomFilterAggAggregateTest.basic
not init
[       OK ] BloomFilterAggAggregateTest.basic (40 ms)
[ RUN      ] BloomFilterAggAggregateTest.bloomFilterAggArgument
not init
not init
[       OK ] BloomFilterAggAggregateTest.bloomFilterAggArgument (160 ms)
[ RUN      ] BloomFilterAggAggregateTest.emptyInput
not init
not init
not init
not init
not init
not init
not init
not init
not init
not init
[       OK ] BloomFilterAggAggregateTest.emptyInput (30 ms)

static void setConstantArgument(
const char* name,
int64_t& currentValue,
const DecodedVector& vec) {
Contributor

do not abbreviate: vec -> vector or decoded

@jinchengchenghh
Contributor Author

This failure happens again; can you help check it? @mbasmanova

I20230508 01:38:13.277568 301220 AggregationFuzzer.cpp:452] Persisted input: /tmp/spark_aggregate_fuzzer_repro/velox_vector_MHUn0c and plan: /tmp/spark_aggregate_fuzzer_repro/velox_plan_tWDyIv
terminate called after throwing an instance of 'facebook::velox::VeloxRuntimeError'
  what():  Exception: VeloxRuntimeError
Error Source: RUNTIME
Error Code: INVALID_STATE
Reason: estimatedNumItems argument must be constant for all input rows
Retriable: False
Expression: vector.isConstantMapping()
Function: setConstantArgument
File: ../../velox/functions/sparksql/aggregates/BloomFilterAggAggregate.cpp
Line: 251

@mbasmanova
Contributor

@jinchengchenghh
Contributor Author

Do you have further comments? @mbasmanova

@mbasmanova
Contributor

@jinchengchenghh The CI is red. Would you rebase the PR and make sure CI is green?

@jinchengchenghh force-pushed the bloomagg branch 2 times, most recently from 38e947d to 2e642ce on June 1, 2023 01:01
@jinchengchenghh
Contributor Author

The CI passed. @mbasmanova

@facebook-github-bot
Contributor

@mbasmanova has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

@@ -403,6 +403,10 @@ class FlatVector final : public SimpleVector<T> {
return nullptr;
}

char* getRawStringBufferWithSpace(vector_size_t /* unused */) {
return nullptr;
};
Contributor

linter pointed out that this semi-colon is not needed; let's remove

I'm also seeing that this method is lacking documentation and tests. Would you submit a separate PR to introduce this method, document it clearly and add a test?

Contributor

@mbasmanova left a comment

A few linter warnings (three screenshots, 2023-06-01 at 10:36 AM).

@jinchengchenghh force-pushed the bloomagg branch 2 times, most recently from 11cfb31 to 36b31e5 on June 6, 2023 06:30
@jinchengchenghh
Contributor Author

Fixed the linter warnings; can this be imported? @mbasmanova

@facebook-github-bot
Contributor

@mbasmanova has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

@facebook-github-bot
Contributor

@mbasmanova merged this pull request in 86137eb.

@conbench-facebook

Conbench analyzed the 1 benchmark run on commit 86137eb2.

There were no benchmark performance regressions. 🎉

The full Conbench report has more details.

Successfully merging this pull request may close these issues: Add bloom_filter_agg and might_contain SparkSql function.