Skip to content

Commit

Permalink
Compiles
Browse files Browse the repository at this point in the history
  • Loading branch information
willdealtry committed Nov 8, 2024
1 parent ca4784f commit c5034bd
Show file tree
Hide file tree
Showing 21 changed files with 163 additions and 118 deletions.
6 changes: 3 additions & 3 deletions cpp/arcticdb/async/async_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ class AsyncStore : public Store {
public:
AsyncStore(
std::shared_ptr<storage::Library> library,
const BlockCodec&&codec,
const BlockCodecImpl&& codec,
EncodingVersion encoding_version
) :
library_(std::move(library)),
codec_(std::make_shared<arcticdb::proto::encoding::VariantCodec>(codec)),
codec_(std::make_shared<BlockCodecImpl>(codec)),
encoding_version_(encoding_version) {
}

Expand Down Expand Up @@ -401,7 +401,7 @@ folly::Future<SliceAndKey> async_write(
private:
friend class arcticdb::toolbox::apy::LibraryTool;
std::shared_ptr<storage::Library> library_;
std::shared_ptr<arcticdb::proto::encoding::VariantCodec> codec_;
std::shared_ptr<BlockCodecImpl> codec_;
const EncodingVersion encoding_version_;
};

Expand Down
16 changes: 8 additions & 8 deletions cpp/arcticdb/async/tasks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ struct EncodeAtomTask : BaseTask {
PartialKey partial_key_;
timestamp creation_ts_;
SegmentInMemory segment_;
std::shared_ptr<arcticdb::proto::encoding::VariantCodec> codec_meta_;
std::shared_ptr<BlockCodecImpl> codec_meta_;
EncodingVersion encoding_version_;

EncodeAtomTask(
PartialKey &&pk,
timestamp creation_ts,
SegmentInMemory &&segment,
std::shared_ptr<arcticdb::proto::encoding::VariantCodec> codec_meta,
std::shared_ptr<BlockCodecImpl> codec_meta,
EncodingVersion encoding_version) :
partial_key_(std::move(pk)),
creation_ts_(creation_ts),
Expand All @@ -59,7 +59,7 @@ struct EncodeAtomTask : BaseTask {
EncodeAtomTask(
std::pair<PartialKey, SegmentInMemory>&& pk_seg,
timestamp creation_ts,
std::shared_ptr<arcticdb::proto::encoding::VariantCodec> codec_meta,
std::shared_ptr<BlockCodecImpl> codec_meta,
EncodingVersion encoding_version) :
partial_key_(std::move(pk_seg.first)),
creation_ts_(creation_ts),
Expand All @@ -76,7 +76,7 @@ struct EncodeAtomTask : BaseTask {
IndexValue end_index,
timestamp creation_ts,
SegmentInMemory &&segment,
const std::shared_ptr<arcticdb::proto::encoding::VariantCodec> &codec_meta,
const std::shared_ptr<BlockCodecImpl> &codec_meta,
EncodingVersion encoding_version) :
EncodeAtomTask(
PartialKey{key_type, gen_id, std::move(stream_id), std::move(start_index), std::move(end_index)},
Expand Down Expand Up @@ -106,12 +106,12 @@ struct EncodeAtomTask : BaseTask {
struct EncodeSegmentTask : BaseTask {
VariantKey key_;
SegmentInMemory segment_;
std::shared_ptr<arcticdb::proto::encoding::VariantCodec> codec_meta_;
std::shared_ptr<BlockCodecImpl> codec_meta_;
EncodingVersion encoding_version_;

EncodeSegmentTask(entity::VariantKey key,
SegmentInMemory &&segment,
std::shared_ptr<arcticdb::proto::encoding::VariantCodec> codec_meta,
std::shared_ptr<BlockCodecImpl> codec_meta,
EncodingVersion encoding_version)
: key_(std::move(key)),
segment_(std::move(segment)),
Expand All @@ -136,14 +136,14 @@ struct EncodeRefTask : BaseTask {
KeyType key_type_;
StreamId id_;
SegmentInMemory segment_;
std::shared_ptr<arcticdb::proto::encoding::VariantCodec> codec_meta_;
std::shared_ptr<BlockCodecImpl> codec_meta_;
EncodingVersion encoding_version_;

EncodeRefTask(
KeyType key_type,
StreamId stream_id,
SegmentInMemory &&segment,
std::shared_ptr<arcticdb::proto::encoding::VariantCodec> codec_meta,
std::shared_ptr<BlockCodecImpl> codec_meta,
EncodingVersion encoding_version
)
: key_type_(key_type),
Expand Down
32 changes: 15 additions & 17 deletions cpp/arcticdb/async/test/test_async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ TEST(Async, DeDupTest) {

as::UserAuth au{"abc"};
auto lib = library_index.get_library(library_path, as::OpenMode::WRITE, au);
auto codec_opt = std::make_shared<arcticdb::proto::encoding::VariantCodec>();
aa::AsyncStore store(lib, *codec_opt, EncodingVersion::V2);
auto codec_opt = std::make_shared<BlockCodecImpl>();
aa::AsyncStore store(lib, codec::default_lz4_codec(), EncodingVersion::V2);
auto seg = SegmentInMemory();

std::vector<std::pair<ast::StreamSink::PartialKey, SegmentInMemory>> key_segments;
Expand Down Expand Up @@ -303,9 +303,9 @@ TEST(Async, NumCoresCgroupV2) {
// Test helper: open `library_path` for writing through `library_index` and
// wrap it in a V1-encoding AsyncStore that takes ownership of `codec_opt`.
// NOTE(review): the diff rendering left both the old shared_ptr<VariantCodec>
// parameter line and the new BlockCodecImpl&& line in place; this is the
// coherent post-change definition.
std::shared_ptr<arcticdb::Store> create_store(const storage::LibraryPath &library_path,
                                              as::LibraryIndex &library_index,
                                              const storage::UserAuth &user_auth,
                                              BlockCodecImpl&& codec_opt) {
    auto lib = library_index.get_library(library_path, as::OpenMode::WRITE, user_auth);
    auto store = aa::AsyncStore(lib, std::move(codec_opt), EncodingVersion::V1);
    return std::make_shared<aa::AsyncStore<>>(std::move(store));
}

Expand All @@ -327,23 +327,23 @@ TEST(Async, CopyCompressedInterStore) {
as::LibraryIndex library_index{environment_name, config_resolver};

as::UserAuth user_auth{"abc"};
auto codec_opt = std::make_shared<arcticdb::proto::encoding::VariantCodec>();

auto source_store = create_store(library_path, library_index, user_auth, codec_opt);
auto source_store = create_store(library_path, library_index, user_auth, codec::default_passthrough_codec());

// When - we write a key to the source and copy it
const arcticdb::entity::RefKey& key = arcticdb::entity::RefKey{"abc", KeyType::VERSION_REF};
auto segment_in_memory = get_test_frame<arcticdb::stream::TimeseriesIndex>("symbol", {}, 10, 0).segment_;
auto row_count = segment_in_memory.row_count();
ASSERT_GT(row_count, 0);
auto segment = encode_dispatch(std::move(segment_in_memory), *codec_opt, arcticdb::EncodingVersion::V1);
auto codec_opt = codec::default_passthrough_codec();
auto segment = encode_dispatch(std::move(segment_in_memory), codec_opt, arcticdb::EncodingVersion::V1);
(void)segment.calculate_size();
source_store->write_compressed_sync(as::KeySegmentPair{key, std::move(segment)});

auto targets = std::vector<std::shared_ptr<arcticdb::Store>>{
create_store(library_path, library_index, user_auth, codec_opt),
create_store(library_path, library_index, user_auth, codec_opt),
create_store(library_path, library_index, user_auth, codec_opt)
create_store(library_path, library_index, user_auth, codec::default_passthrough_codec()),
create_store(library_path, library_index, user_auth, codec::default_passthrough_codec()),
create_store(library_path, library_index, user_auth, codec::default_passthrough_codec())
};

CopyCompressedInterStoreTask task{
Expand Down Expand Up @@ -394,25 +394,23 @@ TEST(Async, CopyCompressedInterStoreNoSuchKeyOnWrite) {
as::LibraryIndex failed_library_index{environment_name, failed_config_resolver};

as::UserAuth user_auth{"abc"};
auto codec_opt = std::make_shared<arcticdb::proto::encoding::VariantCodec>();

auto source_store = create_store(library_path, library_index, user_auth, codec_opt);
auto source_store = create_store(library_path, library_index, user_auth, codec::default_passthrough_codec());

std::string failureSymbol = storage::s3::MockS3Client::get_failure_trigger("sym", storage::StorageOperation::WRITE, Aws::S3::S3Errors::NO_SUCH_KEY);

// Prepare 2 targets to fail and 1 to succeed
auto targets = std::vector<std::shared_ptr<arcticdb::Store>>{
create_store(library_path, library_index, user_auth, codec_opt),
create_store(library_path, failed_library_index, user_auth, codec_opt),
create_store(library_path, library_index, user_auth, codec_opt)
create_store(library_path, library_index, user_auth, codec::default_passthrough_codec()),
create_store(library_path, failed_library_index, user_auth, codec::default_passthrough_codec()),
create_store(library_path, library_index, user_auth, codec::default_passthrough_codec())
};

// When - we write a key to the source
const arcticdb::entity::RefKey& key = arcticdb::entity::RefKey{failureSymbol, KeyType::VERSION_REF};
auto segment_in_memory = get_test_frame<arcticdb::stream::TimeseriesIndex>("symbol", {}, 10, 0).segment_;
auto row_count = segment_in_memory.row_count();
ASSERT_GT(row_count, 0);
auto segment = encode_dispatch(std::move(segment_in_memory), *codec_opt, arcticdb::EncodingVersion::V1);
auto segment = encode_dispatch(std::move(segment_in_memory), codec::default_passthrough_codec(), arcticdb::EncodingVersion::V1);
(void)segment.calculate_size();
source_store->write_compressed_sync(as::KeySegmentPair{key, std::move(segment)});

Expand Down
45 changes: 40 additions & 5 deletions cpp/arcticdb/codec/adaptive_encoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,31 @@
#include "util/hash.hpp"

namespace arcticdb::detail {

//Placeholder class for BtrBlocks encoding, just copies the data for now.

// Stub encoder for the legacy V1 storage format, where adaptive (BtrBlocks
// style) encoding is not available. Every entry point raises; the struct
// exists only so the encoder dispatch machinery stays uniform across formats.
template<template<typename> class BlockType, class TD>
struct AdaptiveEncoderV1 {
    using Opts = AdaptiveCodec;

    // Never returns a size — adaptive blocks cannot be written as V1.
    static size_t max_compressed_size(const BlockType<TD>& /*block*/) {
        util::raise_rte("Adaptive encoding not supported with V1 format");
    }

    // Never encodes — adaptive blocks cannot be written as V1.
    template <typename EncodedFieldType>
    static void encode(
            const Opts& /*opts*/,
            const BlockType<TD>& /*block*/,
            EncodedFieldType& /*field*/,
            Buffer& /*out*/,
            std::ptrdiff_t& /*pos*/) {
        util::raise_rte("Adaptive encoding not supported with V1 format");
    }
};

template<template<typename> class BlockType, class TD>
struct AdaptiveEncoder {
using Opts = arcticdb::proto::encoding::VariantCodec::Passthrough;
using Opts = AdaptiveCodec;

static size_t max_compressed_size(const BlockType<TD> &block) {
return block.nbytes();
Expand All @@ -17,11 +39,24 @@ struct AdaptiveEncoder {
template <typename EncodedBlockType>
static void encode(
const Opts&,
const BlockType<TD> &/*block*/,
Buffer &/*out*/,
std::ptrdiff_t &/*pos*/,
EncodedBlockType* /*encoded_block*/) {
const BlockType<TD> &block,
Buffer &out,
std::ptrdiff_t &pos,
EncodedBlockType* encoded_block) {
using namespace arcticdb::entity;
using CodecHelperType = CodecHelper<TD>;
using T = typename CodecHelperType::T;
CodecHelperType helper;
const T* d = block.data();
const size_t data_byte_size = block.nbytes();
helper.ensure_buffer(out, pos, data_byte_size);


T *t_out = out.ptr_cast<T>(pos, data_byte_size);
encode_block(d, data_byte_size, helper.hasher(), t_out, pos);
encoded_block->set_in_bytes(data_byte_size);
encoded_block->set_out_bytes(data_byte_size);
encoded_block->set_hash(helper.hasher().digest());
}
private:
template<class T>
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/codec/core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ struct ShapeEncodingFromBlock {
* HashAccum & hasher, T * out, std::size_t out_capacity, std::ptrdiff_t & pos,
* arcticdb::proto::encoding::VariantCodec & out_codec);
* static constexpr std::uint32_t VERSION;
* using Opts = VariantCodec::SomeCodecOptionType;
* using Opts = Codec::SomeCodecOptionType;
* }
* @tparam SE ShapeEncoding Same as VE but used for shape encoding. By default uses VE.
*
Expand Down
6 changes: 3 additions & 3 deletions cpp/arcticdb/codec/default_codecs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@

namespace arcticdb::codec {

// Default LZ4 block codec with acceleration 1.
// Deliberately NOT constexpr: BlockCodecImpl::mutable_lz4() is a
// non-constexpr member function, so a constexpr definition is ill-formed
// (the CI build failed with "call to non-'constexpr' function
// 'arcticdb::Lz4Codec* arcticdb::BlockCodecImpl::mutable_lz4()'").
inline BlockCodecImpl default_lz4_codec() {
    BlockCodecImpl codec;
    auto lz4ptr = codec.mutable_lz4();
    lz4ptr->acceleration_ = 1;
    return codec;
}

// Default passthrough (no compression) codec.
// Not constexpr for the same class of reason as the LZ4 default above in
// this header: BlockCodecImpl's mutable_* accessors are non-constexpr
// member functions, so a constexpr definition fails to compile.
inline BlockCodecImpl default_passthrough_codec() {
    BlockCodecImpl codec;
    (void)codec.mutable_passthrough();
    codec.codec_ = Codec::PASS;
    return codec;
}

Expand Down
4 changes: 1 addition & 3 deletions cpp/arcticdb/codec/encoded_field.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ struct BlockCodecImpl : public BlockCodec {
return &data_[0];
}

BlockCodecImpl() {
memset(data(), 0, DataSize);
}
BlockCodecImpl() = default;

ZstdCodec *mutable_zstd() {
codec_ = Codec::ZSTD;
Expand Down
6 changes: 3 additions & 3 deletions cpp/arcticdb/codec/lz4.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ namespace arcticdb::detail {

struct Lz4BlockEncoder {

using Opts = arcticdb::proto::encoding::VariantCodec::Lz4;
using Opts = Lz4Codec;
static constexpr std::uint32_t VERSION = 1;

static std::size_t max_compressed_size(std::size_t size) {
return LZ4_compressBound(static_cast<int>(size));
}

static void set_shape_defaults(Opts &opts) {
opts.set_acceleration(0);
opts.acceleration_ = 0;
}

template<class T, class CodecType>
Expand All @@ -53,7 +53,7 @@ struct Lz4BlockEncoder {
ARCTICDB_TRACE(log::storage(), "Block of size {} compressed to {} bytes", block_utils.bytes_, compressed_bytes);
hasher(in, block_utils.count_);
pos += ssize_t(compressed_bytes);
copy_codec(*out_codec.mutable_lz4(), opts);
*out_codec.mutable_lz4() = opts;
return std::size_t(compressed_bytes);
}
};
Expand Down
7 changes: 3 additions & 4 deletions cpp/arcticdb/codec/passthrough.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace arcticdb::detail {

template<template<typename> class BlockType, class TD>
struct PassthroughEncoderV1 {
using Opts = arcticdb::proto::encoding::VariantCodec::Passthrough;
using Opts = PassthroughCodec;

static size_t max_compressed_size(const BlockType<TD> &block ) {
using Helper = CodecHelper<TD>;
Expand Down Expand Up @@ -108,7 +108,7 @@ struct PassthroughEncoderV1 {
/// @see arcticdb::ColumnEncoder2 arcticdb::detail::GenericBlockEncoder2
template<template<typename> class BlockType, class TD>
struct PassthroughEncoderV2 {
using Opts = arcticdb::proto::encoding::VariantCodec::Passthrough;
using Opts = PassthroughCodec;

static size_t max_compressed_size(const BlockType<TD> &block) {
return block.nbytes();
Expand All @@ -129,8 +129,7 @@ struct PassthroughEncoderV2 {
const size_t data_byte_size = block.nbytes();
helper.ensure_buffer(out, pos, data_byte_size);

// doing copy + hash in one pass, this might have a negative effect on perf
// since the hashing is path dependent. This is a toy example though so not critical

T *t_out = out.ptr_cast<T>(pos, data_byte_size);
encode_block(d, data_byte_size, helper.hasher(), t_out, pos);
encoded_block->set_in_bytes(data_byte_size);
Expand Down
24 changes: 14 additions & 10 deletions cpp/arcticdb/codec/protobuf_mappings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,34 @@

namespace arcticdb {

// Translate a protobuf VariantCodec into the in-memory BlockCodecImpl
// representation, dispatching on the proto oneof (zstd / lz4 / passthrough).
// Raises if the oneof is unset or holds a codec we do not recognise.
// NOTE(review): the diff rendering interleaved the removed pre-change
// statements with the added ones; this is the coherent post-change body.
void codec_from_proto(const arcticdb::proto::encoding::VariantCodec& input_codec, BlockCodecImpl& output_codec) {
    switch (input_codec.codec_case()) {
    case arcticdb::proto::encoding::VariantCodec::kZstd: {
        set_codec(input_codec.zstd(), *output_codec.mutable_zstd());
        break;
    }
    case arcticdb::proto::encoding::VariantCodec::kLz4: {
        set_codec(input_codec.lz4(), *output_codec.mutable_lz4());
        break;
    }
    case arcticdb::proto::encoding::VariantCodec::kPassthrough : {
        set_codec(input_codec.passthrough(), *output_codec.mutable_passthrough());
        break;
    }
    default:
        util::raise_rte("Unrecognized_codec");
    }
}

// Populate an in-memory EncodedBlock from its protobuf representation:
// copy the size/hash/version bookkeeping fields, record whether this block
// holds shape data, and convert the nested codec via codec_from_proto.
void block_from_proto(const arcticdb::proto::encoding::Block& input, EncodedBlock& output, bool is_shape) {
    output.is_shape_ = is_shape;
    output.set_hash(input.hash());
    output.set_in_bytes(input.in_bytes());
    output.set_out_bytes(input.out_bytes());
    // Proto stores the encoder version widened; narrow back to uint16_t.
    output.set_encoder_version(static_cast<uint16_t>(input.encoder_version()));
    codec_from_proto(input.codec(), *output.mutable_codec());
}

void proto_from_block(const EncodedBlock& input, arcticdb::proto::encoding::Block& output) {
output.set_in_bytes(input.in_bytes());
output.set_out_bytes(input.out_bytes());
Expand Down
Loading

0 comments on commit c5034bd

Please sign in to comment.