Skip to content

Commit

Permalink
Merge branch 'master' of github.com:milvus-io/milvus into cp-yah-opt-…
Browse files Browse the repository at this point in the history
…var-column
  • Loading branch information
longjiquan committed Feb 28, 2024
2 parents ce455e7 + af31553 commit 81f3550
Show file tree
Hide file tree
Showing 42 changed files with 1,745 additions and 2,813 deletions.
6 changes: 6 additions & 0 deletions internal/core/src/segcore/AckResponder.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ class AckResponder {
return minimum_;
}

void
clear() {
    // Return the responder to its freshly-constructed state:
    // rewind the minimum acknowledged endpoint and drop every
    // recorded ack interval. The two members are independent,
    // so reset order does not matter.
    minimum_ = 0;
    acks_.clear();
}

private:
bool
fetch_and_flip(int64_t endpoint) {
Expand Down
52 changes: 39 additions & 13 deletions internal/core/src/segcore/InsertRecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class OffsetMap {
find_first(int64_t limit,
const BitsetType& bitset,
bool false_filtered_out) const = 0;

virtual void
clear() = 0;
};

template <typename T>
Expand Down Expand Up @@ -121,6 +124,12 @@ class OffsetOrderedMap : public OffsetMap {
return find_first_by_index(limit, bitset, false_filtered_out);
}

void
clear() override {
    // Take the shared_mutex exclusively: clearing mutates map_, so all
    // concurrent readers must be excluded for the duration.
    std::lock_guard<std::shared_mutex> guard(mtx_);
    map_.clear();
}

private:
std::vector<OffsetType>
find_first_by_index(int64_t limit,
Expand Down Expand Up @@ -232,6 +241,11 @@ class OffsetOrderedArray : public OffsetMap {
return find_first_by_index(limit, bitset, false_filtered_out);
}

void
clear() override {
    // Drop all stored (pk, offset) entries; vector::clear() keeps the
    // allocated capacity, so a subsequent reload reuses the buffer.
    // NOTE(review): any sealed/built flag this class tracks elsewhere is
    // not reset here — confirm callers rebuild/re-seal before reuse.
    array_.clear();
}

private:
std::vector<OffsetType>
find_first_by_index(int64_t limit,
Expand Down Expand Up @@ -275,19 +289,6 @@ class OffsetOrderedArray : public OffsetMap {

template <bool is_sealed = false>
struct InsertRecord {
ConcurrentVector<Timestamp> timestamps_;
ConcurrentVector<idx_t> row_ids_;

// used for preInsert of growing segment
std::atomic<int64_t> reserved = 0;
AckResponder ack_responder_;

// used for timestamps index of sealed segment
TimestampIndex timestamp_index_;

// pks to row offset
std::unique_ptr<OffsetMap> pk2offset_;

InsertRecord(const Schema& schema, int64_t size_per_chunk)
: row_ids_(size_per_chunk), timestamps_(size_per_chunk) {
std::optional<FieldId> pk_field_id = schema.get_primary_field_id();
Expand Down Expand Up @@ -581,6 +582,31 @@ struct InsertRecord {
return ack_responder_.GetAck();
}

void
clear() {
    // Reset the insert record to its freshly-constructed state so the
    // owning segment can be reloaded from scratch: system columns,
    // reservation counter, ack bookkeeping, timestamp index, pk->offset
    // map, and all per-field column data.
    timestamps_.clear();
    row_ids_.clear();
    reserved = 0;
    ack_responder_.clear();
    timestamp_index_ = TimestampIndex();
    // The pk->offset map is only created when the schema declares a
    // primary key (get_primary_field_id() in the constructor returns an
    // optional), so guard against a null pointer before clearing.
    if (pk2offset_ != nullptr) {
        pk2offset_->clear();
    }
    fields_data_.clear();
}

public:
ConcurrentVector<Timestamp> timestamps_;
ConcurrentVector<idx_t> row_ids_;

// used for preInsert of growing segment
std::atomic<int64_t> reserved = 0;
AckResponder ack_responder_;

// used for timestamps index of sealed segment
TimestampIndex timestamp_index_;

// pks to row offset
std::unique_ptr<OffsetMap> pk2offset_;

private:
// std::vector<std::unique_ptr<VectorBase>> fields_data_;
std::unordered_map<FieldId, std::unique_ptr<VectorBase>> fields_data_{};
Expand Down
6 changes: 6 additions & 0 deletions internal/core/src/segcore/SealedIndexingRecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ struct SealedIndexingRecord {
return field_indexings_.count(field_id);
}

void
clear() {
    // Exclusive lock: removing every field -> indexing entry must not
    // race with concurrent readers of field_indexings_.
    std::lock_guard lck(mutex_);
    field_indexings_.clear();
}

private:
// field_offset -> SealedIndexingEntry
std::unordered_map<FieldId, SealedIndexingEntryPtr> field_indexings_;
Expand Down
13 changes: 13 additions & 0 deletions internal/core/src/segcore/SegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,19 @@ SegmentSealedImpl::bulk_subscript_impl(int64_t element_sizeof,
}
}

void
SegmentSealedImpl::ClearData() {
    // Drop every piece of loaded state so the sealed segment returns to
    // its just-created (nothing loaded) condition; after this call
    // get_row_count()/get_real_count() report 0 and Search() fails until
    // data is loaded again (exercised by TEST(Sealed, ClearData)).
    field_data_ready_bitset_.clear();  // no field data marked ready
    index_ready_bitset_.clear();       // no field index marked ready
    binlog_index_bitset_.clear();      // no interim binlog index marked ready
    system_ready_count_ = 0;           // system fields (row id / timestamp) unloaded
    num_rows_ = 0;                     // row count reset to zero
    scalar_indexings_.clear();
    vector_indexings_.clear();
    insert_record_.clear();
    fields_.clear();
    // NOTE(review): no lock is taken while tearing this state down —
    // confirm callers guarantee exclusive access, since a concurrent
    // Search/Load on this segment would race with the reset.
}

std::unique_ptr<DataArray>
SegmentSealedImpl::fill_with_empty(FieldId field_id, int64_t count) const {
auto& field_meta = schema_->operator[](field_id);
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/segcore/SegmentSealedImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ class SegmentSealedImpl : public SegmentSealed {
const int64_t* seg_offsets,
int64_t count) const override;

// Reset this sealed segment to its just-created state: drops loaded
// field data, scalar/vector indexes, the insert record, and all
// readiness bookkeeping (implementation in SegmentSealedImpl.cpp).
void
ClearData();

protected:
// blob and row_count
SpanBase
Expand Down
6 changes: 6 additions & 0 deletions internal/core/src/segcore/segment_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ DeleteSegment(CSegmentInterface c_segment) {
delete s;
}

void
ClearSegmentData(CSegmentInterface c_segment) {
    // The opaque handle is assumed to wrap a sealed segment (caller's
    // contract for this entry point); recover the concrete type and
    // wipe its loaded field data and indexes.
    auto sealed_segment =
        static_cast<milvus::segcore::SegmentSealedImpl*>(c_segment);
    sealed_segment->ClearData();
}

void
DeleteSearchResult(CSearchResult search_result) {
auto res = static_cast<milvus::SearchResult*>(search_result);
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/segcore/segment_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ NewSegment(CCollection collection,
void
DeleteSegment(CSegmentInterface c_segment);

// Release all loaded field data and indexes held by a *sealed* segment,
// returning it to an empty (nothing loaded) state. The segment handle
// itself stays valid; use DeleteSegment to destroy it.
void
ClearSegmentData(CSegmentInterface c_segment);

void
DeleteSearchResult(CSearchResult search_result);

Expand Down
129 changes: 129 additions & 0 deletions internal/core/unittest/test_sealed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,135 @@ TEST(Sealed, LoadFieldData) {
ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group.get(), timestamp));
}

TEST(Sealed, ClearData) {
    // End-to-end check of SegmentSealedImpl::ClearData(): load a sealed
    // segment (field data + vector index), verify search and chunk access
    // work, then clear it and verify the segment is empty and no longer
    // searchable.
    auto dim = 16;
    auto N = ROW_COUNT;
    auto metric_type = knowhere::metric::L2;
    auto schema = std::make_shared<Schema>();
    auto fakevec_id = schema->AddDebugField(
        "fakevec", DataType::VECTOR_FLOAT, dim, metric_type);
    auto counter_id = schema->AddDebugField("counter", DataType::INT64);
    auto double_id = schema->AddDebugField("double", DataType::DOUBLE);
    // Extra fields broaden type coverage of load/clear; their ids are not
    // needed by the assertions below.
    schema->AddDebugField("nothing", DataType::INT32);
    auto str_id = schema->AddDebugField("str", DataType::VARCHAR);
    schema->AddDebugField("int8", DataType::INT8);
    schema->AddDebugField("int16", DataType::INT16);
    schema->AddDebugField("float", DataType::FLOAT);
    schema->AddDebugField("json", DataType::JSON);
    schema->AddDebugField("array", DataType::ARRAY, DataType::INT64);
    schema->set_primary_field_id(counter_id);

    auto dataset = DataGen(schema, N);

    auto fakevec = dataset.get_col<float>(fakevec_id);

    auto indexing = GenVecIndexing(
        N, dim, fakevec.data(), knowhere::IndexEnum::INDEX_FAISS_IVFFLAT);

    auto segment = CreateSealedSegment(schema);
    // ANNS on field 100 ("fakevec", topk 5, nprobe 10) filtered by the
    // range predicate -1 <= double < 1 on field 102.
    const char* raw_plan = R"(vector_anns: <
        field_id: 100
        predicates: <
          binary_range_expr: <
            column_info: <
              field_id: 102
              data_type: Double
            >
            lower_inclusive: true,
            upper_inclusive: false,
            lower_value: <
              float_val: -1
            >
            upper_value: <
              float_val: 1
            >
          >
        >
        query_info: <
          topk: 5
          round_decimal: 3
          metric_type: "L2"
          search_params: "{\"nprobe\": 10}"
        >
        placeholder_tag: "$0"
    >)";
    Timestamp timestamp = 1000000;
    auto plan_str = translate_text_plan_to_binary_plan(raw_plan);
    auto plan =
        CreateSearchPlanByExpr(*schema, plan_str.data(), plan_str.size());
    auto num_queries = 5;
    auto ph_group_raw = CreatePlaceholderGroup(num_queries, dim, 1024);
    auto ph_group =
        ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString());

    // Nothing loaded yet: searching must fail.
    ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group.get(), timestamp));

    SealedLoadFieldData(dataset, *segment);
    segment->Search(plan.get(), ph_group.get(), timestamp);

    // Dropping the vector field makes the segment unsearchable again.
    segment->DropFieldData(fakevec_id);
    ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group.get(), timestamp));

    // Loading a vector index (instead of raw vector data) restores search.
    LoadIndexInfo vec_info;
    vec_info.field_id = fakevec_id.get();
    vec_info.index = std::move(indexing);
    vec_info.index_params["metric_type"] = knowhere::metric::L2;
    segment->LoadIndex(vec_info);

    ASSERT_EQ(segment->num_chunk(), 1);
    ASSERT_EQ(segment->num_chunk_index(double_id), 0);
    ASSERT_EQ(segment->num_chunk_index(str_id), 0);
    // Loaded column data must match the generated dataset row-for-row.
    auto chunk_span1 = segment->chunk_data<int64_t>(counter_id, 0);
    auto chunk_span2 = segment->chunk_data<double>(double_id, 0);
    auto chunk_span3 = segment->chunk_data<std::string_view>(str_id, 0);
    auto ref1 = dataset.get_col<int64_t>(counter_id);
    auto ref2 = dataset.get_col<double>(double_id);
    auto ref3 = dataset.get_col(str_id)->scalars().string_data().data();
    for (int i = 0; i < N; ++i) {
        ASSERT_EQ(chunk_span1[i], ref1[i]);
        ASSERT_EQ(chunk_span2[i], ref2[i]);
        ASSERT_EQ(chunk_span3[i], ref3[i]);
    }

    auto sr = segment->Search(plan.get(), ph_group.get(), timestamp);
    auto json = SearchResultToJson(*sr);
    std::cout << json.dump(1);

    // After ClearData() the segment reports zero rows and search fails
    // again, as if freshly created.
    auto sealed_segment = static_cast<SegmentSealedImpl*>(segment.get());
    sealed_segment->ClearData();
    ASSERT_EQ(sealed_segment->get_row_count(), 0);
    ASSERT_EQ(sealed_segment->get_real_count(), 0);
    ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group.get(), timestamp));
}

TEST(Sealed, LoadFieldDataMmap) {
auto dim = 16;
auto topK = 5;
Expand Down
Loading

0 comments on commit 81f3550

Please sign in to comment.