Skip to content

Commit

Permalink
feat: support json index
Browse files Browse the repository at this point in the history
This PR adds json index support for json and dynamic fields. Now you can only do unary query like 'a["b"] > 1' using this index. We will support more filter type later.

basic usage:
```
collection.create_index("json_field", {"index_type": "INVERTED",
    "params": {"json_cast_type": DataType.STRING, "json_path":
'json_field["a"]["b"]'}})
```

There are some limits to use this index:
1. If a record does not have the json path you specify, it will be ignored and there will not be an error.
2. If a value of the json path fails to be cast to the type you specify,  it will be ignored and there will not be an error.
3. A specific json path can have only one json index.
4. If you try to create more than one json indexes for one json field, sdk(pymilvus<=2.4.7) may return immediately because of internal implementation. This will be fixed in a later version.

Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby committed Nov 5, 2024
1 parent b83b376 commit f87196f
Show file tree
Hide file tree
Showing 43 changed files with 962 additions and 124 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ INSTALL_PATH := $(PWD)/bin
LIBRARY_PATH := $(PWD)/lib
PGO_PATH := $(PWD)/configs/pgo
OS := $(shell uname -s)
mode = Release
mode = Debug

use_disk_index = OFF
ifdef disk_index
Expand Down
14 changes: 14 additions & 0 deletions internal/core/src/common/FieldDataInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,20 @@ class FieldDataJsonImpl : public FieldDataImpl<Json, true> {
}
length_ += n;
}

// only for test
void
add_json_data(const std::vector<Json>& json) {
std::lock_guard lck(tell_mutex_);
if (length_ + json.size() > get_num_rows()) {
resize_field_data(length_ + json.size());
}

for (size_t i = 0; i < json.size(); ++i) {
data_[length_ + i] = json[i];
}
length_ += json.size();
}
};

class FieldDataSparseVectorImpl
Expand Down
5 changes: 5 additions & 0 deletions internal/core/src/common/FieldMeta.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ class FieldMeta {
return IsVectorDataType(type_);
}

bool
is_json() const {
return type_ == DataType::JSON;
}

