Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support json index #36750

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ localStorage:
# Local path to where vector data are stored during a search or a query to avoid repetitive access to MinIO or S3 service.
# Caution: Changing this parameter after using Milvus for a period of time will affect your access to old data.
# It is recommended to change this parameter before starting Milvus for the first time.
path: /var/lib/milvus/data/
path: /tmp/milvus/data/

# Related configuration of MinIO/S3/GCS or any other service that supports the S3 API, which is responsible for data persistence for Milvus.
# We refer to the storage service as MinIO/S3 in the following description for simplicity.
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/common/Consts.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,6 @@ const int64_t DEFAULT_BITMAP_INDEX_BUILD_MODE_BOUND = 500;
const int64_t DEFAULT_HYBRID_INDEX_BITMAP_CARDINALITY_LIMIT = 100;

const size_t MARISA_NULL_KEY_ID = -1;

// String keys "json_cast_type" and "json_path".
// NOTE(review): presumably the index-parameter names that carry a JSON
// index's cast type and target path — confirm at the call sites.
const std::string JSON_CAST_TYPE{"json_cast_type"};
const std::string JSON_PATH{"json_path"};
14 changes: 14 additions & 0 deletions internal/core/src/common/FieldDataInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,20 @@ class FieldDataJsonImpl : public FieldDataImpl<Json, true> {
}
length_ += n;
}

// only for test
void
add_json_data(const std::vector<Json>& json) {
std::lock_guard lck(tell_mutex_);
if (length_ + json.size() > get_num_rows()) {
resize_field_data(length_ + json.size());
}

for (size_t i = 0; i < json.size(); ++i) {
data_[length_ + i] = json[i];
}
length_ += json.size();
}
};

class FieldDataSparseVectorImpl
Expand Down
5 changes: 5 additions & 0 deletions internal/core/src/common/FieldMeta.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ class FieldMeta {
return IsVectorDataType(type_);
}

// Reports whether this field's data type is DataType::JSON.
bool
is_json() const {
    const bool json_typed = (DataType::JSON == type_);
    return json_typed;
}

