Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: start compaction operation in CompactionFilter::Filter #780

Merged
merged 9 commits into from
Jul 20, 2021
2 changes: 1 addition & 1 deletion src/server/compaction_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ compaction_operations create_compaction_operations(const std::string &json, uint
enum_to_string(op.type), dsn::PROVIDER_TYPE_MAIN, op.params, data_version);
if (operation != nullptr) {
operation->set_rules(std::move(rules));
res.emplace_back(std::unique_ptr<compaction_operation>(operation));
res.emplace_back(std::shared_ptr<compaction_operation>(operation));
}
}
return res;
Expand Down
2 changes: 1 addition & 1 deletion src/server/compaction_operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class update_ttl : public compaction_operation
FRIEND_TEST(compaction_filter_operation_test, create_operations);
};

typedef std::vector<std::unique_ptr<compaction_operation>> compaction_operations;
typedef std::vector<std::shared_ptr<compaction_operation>> compaction_operations;
compaction_operations create_compaction_operations(const std::string &json, uint32_t data_version);
void register_compaction_operations();
} // namespace server
Expand Down
33 changes: 30 additions & 3 deletions src/server/key_ttl_compaction_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter
bool enabled,
int32_t pidx,
int32_t partition_version,
bool validate_hash)
bool validate_hash,
compaction_operations &&compaction_ops)
: _pegasus_data_version(pegasus_data_version),
_default_ttl(default_ttl),
_enabled(enabled),
_partition_index(pidx),
_partition_version(partition_version),
_validate_partition_hash(validate_hash)
_validate_partition_hash(validate_hash),
_user_specified_operations(std::move(compaction_ops))
{
}

Expand All @@ -60,6 +62,23 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter
return false;
}

// ignore empty write
if (key.size() < 2) {
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
return false;
}

// user specified compaction operations
if (!_user_specified_operations.empty()) {
std::string hash_key, sort_key;
pegasus_restore_key(dsn::blob(key.data(), 0, key.size()), hash_key, sort_key);
for (const auto &op : _user_specified_operations) {
if (op->filter(hash_key, sort_key, existing_value, new_value, value_changed)) {
// return true if this data need to be deleted
return true;
}
}
}

uint32_t expire_ts =
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
pegasus_extract_expire_ts(_pegasus_data_version, utils::to_string_view(existing_value));
if (_default_ttl != 0 && expire_ts == 0) {
Expand Down Expand Up @@ -94,6 +113,7 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter
int32_t _partition_index;
int32_t _partition_version;
bool _validate_partition_hash;
compaction_operations _user_specified_operations;
};

class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactory
Expand All @@ -105,13 +125,20 @@ class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactor
std::unique_ptr<rocksdb::CompactionFilter>
CreateCompactionFilter(const rocksdb::CompactionFilter::Context & /*context*/) override
{
compaction_operations tmp_filter_operations;
{
dsn::utils::auto_read_lock l(_lock);
tmp_filter_operations = _user_specified_operations;
}

return std::unique_ptr<KeyWithTTLCompactionFilter>(
new KeyWithTTLCompactionFilter(_pegasus_data_version.load(),
_default_ttl.load(),
_enabled.load(),
_partition_index.load(),
_partition_version.load(),
_validate_partition_hash.load()));
_validate_partition_hash.load(),
std::move(tmp_filter_operations)));
}
const char *Name() const override { return "KeyWithTTLCompactionFilterFactory"; }

Expand Down