bool
is_string() const {
return IsStringDataType(type_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ class PhyBinaryArithOpEvalRangeExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
active_count,
batch_size),
expr_(expr) {
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/exec/expression/BinaryRangeExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ class PhyBinaryRangeFilterExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
active_count,
batch_size),
expr_(expr) {
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/exec/expression/ExistsExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class PhyExistsFilterExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
active_count,
batch_size),
expr_(expr) {
Expand Down
43 changes: 36 additions & 7 deletions internal/core/src/exec/expression/Expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
#include <memory>
#include <string>

#include "common/FieldDataInterface.h"
#include "common/Json.h"
#include "common/Types.h"
#include "exec/expression/EvalCtx.h"
#include "exec/expression/VectorFunction.h"
#include "exec/expression/Utils.h"
#include "exec/QueryContext.h"
#include "expr/ITypeExpr.h"
#include "log/Log.h"
#include "query/PlanProto.h"

namespace milvus {
Expand Down Expand Up @@ -93,12 +96,15 @@ class SegmentExpr : public Expr {
SegmentExpr(const std::vector<ExprPtr>&& input,
const std::string& name,
const segcore::SegmentInternalInterface* segment,
const FieldId& field_id,
const FieldId field_id,
const std::vector<std::string> nested_path,
int64_t active_count,
int64_t batch_size)
: Expr(DataType::BOOL, std::move(input), name),
segment_(segment),
field_id_(field_id),
nested_path_(nested_path),

active_count_(active_count),
batch_size_(batch_size) {
size_per_chunk_ = segment_->size_per_chunk();
Expand All @@ -113,6 +119,7 @@ class SegmentExpr : public Expr {
InitSegmentExpr() {
auto& schema = segment_->get_schema();
auto& field_meta = schema[field_id_];
field_type_ = field_meta.get_data_type();

if (schema.get_primary_field_id().has_value() &&
schema.get_primary_field_id().value() == field_id_ &&
Expand All @@ -121,9 +128,16 @@ class SegmentExpr : public Expr {
pk_type_ = field_meta.get_data_type();
}

is_index_mode_ = segment_->HasIndex(field_id_);
if (is_index_mode_) {
num_index_chunk_ = segment_->num_chunk_index(field_id_);
if (field_meta.get_data_type() == DataType::JSON) {
auto pointer = milvus::Json::pointer(nested_path_);
if (is_index_mode_ = segment_->HasIndex(field_id_, pointer)) {
num_index_chunk_ = 1;
}
} else {
is_index_mode_ = segment_->HasIndex(field_id_);
if (is_index_mode_) {
num_index_chunk_ = segment_->num_chunk_index(field_id_);
}
}
// if index not include raw data, also need load data
if (segment_->HasFieldData(field_id_)) {
Expand Down Expand Up @@ -460,9 +474,21 @@ class SegmentExpr : public Expr {
// It avoids indexing execute for every batch because indexing
// executing costs quite much time.
if (cached_index_chunk_id_ != i) {
const Index& index =
segment_->chunk_scalar_index<IndexInnerType>(field_id_, i);
auto* index_ptr = const_cast<Index*>(&index);
Index* index_ptr = nullptr;

if (field_type_ == DataType::JSON) {
auto pointer = milvus::Json::pointer(nested_path_);

const Index& index =
segment_->chunk_scalar_index<IndexInnerType>(
field_id_, pointer, i);
index_ptr = const_cast<Index*>(&index);
} else {
const Index& index =
segment_->chunk_scalar_index<IndexInnerType>(field_id_,
i);
index_ptr = const_cast<Index*>(&index);
}
cached_index_chunk_res_ = std::move(func(index_ptr, values...));
auto valid_result = index_ptr->IsNotNull();
cached_index_chunk_valid_res_ = std::move(valid_result);
Expand Down Expand Up @@ -715,6 +741,9 @@ class SegmentExpr : public Expr {
DataType pk_type_;
int64_t batch_size_;

std::vector<std::string> nested_path_;
DataType field_type_;

bool is_index_mode_{false};
bool is_data_mode_{false};
// sometimes need to skip index and using raw data
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/exec/expression/JsonContainsExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class PhyJsonContainsFilterExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
active_count,
batch_size),
expr_(expr) {
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/exec/expression/TermExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class PhyTermFilterExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
active_count,
batch_size),
expr_(expr),
Expand Down
72 changes: 53 additions & 19 deletions internal/core/src/exec/expression/UnaryExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
#include "UnaryExpr.h"
#include <optional>
#include "common/Json.h"
#include "common/Types.h"
#include "common/type_c.h"
#include "log/Log.h"

namespace milvus {
namespace exec {
Expand Down Expand Up @@ -188,25 +191,49 @@ PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
}
case DataType::JSON: {
auto val_type = expr_->val_.val_case();
switch (val_type) {
case proto::plan::GenericValue::ValCase::kBoolVal:
result = ExecRangeVisitorImplJson<bool>();
break;
case proto::plan::GenericValue::ValCase::kInt64Val:
result = ExecRangeVisitorImplJson<int64_t>();
break;
case proto::plan::GenericValue::ValCase::kFloatVal:
result = ExecRangeVisitorImplJson<double>();
break;
case proto::plan::GenericValue::ValCase::kStringVal:
result = ExecRangeVisitorImplJson<std::string>();
break;
case proto::plan::GenericValue::ValCase::kArrayVal:
result = ExecRangeVisitorImplJson<proto::plan::Array>();
break;
default:
PanicInfo(
DataTypeInvalid, "unknown data type: {}", val_type);
if (CanUseIndexForJson()) {
switch (val_type) {
case proto::plan::GenericValue::ValCase::kBoolVal:
result = ExecRangeVisitorImplForIndex<bool>();
break;
case proto::plan::GenericValue::ValCase::kInt64Val:
result = ExecRangeVisitorImplForIndex<int64_t>();
break;
case proto::plan::GenericValue::ValCase::kFloatVal:
result = ExecRangeVisitorImplForIndex<double>();
break;
case proto::plan::GenericValue::ValCase::kStringVal:
result = ExecRangeVisitorImplForIndex<std::string>();
break;
case proto::plan::GenericValue::ValCase::kArrayVal:
result =
ExecRangeVisitorImplForIndex<proto::plan::Array>();
break;
default:
PanicInfo(
DataTypeInvalid, "unknown data type: {}", val_type);
}
} else {
switch (val_type) {
case proto::plan::GenericValue::ValCase::kBoolVal:
result = ExecRangeVisitorImplJson<bool>();
break;
case proto::plan::GenericValue::ValCase::kInt64Val:
result = ExecRangeVisitorImplJson<int64_t>();
break;
case proto::plan::GenericValue::ValCase::kFloatVal:
result = ExecRangeVisitorImplJson<double>();
break;
case proto::plan::GenericValue::ValCase::kStringVal:
result = ExecRangeVisitorImplJson<std::string>();
break;
case proto::plan::GenericValue::ValCase::kArrayVal:
result = ExecRangeVisitorImplJson<proto::plan::Array>();
break;
default:
PanicInfo(
DataTypeInvalid, "unknown data type: {}", val_type);
}
}
break;
}
Expand Down Expand Up @@ -912,6 +939,13 @@ PhyUnaryRangeFilterExpr::CanUseIndex() {
return res;
}

bool
PhyUnaryRangeFilterExpr::CanUseIndexForJson() {
use_index_ = segment_->HasIndex(
field_id_, milvus::Json::pointer(expr_->column_.nested_path_));
return use_index_;
}

VectorPtr
PhyUnaryRangeFilterExpr::ExecTextMatch() {
using Index = index::TextMatchIndex;
Expand Down
4 changes: 4 additions & 0 deletions internal/core/src/exec/expression/UnaryExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ class PhyUnaryRangeFilterExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
active_count,
batch_size),
expr_(expr) {
Expand Down Expand Up @@ -341,6 +342,9 @@ class PhyUnaryRangeFilterExpr : public SegmentExpr {
bool
CanUseIndexForArray();

bool
CanUseIndexForJson();

VectorPtr
ExecTextMatch();

Expand Down
55 changes: 53 additions & 2 deletions internal/core/src/index/IndexFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
// limitations under the License.

#include "index/IndexFactory.h"
#include <cstdlib>
#include <memory>
#include "common/EasyAssert.h"
#include "common/FieldDataInterface.h"
#include "common/Types.h"
#include "index/VectorMemIndex.h"
#include "index/Utils.h"
#include "index/Meta.h"
#include "indexbuilder/JsonInvertedIndexCreator.h"
#include "knowhere/utils.h"

#include "index/VectorDiskIndex.h"
Expand All @@ -29,6 +33,8 @@
#include "index/InvertedIndexTantivy.h"
#include "index/HybridScalarIndex.h"
#include "knowhere/comp/knowhere_check.h"
#include "log/Log.h"
#include "pb/schema.pb.h"

namespace milvus::index {

Expand Down Expand Up @@ -357,6 +363,49 @@ IndexFactory::CreateComplexScalarIndex(
PanicInfo(Unsupported, "Complex index not supported now");
}

IndexBasePtr
IndexFactory::CreateJsonIndex(
IndexType index_type,
DataType cast_dtype,
const std::string& nested_path,
const storage::FileManagerContext& file_manager_context) {
AssertInfo(index_type == INVERTED_INDEX_TYPE,
"Invalid index type for json index");
switch (cast_dtype) {
case DataType::BOOL:
return std::make_unique<
indexbuilder::JsonInvertedIndexCreator<bool>>(
proto::schema::DataType::Bool,
nested_path,
file_manager_context);
case milvus::DataType::INT8:
case milvus::DataType::INT16:
case milvus::DataType::INT32:
case DataType::INT64:
return std::make_unique<
indexbuilder::JsonInvertedIndexCreator<int64_t>>(
proto::schema::DataType::Int64,
nested_path,
file_manager_context);
case DataType::FLOAT:
case DataType::DOUBLE:
return std::make_unique<
indexbuilder::JsonInvertedIndexCreator<double>>(
proto::schema::DataType::Double,
nested_path,
file_manager_context);
case DataType::STRING:
case DataType::VARCHAR:
return std::make_unique<
indexbuilder::JsonInvertedIndexCreator<std::string>>(
proto::schema::DataType::String,
nested_path,
file_manager_context);
default:
PanicInfo(DataTypeInvalid, "Invalid data type:{}", cast_dtype);
}
}

IndexBasePtr
IndexFactory::CreateScalarIndex(
const CreateIndexInfo& create_index_info,
Expand All @@ -379,8 +428,10 @@ IndexFactory::CreateScalarIndex(
file_manager_context);
}
case DataType::JSON: {
return CreateComplexScalarIndex(create_index_info.index_type,
file_manager_context);
return CreateJsonIndex(create_index_info.index_type,
create_index_info.json_cast_type,
create_index_info.json_path,
file_manager_context);
}
default:
PanicInfo(DataTypeInvalid, "Invalid data type:{}", data_type);
Expand Down
8 changes: 8 additions & 0 deletions internal/core/src/index/IndexFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <mutex>
#include <shared_mutex>

#include "common/Types.h"
#include "common/type_c.h"
#include "index/Index.h"
#include "index/ScalarIndex.h"
Expand Down Expand Up @@ -103,6 +104,13 @@ class IndexFactory {
const storage::FileManagerContext& file_manager_context =
storage::FileManagerContext());

IndexBasePtr
CreateJsonIndex(IndexType index_type,
DataType cast_dtype,
const std::string& nested_path,
const storage::FileManagerContext& file_manager_context =
storage::FileManagerContext());

IndexBasePtr
CreateScalarIndex(const CreateIndexInfo& create_index_info,
const storage::FileManagerContext& file_manager_context =
Expand Down
Loading

0 comments on commit f87196f

Please sign in to comment.