Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: offline map type #3746

Merged
merged 3 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions hybridse/include/base/fe_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@

#ifndef HYBRIDSE_INCLUDE_BASE_FE_SLICE_H_
#define HYBRIDSE_INCLUDE_BASE_FE_SLICE_H_

#include <assert.h>
#include <memory.h>
#include <stddef.h>
#include <string.h>
#include <memory>

#include <string>

#include "base/raw_buffer.h"
#include "boost/smart_ptr/local_shared_ptr.hpp"

namespace hybridse {
namespace base {
Expand Down
3 changes: 1 addition & 2 deletions hybridse/include/base/raw_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@

#ifndef HYBRIDSE_INCLUDE_BASE_RAW_BUFFER_H_
#define HYBRIDSE_INCLUDE_BASE_RAW_BUFFER_H_
#include <assert.h>

#include <stddef.h>
#include <string.h>
#include <string>

#include "glog/logging.h"

Expand Down
63 changes: 63 additions & 0 deletions hybridse/include/codec/fe_row_codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,30 @@
#define HYBRIDSE_INCLUDE_CODEC_FE_ROW_CODEC_H_

#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "absl/status/statusor.h"
#include "base/fe_status.h"
#include "base/fe_slice.h"
#include "base/raw_buffer.h"
#include "codec/row.h"
#include "proto/fe_type.pb.h"

namespace hybridse {

namespace node {
class ExprNode;
}
namespace codegen {
class InsertRowBuilder;
}
namespace vm {
class HybridSeJitWrapper;
}

namespace codec {

const uint32_t BitMapSize(uint32_t size);
Expand Down Expand Up @@ -74,6 +89,54 @@
void FillNullStringOffset(int8_t* buf, uint32_t start, uint32_t addr_length,
uint32_t str_idx, uint32_t str_offset);


// single slice builder from pure codegen
class SliceBuilder {
public:
SliceBuilder(vm::HybridSeJitWrapper*, const hybridse::codec::Schema* schema);
virtual ~SliceBuilder() {}

Check warning on line 97 in hybridse/include/codec/fe_row_codec.h

View check run for this annotation

Codecov / codecov/patch

hybridse/include/codec/fe_row_codec.h#L97

Added line #L97 was not covered by tests

base::Status Build(const std::vector<node::ExprNode*>&, base::RefCountedSlice*);

base::Status Build(absl::Span<node::ExprNode* const>, base::RefCountedSlice*);

private:
void EnsureInitialized() {
assert(row_builder_ != nullptr && "must initialize the row builder before encoding");
}

Check warning on line 106 in hybridse/include/codec/fe_row_codec.h

View check run for this annotation

Codecov / codecov/patch

hybridse/include/codec/fe_row_codec.h#L104-L106

Added lines #L104 - L106 were not covered by tests

const Schema* schema_;
std::shared_ptr<codegen::InsertRowBuilder> row_builder_ = nullptr;
};

// new row builder from pure codegen
class RowBuilder2 {
public:
RowBuilder2(vm::HybridSeJitWrapper*, int sliceSize);
RowBuilder2(vm::HybridSeJitWrapper*, const std::vector<codec::Schema>& schemas);
RowBuilder2(vm::HybridSeJitWrapper*, const std::vector<std::vector<hybridse::type::ColumnDef>>& schemas);
~RowBuilder2() {}

base::Status Init();

base::Status InitSchema(int idx, const codec::Schema& sc);

base::Status Build(const std::vector<node::ExprNode*>&, codec::Row*);

private:
void EnsureInitialized() {
assert(initialized_ && "RowBuild not initialized");
}

Check warning on line 129 in hybridse/include/codec/fe_row_codec.h

View check run for this annotation

Codecov / codecov/patch

hybridse/include/codec/fe_row_codec.h#L127-L129

Added lines #L127 - L129 were not covered by tests

vm::HybridSeJitWrapper* jit_ = nullptr;
std::vector<codec::Schema> schemas_;
std::vector<std::shared_ptr<SliceBuilder>> builders_;

bool initialized_ = false;
};

// Old row builder in C
// limited data type support, no map, no array. U should upgrade to RowBuilder2
class RowBuilder {
public:
explicit RowBuilder(const hybridse::codec::Schema& schema);
Expand Down
5 changes: 0 additions & 5 deletions hybridse/include/codec/row.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,9 @@
#define HYBRIDSE_INCLUDE_CODEC_ROW_H_

#include <cstdint>
#include <map>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "base/fe_slice.h"
#include "base/raw_buffer.h"
#include "proto/fe_type.pb.h"

namespace hybridse {
namespace codec {
Expand Down
98 changes: 98 additions & 0 deletions hybridse/src/codec/fe_row_codec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@

#include "codec/type_codec.h"
#include "gflags/gflags.h"
#include "codegen/insert_row_builder.h"
#include "glog/logging.h"
#include "proto/fe_common.pb.h"
#include "vm/engine.h"

DECLARE_bool(enable_spark_unsaferow_format);

Expand Down Expand Up @@ -1033,5 +1036,100 @@
str_field_start_offset_);
}

SliceBuilder::SliceBuilder(vm::HybridSeJitWrapper* jit, const hybridse::codec::Schema* schema)
: schema_(schema) {
row_builder_ = std::make_shared<codegen::InsertRowBuilder>(jit, schema_);
}

Check warning on line 1042 in hybridse/src/codec/fe_row_codec.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/codec/fe_row_codec.cc#L1039-L1042

Added lines #L1039 - L1042 were not covered by tests

base::Status SliceBuilder::Build(const std::vector<node::ExprNode* >& values, base::RefCountedSlice* slice) {
return Build(absl::MakeSpan(values), slice);

Check warning on line 1045 in hybridse/src/codec/fe_row_codec.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/codec/fe_row_codec.cc#L1044-L1045

Added lines #L1044 - L1045 were not covered by tests
}

base::Status SliceBuilder::Build(absl::Span<node::ExprNode* const> values, base::RefCountedSlice* slice) {
EnsureInitialized();

Check warning on line 1049 in hybridse/src/codec/fe_row_codec.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/codec/fe_row_codec.cc#L1048-L1049

Added lines #L1048 - L1049 were not covered by tests

auto rs = row_builder_->ComputeRowUnsafe(values);
if (!rs.ok()) {
return {common::kCodegenEncodeError, rs.status().ToString()};

Check warning on line 1053 in hybridse/src/codec/fe_row_codec.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/codec/fe_row_codec.cc#L1051-L1053

Added lines #L1051 - L1053 were not covered by tests
}

auto buf = rs.value();
if (buf == nullptr) {
return {common::kCodegenEncodeError, "internal error: encoded buf is null"};

Check warning on line 1058 in hybridse/src/codec/fe_row_codec.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/codec/fe_row_codec.cc#L1056-L1058

Added lines #L1056 - L1058 were not covered by tests
}

*slice = base::RefCountedSlice::CreateManaged(buf, RowView::GetSize(buf));

Check warning on line 1061 in hybridse/src/codec/fe_row_codec.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/codec/fe_row_codec.cc#L1061

Added line #L1061 was not covered by tests

return {};

Check warning on line 1063 in hybridse/src/codec/fe_row_codec.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/codec/fe_row_codec.cc#L1063

Added line #L1063 was not covered by tests
}

RowBuilder2::RowBuilder2(vm::HybridSeJitWrapper* jit, int sliceSize) : jit_(jit) {
schemas_.resize(sliceSize);
builders_.resize(sliceSize);
}
RowBuilder2::RowBuilder2(vm::HybridSeJitWrapper* jit, const std::vector<codec::Schema>& schemas)
: jit_(jit), schemas_(schemas) {
builders_.resize(schemas_.size());
}
RowBuilder2::RowBuilder2(vm::HybridSeJitWrapper* jit,
const std::vector<std::vector<hybridse::type::ColumnDef>>& schemas)
: jit_(jit) {
for (auto& sc : schemas) {
schemas_.push_back(Schema());
auto& ref = schemas_.back();
for (auto& col : sc) {
ref.Add()->CopyFrom(col);

Check warning on line 1081 in hybridse/src/codec/fe_row_codec.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/codec/fe_row_codec.cc#L1066-L1081

Added lines #L1066 - L1081 were not covered by tests
}
}
builders_.resize(schemas_.size());
}

Check warning on line 1085 in hybridse/src/codec/fe_row_codec.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/codec/fe_row_codec.cc#L1084-L1085

Added lines #L1084 - L1085 were not covered by tests

base::Status RowBuilder2::Init() {
CHECK_TRUE(jit_ != nullptr, common::kCodegenEncodeError, "jit is null");
for (size_t i = 0; i < schemas_.size(); ++i) {
CHECK_TRUE(!schemas_[i].empty(), common::kCodegenEncodeError, absl::StrCat(i, "th schema un-initialized"));
if (builders_[i] == nullptr) {
builders_[i] = std::make_shared<SliceBuilder>(jit_, &schemas_[i]);

Check warning on line 1092 in hybridse/src/codec/fe_row_codec.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/codec/fe_row_codec.cc#L1087-L1092

Added lines #L1087 - L1092 were not covered by tests
}
}

initialized_ = true;
return {};

Check warning on line 1097 in hybridse/src/codec/fe_row_codec.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/codec/fe_row_codec.cc#L1096-L1097

Added lines #L1096 - L1097 were not covered by tests
}

base::Status RowBuilder2::Build(const std::vector<node::ExprNode*>& values, codec::Row* out) {
EnsureInitialized();

Check warning on line 1101 in hybridse/src/codec/fe_row_codec.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/codec/fe_row_codec.cc#L1100-L1101

Added lines #L1100 - L1101 were not covered by tests

auto expect_cols =
std::accumulate(schemas_.begin(), schemas_.end(), 0, [](int val, const auto& e) { return val + e.size(); });
CHECK_TRUE(values.size() == expect_cols, common::kCodegenEncodeError, "pass in expr number do not match, expect ",

Check warning on line 1105 in hybridse/src/codec/fe_row_codec.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/codec/fe_row_codec.cc#L1104-L1105

Added lines #L1104 - L1105 were not covered by tests
expect_cols, " but got ", values.size());

int col_idx = 0;
Row row;
auto values_ref = absl::MakeSpan(values);
for (size_t i = 0; i < schemas_.size(); ++i) {
RefCountedSlice slice;
CHECK_STATUS(builders_[i]->Build(values_ref.subspan(col_idx, schemas_[i].size()), &slice));
if (i == 0) {
row.Reset(slice);

Check warning on line 1115 in hybridse/src/codec/fe_row_codec.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/codec/fe_row_codec.cc#L1108-L1115

Added lines #L1108 - L1115 were not covered by tests
} else {
row.Append(slice);

Check warning on line 1117 in hybridse/src/codec/fe_row_codec.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/codec/fe_row_codec.cc#L1117

Added line #L1117 was not covered by tests
}

col_idx += schemas_[i].size();

Check warning on line 1120 in hybridse/src/codec/fe_row_codec.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/codec/fe_row_codec.cc#L1120

Added line #L1120 was not covered by tests
}

*out = row;

Check warning on line 1123 in hybridse/src/codec/fe_row_codec.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/codec/fe_row_codec.cc#L1123

Added line #L1123 was not covered by tests

return {};

Check warning on line 1125 in hybridse/src/codec/fe_row_codec.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/codec/fe_row_codec.cc#L1125

Added line #L1125 was not covered by tests
}
base::Status RowBuilder2::InitSchema(int idx, const codec::Schema& sc) {
if (idx >= schemas_.size()) {
return {common::kCodegenEncodeError, "idx out of bound"};

Check warning on line 1129 in hybridse/src/codec/fe_row_codec.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/codec/fe_row_codec.cc#L1127-L1129

Added lines #L1127 - L1129 were not covered by tests
}
schemas_[idx] = sc;
return {};

Check warning on line 1132 in hybridse/src/codec/fe_row_codec.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/codec/fe_row_codec.cc#L1131-L1132

Added lines #L1131 - L1132 were not covered by tests
}
} // namespace codec
} // namespace hybridse
3 changes: 1 addition & 2 deletions hybridse/src/codegen/aggregate_ir_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -736,8 +736,7 @@ base::Status AggregateIRBuilder::BuildMulti(const std::string& base_funcname,
std::vector<std::pair<size_t, NativeValue>> outputs;
agg_generator.GenOutputs(&builder, &outputs);
for (auto pair : outputs) {
output_encoder.BuildEncodePrimaryField(output_arg, pair.first,
pair.second);
CHECK_STATUS(output_encoder.BuildEncodePrimaryField(output_arg, pair.first, pair.second));
}
}
builder.CreateRetVoid();
Expand Down
3 changes: 3 additions & 0 deletions hybridse/src/codegen/buf_ir_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@ class BufNativeEncoderIRBuilder : public RowEncodeIRBuilder {

~BufNativeEncoderIRBuilder() override;

ABSL_MUST_USE_RESULT
base::Status Init() noexcept;

// the output_ptr like int8_t**
ABSL_MUST_USE_RESULT
base::Status BuildEncode(::llvm::Value* output_ptr) override;

ABSL_MUST_USE_RESULT
base::Status BuildEncodePrimaryField(::llvm::Value* buf, size_t idx, const NativeValue& val);

private:
Expand Down
3 changes: 0 additions & 3 deletions hybridse/src/codegen/buf_ir_builder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ void RunEncode(::hybridse::type::TableDef& table, // NOLINT
auto jit = std::unique_ptr<vm::HybridSeJitWrapper>(
vm::HybridSeJitWrapper::Create());
jit->Init();
vm::HybridSeJitWrapper::InitJitSymbols(jit.get());
ASSERT_TRUE(jit->AddModule(std::move(m), std::move(ctx)));
auto load_fn_jit = jit->FindFunction("fn");
void (*decode)(int8_t**) =
Expand Down Expand Up @@ -307,7 +306,6 @@ void LoadValue(T* result, bool* is_null,
auto jit = std::unique_ptr<vm::HybridSeJitWrapper>(
vm::HybridSeJitWrapper::Create());
jit->Init();
vm::HybridSeJitWrapper::InitJitSymbols(jit.get());
ASSERT_TRUE(jit->AddModule(std::move(m), std::move(ctx)));
auto load_fn_jit = jit->FindFunction("fn");

Expand Down Expand Up @@ -438,7 +436,6 @@ void RunColCase(T expected, type::TableDef& table, // NOLINT
auto jit = std::unique_ptr<vm::HybridSeJitWrapper>(
vm::HybridSeJitWrapper::Create());
jit->Init();
vm::HybridSeJitWrapper::InitJitSymbols(jit.get());
ASSERT_TRUE(jit->AddModule(std::move(m), std::move(ctx)));
jit->AddExternalFunction("print_list_i16",
reinterpret_cast<void*>(&PrintListInt16));
Expand Down
1 change: 0 additions & 1 deletion hybridse/src/codegen/fn_ir_builder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ void CheckResult(node::FnNodeFnDef *fn_def, R exp, V1 a, V2 b) {
m->print(::llvm::errs(), NULL, true, true);
auto jit = std::unique_ptr<vm::HybridSeJitWrapper>(vm::HybridSeJitWrapper::Create());
jit->Init();
vm::HybridSeJitWrapper::InitJitSymbols(jit.get());
ASSERT_TRUE(jit->AddModule(std::move(m), std::move(ctx)));
auto test_fn = (R(*)(V1, V2))jit->FindFunction(fn_def->header_->GeIRFunctionName());
R result = test_fn(a, b);
Expand Down
1 change: 0 additions & 1 deletion hybridse/src/codegen/fn_let_ir_builder_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ void CheckFnLetBuilderWithParameterRow(::hybridse::node::NodeManager* manager, v
auto jit = std::unique_ptr<vm::HybridSeJitWrapper>(
vm::HybridSeJitWrapper::Create());
jit->Init();
vm::HybridSeJitWrapper::InitJitSymbols(jit.get());

ASSERT_TRUE(jit->AddModule(std::move(m), std::move(ctx)));
auto address = jit->FindFunction("test_at_fn");
Expand Down
Loading
Loading