Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add might_contain SparkSql function #4029

Closed

Conversation

jinchengchenghh
Copy link
Contributor

@jinchengchenghh jinchengchenghh commented Feb 13, 2023

This function aims to return whether serialized bloom filter might contain value
The serialized bloom filter will be returned by SparkSql function bloom_filter_agg

Resolved: #3694

@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Feb 13, 2023
@netlify
Copy link

netlify bot commented Feb 13, 2023

Deploy Preview for meta-velox canceled.

Name Link
🔨 Latest commit 9076cc4
🔍 Latest deploy log https://app.netlify.com/sites/meta-velox/deploys/6437ab88bc985a00085fffbb

@jinchengchenghh
Copy link
Contributor Author

@mbasmanova Can you help review this one?

Copy link
Contributor

@mbasmanova mbasmanova left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jinchengchenghh Some initial comments. Please, update PR description to provide some context about this function. Please, run Fuzzer and report the results.

output.merge(serialized->valueAt<StringView>(0).data());
rows.applyToSelected([&](int row) {
auto contain = output.mayContain(
folly::hasher<int64_t>()(value->valueAt<int64_t>(row)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check if value->isNullAt(row)?

velox/functions/sparksql/MightContain.cpp Show resolved Hide resolved
@@ -147,6 +148,9 @@ void registerFunctions(const std::string& prefix) {

registerFunction<YearFunction, int32_t, Timestamp>({"year"});
registerFunction<YearFunction, int32_t, Date>({"year"});
// Register bloom filter function
exec::registerStatefulVectorFunction(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function appears stateless. Why are we registering it as a stateful function? Should we make it actually stateful for the case when bloom filter is constant? In that case we can initialize BloomFilter just once per "query thread".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in spark use case, bloom filter is always constant

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If that's the case, then we should check that bloom filter is constant and raise an exception if it is not. We should also make sure to "parse" the bloom filter binary just once.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm seeing that this function is stateless. Hence, let's use registerVectorFunction API to register it.

return evaluateOnce<bool>(
fmt::format("might_contain(cast(c0 as varbinary), {})", value), bloom);
}
std::shared_ptr<memory::MemoryPool> pool_{memory::getDefaultMemoryPool()};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use pool_ from the base class.

fmt::format("might_contain(cast(c0 as varbinary), {})", value), bloom);
}
std::shared_ptr<memory::MemoryPool> pool_{memory::getDefaultMemoryPool()};
HashStringAllocator allocator_{pool_.get()};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this? I think we can just use default allocator, no?

HashStringAllocator allocator_{pool_.get()};
};

TEST_F(MightContainTest, common) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

common -> basic


StringView serialized(data.data(), data.size());
for (auto i = 0; i < kSize; ++i) {
EXPECT_TRUE(mightContain(serialized, i).value());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a vector function, let's make sure we test it on vectors of size > 1 with and without nulls.


rows.applyToSelected([&](int row) {
BloomFilter output{StlAllocator<uint64_t>(&allocator)};
output.merge(serialized->valueAt<StringView>(0).data());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 -> row ?

Should we check serialized->isNullAt(row)?

@@ -27,3 +27,8 @@ Binary Functions
.. spark:function:: xxhash64(x) -> integer

Computes the xxhash64 of x.

.. spark:function:: might_contain(x, v) -> boolean
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move the function to preserve alphabetical order.


.. spark:function:: might_contain(x, v) -> boolean

Returns serialized BloomFilter ``x`` might contain value ``v``.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typos:

.. spark:function:: might_contain(bloomFilter, value) -> boolean

   Returns TRUE if ``bloomFilter`` might contain ``value``. 

   ``bloomFilter`` is a VARBINARY computed using ::spark::function::`bloom_filter_agg` aggregate function. 
   ``value`` is a BIGINT.

@jinchengchenghh
Copy link
Contributor Author

Because the first input argument should be serialized bloom filter, so the random data will failed by BloomFilter version check.
We cannot check this function by fuzzer test.

root@sr249:/mnt/DP_disk1/code/velox/_build/release/velox/expression/tests# ./spark_expression_fuzzer_test --logtostderr --duration_sec 600 --only might_contain
I0406 16:23:26.612116 3963981 FuzzerRunner.h:120] Skipping special form: if
I0406 16:23:26.612203 3963981 FuzzerRunner.h:120] Skipping special form: coalesce
I0406 16:23:26.612210 3963981 FuzzerRunner.h:120] Skipping special form: cast
I0406 16:23:26.612215 3963981 FuzzerRunner.h:120] Skipping special form: switch
I0406 16:23:26.612304 3963981 ExpressionFuzzer.cpp:484] Total candidate functions: 1 (1 signatures)
I0406 16:23:26.612314 3963981 ExpressionFuzzer.cpp:488] Functions with at least one supported signature: 1 (100.00%)
I0406 16:23:26.612326 3963981 ExpressionFuzzer.cpp:492] Functions with no supported signature: 0 (0.00%)
I0406 16:23:26.612334 3963981 ExpressionFuzzer.cpp:496] Supported function signatures: 1 (100.00%)
I0406 16:23:26.612342 3963981 ExpressionFuzzer.cpp:500] Unsupported function signatures: 0 (0.00%)
I0406 16:23:26.612370 3963981 ExpressionFuzzer.cpp:1137] ==============================> Started iteration 0 (seed: 123456)
I0406 16:23:26.612385 3963981 ExpressionFuzzer.cpp:782] Couldn't find any function to return 'VARBINARY'. Returning a constant instead.
I0406 16:23:26.612604 3963981 ExpressionFuzzer.cpp:782] Couldn't find any function to return 'BIGINT'. Returning a constant instead.
I0406 16:23:26.612634 3963981 ExpressionVerifier.cpp:88] Executing expression: might_contain(KL,%*qJT`<WnL{bJAZ]s5cl_yP"T<aywh}EL~g#Z!@.#cP,2900225831860459783)
I0406 16:23:26.612650 3963981 ExpressionVerifier.cpp:31] 0 vectors as input:
E0406 16:23:26.612735 3963981 Exceptions.h:68] Line: /mnt/DP_disk1/code/velox/./velox/common/base/BloomFilter.h:67, Function:merge, Expression: kBloomFilterV1 == version (1 vs. 75), Source: RUNTIME, ErrorCode: INVALID_STATE
E0406 16:23:26.613003 3963981 ExpressionVerifier.cpp:157] Common eval: Exceptions other than VeloxUserError are not allowed.
I0406 16:23:26.613013 3963981 ExpressionVerifier.cpp:221] Skipping persistence because repro path is empty.
terminate called after throwing an instance of 'facebook::velox::VeloxRuntimeError'
  what():  Exception: VeloxRuntimeError
Error Source: RUNTIME
Error Code: INVALID_STATE
Reason: (1 vs. 75)
Retriable: False
Expression: kBloomFilterV1 == version
Context: might_contain(KL,%*qJT`<WnL{bJAZ]s5cl_yP"T<aywh}EL~g#Z!@.#cP:VARBINARY, 2900225831860459783:BIGINT)
Top-Level Context: Same as context.
Function: merge
File: /mnt/DP_disk1/code/velox/./velox/common/base/BloomFilter.h
Line: 67
Stack trace:
# 0
# 1
# 2
# 3
# 4
# 5
# 6
# 7
# 8
# 9
# 10
# 11
# 12
# 13
# 14
# 15
# 16
# 17
# 18
# 19

*** Aborted at 1680798206 (Unix time, try 'date -d @1680798206') ***
*** Signal 6 (SIGABRT) (0x3c7c4d) received by PID 3963981 (pthread TID 0x7f977f1f7280) (linux TID 3963981) (maybe from PID 3963981, UID 0) (code: -6), stack trace: ***
(error retrieving stack trace)
Aborted (core dumped)

@mbasmanova
Copy link
Contributor

@jinchengchenghh

Because the first input argument should be serialized bloom filter, so the random data will failed by BloomFilter version check.

When function inputs are not valid, the function should throw a user error, not system error. Looks like you need to change VELOX_CHECK_xxx to VELOX_USER_CHECK_xxx somewhere.

std::unordered_set<std::string> skipFunctions = {
"regexp_extract", "rlike", "chr", "replace"};
"regexp_extract", "rlike", "chr", "replace", "might_contain"};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's figure out how to avoid adding this new function to the skip list.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we consider VELOX_USER_CHECK exception as normal case which can pass fuzzer test? This may be the illegal user input

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already the case. Are you still seeing fuzzer failures? Can you share the output?

};
// This function argument is a serialized bloom filter, it cannot accept
// random data.
"might_contain"};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might_contain is a Spark function, but this test only uses Presto functions. Please, remove.