bool
is_string() const {
return IsStringDataType(type_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ class PhyBinaryArithOpEvalRangeExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
active_count,
batch_size),
expr_(expr) {
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/exec/expression/BinaryRangeExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ class PhyBinaryRangeFilterExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
active_count,
batch_size),
expr_(expr) {
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/exec/expression/ExistsExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class PhyExistsFilterExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
active_count,
batch_size),
expr_(expr) {
Expand Down
43 changes: 36 additions & 7 deletions internal/core/src/exec/expression/Expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
#include <memory>
#include <string>

#include "common/FieldDataInterface.h"
#include "common/Json.h"
#include "common/Types.h"
#include "exec/expression/EvalCtx.h"
#include "exec/expression/VectorFunction.h"
#include "exec/expression/Utils.h"
#include "exec/QueryContext.h"
#include "expr/ITypeExpr.h"
#include "log/Log.h"
#include "query/PlanProto.h"

namespace milvus {
Expand Down Expand Up @@ -93,12 +96,15 @@ class SegmentExpr : public Expr {
SegmentExpr(const std::vector<ExprPtr>&& input,
const std::string& name,
const segcore::SegmentInternalInterface* segment,
const FieldId& field_id,
const FieldId field_id,
const std::vector<std::string> nested_path,
int64_t active_count,
int64_t batch_size)
: Expr(DataType::BOOL, std::move(input), name),
segment_(segment),
field_id_(field_id),
nested_path_(nested_path),

active_count_(active_count),
batch_size_(batch_size) {
size_per_chunk_ = segment_->size_per_chunk();
Expand All @@ -113,6 +119,7 @@ class SegmentExpr : public Expr {
InitSegmentExpr() {
auto& schema = segment_->get_schema();
auto& field_meta = schema[field_id_];
field_type_ = field_meta.get_data_type();

if (schema.get_primary_field_id().has_value() &&
schema.get_primary_field_id().value() == field_id_ &&
Expand All @@ -121,9 +128,16 @@ class SegmentExpr : public Expr {
pk_type_ = field_meta.get_data_type();
}

is_index_mode_ = segment_->HasIndex(field_id_);
if (is_index_mode_) {
num_index_chunk_ = segment_->num_chunk_index(field_id_);
if (field_meta.get_data_type() == DataType::JSON) {
auto pointer = milvus::Json::pointer(nested_path_);
if (is_index_mode_ = segment_->HasIndex(field_id_, pointer)) {
num_index_chunk_ = 1;
}
} else {
is_index_mode_ = segment_->HasIndex(field_id_);
if (is_index_mode_) {
num_index_chunk_ = segment_->num_chunk_index(field_id_);
}
}
// If the index does not include the raw data, the field data also needs to be loaded.
if (segment_->HasFieldData(field_id_)) {
Expand Down Expand Up @@ -460,9 +474,21 @@ class SegmentExpr : public Expr {
// This avoids executing the index for every batch, because executing
// the index is quite time-consuming.
if (cached_index_chunk_id_ != i) {
const Index& index =
segment_->chunk_scalar_index<IndexInnerType>(field_id_, i);
auto* index_ptr = const_cast<Index*>(&index);
Index* index_ptr = nullptr;

if (field_type_ == DataType::JSON) {
auto pointer = milvus::Json::pointer(nested_path_);

const Index& index =
segment_->chunk_scalar_index<IndexInnerType>(
field_id_, pointer, i);
index_ptr = const_cast<Index*>(&index);
} else {
const Index& index =
segment_->chunk_scalar_index<IndexInnerType>(field_id_,
i);
index_ptr = const_cast<Index*>(&index);
}
cached_index_chunk_res_ = std::move(func(index_ptr, values...));
auto valid_result = index_ptr->IsNotNull();
cached_index_chunk_valid_res_ = std::move(valid_result);
Expand Down Expand Up @@ -715,6 +741,9 @@ class SegmentExpr : public Expr {
DataType pk_type_;
int64_t batch_size_;

std::vector<std::string> nested_path_;
DataType field_type_;

bool is_index_mode_{false};
bool is_data_mode_{false};
// sometimes need to skip index and using raw data
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/exec/expression/JsonContainsExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class PhyJsonContainsFilterExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
active_count,
batch_size),
expr_(expr) {
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/exec/expression/TermExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class PhyTermFilterExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
active_count,
batch_size),
expr_(expr),
Expand Down
72 changes: 53 additions & 19 deletions internal/core/src/exec/expression/UnaryExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
#include "UnaryExpr.h"
#include <optional>
#include "common/Json.h"
#include "common/Types.h"
#include "common/type_c.h"
#include "log/Log.h"

namespace milvus {
namespace exec {
Expand Down Expand Up @@ -188,25 +191,49 @@ PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
}
case DataType::JSON: {
auto val_type = expr_->val_.val_case();
switch (val_type) {
case proto::plan::GenericValue::ValCase::kBoolVal:
result = ExecRangeVisitorImplJson<bool>();
break;
case proto::plan::GenericValue::ValCase::kInt64Val:
result = ExecRangeVisitorImplJson<int64_t>();
break;
case proto::plan::GenericValue::ValCase::kFloatVal:
result = ExecRangeVisitorImplJson<double>();
break;
case proto::plan::GenericValue::ValCase::kStringVal:
result = ExecRangeVisitorImplJson<std::string>();
break;
case proto::plan::GenericValue::ValCase::kArrayVal:
result = ExecRangeVisitorImplJson<proto::plan::Array>();
break;
default:
PanicInfo(
DataTypeInvalid, "unknown data type: {}", val_type);
if (CanUseIndexForJson()) {
switch (val_type) {
case proto::plan::GenericValue::ValCase::kBoolVal:
result = ExecRangeVisitorImplForIndex<bool>();
break;
case proto::plan::GenericValue::ValCase::kInt64Val:
result = ExecRangeVisitorImplForIndex<int64_t>();
break;
case proto::plan::GenericValue::ValCase::kFloatVal:
result = ExecRangeVisitorImplForIndex<double>();
break;
case proto::plan::GenericValue::ValCase::kStringVal:
result = ExecRangeVisitorImplForIndex<std::string>();
break;
case proto::plan::GenericValue::ValCase::kArrayVal:
result =
ExecRangeVisitorImplForIndex<proto::plan::Array>();
break;
default:
PanicInfo(
DataTypeInvalid, "unknown data type: {}", val_type);
}
} else {
switch (val_type) {
case proto::plan::GenericValue::ValCase::kBoolVal:
result = ExecRangeVisitorImplJson<bool>();
break;
case proto::plan::GenericValue::ValCase::kInt64Val:
result = ExecRangeVisitorImplJson<int64_t>();
break;
case proto::plan::GenericValue::ValCase::kFloatVal:
result = ExecRangeVisitorImplJson<double>();
break;
case proto::plan::GenericValue::ValCase::kStringVal:
result = ExecRangeVisitorImplJson<std::string>();
break;
case proto::plan::GenericValue::ValCase::kArrayVal:
result = ExecRangeVisitorImplJson<proto::plan::Array>();
break;
default:
PanicInfo(
DataTypeInvalid, "unknown data type: {}", val_type);
}
}
break;
}
Expand Down Expand Up @@ -912,6 +939,13 @@ PhyUnaryRangeFilterExpr::CanUseIndex() {
return res;
}

// Asks the segment whether it has an index for this field at the JSON pointer
// built from the expression's nested path. The answer is recorded in
// use_index_ and also returned to the caller.
bool
PhyUnaryRangeFilterExpr::CanUseIndexForJson() {
    const auto json_pointer =
        milvus::Json::pointer(expr_->column_.nested_path_);
    use_index_ = segment_->HasIndex(field_id_, json_pointer);
    return use_index_;
}

VectorPtr
PhyUnaryRangeFilterExpr::ExecTextMatch() {
using Index = index::TextMatchIndex;
Expand Down
4 changes: 4 additions & 0 deletions internal/core/src/exec/expression/UnaryExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ class PhyUnaryRangeFilterExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
active_count,
batch_size),
expr_(expr) {
Expand Down Expand Up @@ -341,6 +342,9 @@ class PhyUnaryRangeFilterExpr : public SegmentExpr {
bool
CanUseIndexForArray();

bool
CanUseIndexForJson();

VectorPtr
ExecTextMatch();

Expand Down
55 changes: 53 additions & 2 deletions internal/core/src/index/IndexFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
// limitations under the License.

#include "index/IndexFactory.h"
#include <cstdlib>
#include <memory>
#include "common/EasyAssert.h"
#include "common/FieldDataInterface.h"
#include "common/Types.h"
#include "index/VectorMemIndex.h"
#include "index/Utils.h"
#include "index/Meta.h"
#include "indexbuilder/JsonInvertedIndexCreator.h"
#include "knowhere/utils.h"

#include "index/VectorDiskIndex.h"
Expand All @@ -29,6 +33,8 @@
#include "index/InvertedIndexTantivy.h"
#include "index/HybridScalarIndex.h"
#include "knowhere/comp/knowhere_check.h"
#include "log/Log.h"
#include "pb/schema.pb.h"

namespace milvus::index {

Expand Down Expand Up @@ -357,6 +363,49 @@ IndexFactory::CreateComplexScalarIndex(
PanicInfo(Unsupported, "Complex index not supported now");
}

// Creates the inverted-index creator for a JSON field at `nested_path`.
//
// `index_type` must be INVERTED_INDEX_TYPE (asserted). `cast_dtype` selects
// the element type of the creator:
//   BOOL                       -> bool        (proto Bool)
//   INT8 / INT16 / INT32 / INT64 -> int64_t   (proto Int64)
//   FLOAT / DOUBLE             -> double      (proto Double)
//   STRING / VARCHAR           -> std::string (proto String)
// Any other cast type panics with DataTypeInvalid.
IndexBasePtr
IndexFactory::CreateJsonIndex(
    IndexType index_type,
    DataType cast_dtype,
    const std::string& nested_path,
    const storage::FileManagerContext& file_manager_context) {
    using BoolCreator = indexbuilder::JsonInvertedIndexCreator<bool>;
    using Int64Creator = indexbuilder::JsonInvertedIndexCreator<int64_t>;
    using DoubleCreator = indexbuilder::JsonInvertedIndexCreator<double>;
    using StringCreator = indexbuilder::JsonInvertedIndexCreator<std::string>;

    AssertInfo(index_type == INVERTED_INDEX_TYPE,
               "Invalid index type for json index");

    switch (cast_dtype) {
        case DataType::BOOL:
            return std::make_unique<BoolCreator>(
                proto::schema::DataType::Bool,
                nested_path,
                file_manager_context);

        // Every integer width shares the 64-bit creator.
        case DataType::INT8:
        case DataType::INT16:
        case DataType::INT32:
        case DataType::INT64:
            return std::make_unique<Int64Creator>(
                proto::schema::DataType::Int64,
                nested_path,
                file_manager_context);

        // Both floating-point widths share the double creator.
        case DataType::FLOAT:
        case DataType::DOUBLE:
            return std::make_unique<DoubleCreator>(
                proto::schema::DataType::Double,
                nested_path,
                file_manager_context);

        case DataType::STRING:
        case DataType::VARCHAR:
            return std::make_unique<StringCreator>(
                proto::schema::DataType::String,
                nested_path,
                file_manager_context);

        default:
            PanicInfo(DataTypeInvalid, "Invalid data type:{}", cast_dtype);
    }
}

IndexBasePtr
IndexFactory::CreateScalarIndex(
const CreateIndexInfo& create_index_info,
Expand All @@ -379,8 +428,10 @@ IndexFactory::CreateScalarIndex(
file_manager_context);
}
case DataType::JSON: {
return CreateComplexScalarIndex(create_index_info.index_type,
file_manager_context);
return CreateJsonIndex(create_index_info.index_type,
create_index_info.json_cast_type,
create_index_info.json_path,
file_manager_context);
}
default:
PanicInfo(DataTypeInvalid, "Invalid data type:{}", data_type);
Expand Down
Loading
Loading