std::vector<std::shared_ptr<exec::FunctionSignature>> mightContainSignatures() {
return {exec::FunctionSignatureBuilder()
.returnType("boolean")
.argumentType("varbinary")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we mark this argument as constant?

See https://github.com/facebookincubator/velox/pull/4204/files

@@ -60,8 +60,7 @@ int main(int argc, char** argv) {
"cardinality",
"in",
"element_at",
"width_bucket",
};
"width_bucket"};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, keep the original formatting with an extra comma after the last entry.

@@ -55,7 +55,8 @@ int main(int argc, char** argv) {

// The following list are the Spark UDFs that hit issues
// For rlike you need the following combo in the only list:
// rlike, md5 and upper
// rlike, md5 and upper, might_contain argument is a serialized bloom filter,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is not needed. Let's rever.

HashStringAllocator allocator{context.pool()};
VELOX_USER_CHECK(serialized->isConstantMapping())
BloomFilter output{StlAllocator<uint64_t>(&allocator)};
output.merge(serialized->valueAt<StringView>(0).str().c_str());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use serialized->valueAt<StringView>(0).data().

Can output.merge throw? If that's the case, you need to catch the exception and report it via context.setErrors. Otherwise, this function won't work properly under TRY. See https://facebookincubator.github.io/velox/develop/scalar-functions.html#try-expression-support

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inline const char *facebook::velox::StringView::data() &&
+1 overload

function "facebook::velox::StringView::data() &&" (declared at line 114 of "/mnt/DP_disk1/code/velox/velox/type/StringView.h") cannot be referenced -- it is a deleted function

So I cannot use serialized->valueAt<StringView>(0).data()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very strange. We need to fix this. CC: @pedroerp

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually not safe, so the compiler error is legit - and by design.

serialized->valueAt<StringView>(0) will generate a temporary StringView, so your .data() would return a pointer into the temporary StringView, which gets destructed right away. If you want to do something like this, you need to explicitly save it into the stack to ensure its lifetime:

auto sv = serialized->valueAt<StringView>(0);
auto ptr = sv.data();  // all good

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pedroerp Pedro, thank you for clarifying. Let me send a PR to do that. This is quite inconvenient though and makes developers go with a solution that copies the string.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a PR: #4650

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is quite inconvenient though and makes developers go with a solution that copies the string.

Agreed it's inconvenient, but the alternative is removing the compiler check at the cost of correctness. Also, the example I send above doesn't copy the string. The compiler should translate into the exact same code, except safer.

@@ -147,6 +148,9 @@ void registerFunctions(const std::string& prefix) {

registerFunction<YearFunction, int32_t, Timestamp>({"year"});
registerFunction<YearFunction, int32_t, Date>({"year"});
// Register bloom filter function
exec::registerStatefulVectorFunction(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm seeing that this function is stateless. Hence, let's use registerVectorFunction API to register it.

assertEqualVectors(expected, result);
}

std::string getSerializedBloom() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do not abbreviate: getSerializedBloomFilter

auto bloom = makeConstant<StringView>(StringView(serialized), 10);
auto value =
makeFlatVector<int64_t>(10, [](vector_size_t row) { return row; });
auto expected =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use makeConstant(true, 10) for brevity.


auto valueNotContain = makeFlatVector<int64_t>(
10, [](vector_size_t row) { return row + 123451; });
auto expectedNotContain =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use makeConstant(false, 10) for brevity.

makeFlatVector<bool>(10, [](vector_size_t row) { return true; });
testMightContain(bloom, value, expected);

auto valueNotContain = makeFlatVector<int64_t>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you add a test case where some rows return true and others false?

auto serializedBloom = getSerializedBloom();
auto bloom = makeConstant<StringView>(StringView(serializedBloom), 2);
auto value = makeNullableFlatVector<int64_t>({std::nullopt, 2});
auto expected = makeNullableFlatVector<bool>({std::nullopt, true});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makeNullConstant(TypeKind::BOOLEAN, 2)

TEST_F(MightContainTest, nullBloom) {
auto bloom = makeConstant<StringView>(std::nullopt, 2);
auto value = makeFlatVector<int64_t>({2, 4});
auto expected = makeNullableFlatVector<bool>({std::nullopt, std::nullopt});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makeNullConstant(TypeKind::BOOLEAN, 2)

try {
output.merge(serialized->valueAt<StringView>(0).str().c_str());
} catch (const std::exception& e) {
rows.applyToSelected(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use context.setErrors(rows, std::current_exception())

Copy link
Contributor

@mbasmanova mbasmanova left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jinchengchenghh Looks good % a few minor comments.

namespace facebook::velox::functions::sparksql::test {
namespace {

using namespace facebook::velox::test;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this out of the namespace, i.e. right after #includes.


std::string getSerializedBloomFilter() {
constexpr int64_t kSize = 10;
BloomFilter bloom;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bloom -> bloomFilter or filter


TEST_F(MightContainTest, basic) {
auto serialized = getSerializedBloomFilter();
auto bloom = makeConstant<StringView>(StringView(serialized), 10);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bloom -> bloomFilter or filter


auto values = makeNullableFlatVector<int64_t>(
{1, 2, 3, 4, 5, std::nullopt, 123451, 23456, 4, 5});
auto expects = makeNullableFlatVector<bool>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

expects -> expected

testMightContain(bloom, values, expects);
}

TEST_F(MightContainTest, nullBloom) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nullBloom -> nullBloomFilter or nullFilter

testMightContain(bloom, value, expected);
}

TEST_F(MightContainTest, nullValue) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already tested in 'basic'. Let's remove.

Copy link
Contributor

@mbasmanova mbasmanova left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@facebook-github-bot
Copy link
Contributor

@mbasmanova has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

@facebook-github-bot
Copy link
Contributor

@mbasmanova merged this pull request in 12d5c87.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. Merged
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add bloom_filter_agg and might_contain SparkSql function
4 participants