feat: support horaedb submit compaction task to remote #1563

Open
wants to merge 56 commits into
base: main
Changes from 52 commits
Commits
c1729f4
feat: add compaction server supporting remote compaction service
LeslieKid Jul 18, 2024
41f166b
fix style.
LeslieKid Jul 21, 2024
51666ca
fix style.
LeslieKid Jul 22, 2024
3d007e1
define error for compaction service.
LeslieKid Jul 23, 2024
453b22c
enable conversation from request to task.
LeslieKid Jul 23, 2024
b549119
update remote compact execution.
LeslieKid Jul 23, 2024
ec60fdc
enable conversation from task result to response.
LeslieKid Jul 25, 2024
d3b4db2
introduce CompactionCluster for compaction server in distribute mode.
LeslieKid Aug 5, 2024
798dd41
enable compaction cluster deployment.
LeslieKid Aug 6, 2024
cd7c9ae
refactor: replace CompactionCluster with ClusterType.
LeslieKid Aug 7, 2024
818d61f
remove compaction cluster,
LeslieKid Aug 7, 2024
31a306c
fix style and comment.
LeslieKid Aug 7, 2024
29894f8
provide cluster type for communication between horaedb/cs (as client)…
LeslieKid Aug 7, 2024
12f135a
introduce compaction client for horaedb to access remote compaction n…
LeslieKid Aug 12, 2024
a999f41
impl compact in Cluster trait.
LeslieKid Aug 14, 2024
165c18d
fix style and add comment.
LeslieKid Aug 14, 2024
f306c14
impl type conversation.
LeslieKid Aug 16, 2024
c01ef00
remove dead code.
LeslieKid Aug 16, 2024
e170f65
Merge branch 'main' into remote-compaction-service
LeslieKid Aug 16, 2024
f2c0e38
remove cluster type in meta client.
LeslieKid Aug 16, 2024
421d2c0
Merge branch 'main' into remote-compaction-service
LeslieKid Aug 16, 2024
c6521a4
fix bug
LeslieKid Aug 16, 2024
723a533
update Cargo.lock
LeslieKid Aug 16, 2024
3da8fe5
fix style.
LeslieKid Aug 20, 2024
debabab
rename ClusterType to NodeType.
LeslieKid Aug 20, 2024
03af564
fix style.
LeslieKid Aug 21, 2024
16d1f3d
impl default.
LeslieKid Aug 26, 2024
32a53af
support meta client to fetch compaction node and send heartbeat with …
LeslieKid Aug 27, 2024
56fe903
feat: analytic engine can support compaction offload
LeslieKid Aug 30, 2024
8eff59d
support conversation from compaction runner task to execute compactio…
LeslieKid Aug 31, 2024
142d8dc
update Cargo.toml (tmp)
LeslieKid Aug 31, 2024
4b72e32
fix bug.
LeslieKid Sep 2, 2024
b01a4b0
update docs for cluster deployment.
LeslieKid Sep 2, 2024
e78dc84
impl RemoteCompactionRunner.
LeslieKid Sep 3, 2024
1c98627
refactor: place remote compact logic into RemoteCompactionRunner inst…
LeslieKid Sep 10, 2024
20bfc80
Merge branch 'main' into remote-compactor
LeslieKid Sep 10, 2024
cdacd6d
Merge branch 'main' into remote-compactor
LeslieKid Sep 12, 2024
282d886
fix bug.
LeslieKid Sep 20, 2024
efe99bd
remove unnecessary NodeType in Cluster.
LeslieKid Sep 18, 2024
b553a85
Merge branch 'main' into remote-compactor
LeslieKid Sep 20, 2024
b7b810d
impl local compaction node picker.
LeslieKid Sep 22, 2024
f31c2c8
Merge branch 'remote-compactor' of github.com:LeslieKid/horaedb into …
LeslieKid Sep 22, 2024
5cf33b2
use compaction mode instead of compaction offload for analytic engine…
LeslieKid Sep 23, 2024
bf3b67c
remove unnecessary NodeType.
LeslieKid Sep 24, 2024
1beaa16
add serde tag for compaction mode.
LeslieKid Sep 24, 2024
158a529
fix style.
LeslieKid Sep 25, 2024
86cebbb
delete part of docs for cluster config.
LeslieKid Sep 27, 2024
317d248
impl simple error handle for remote compaction runner.
LeslieKid Sep 27, 2024
708cf5c
Merge branch 'main' into remote-compactor
LeslieKid Sep 28, 2024
7efa88c
move compaction client to compaction runner crate.
Rachelint Sep 30, 2024
2f36208
sort out codes for compation service.
Rachelint Sep 30, 2024
d03c4c5
redirect horaedbproto to apache repo.
Rachelint Sep 30, 2024
0e6fe29
remove unnecessary field in compaction service.
LeslieKid Oct 1, 2024
0b821cc
introduce fallback_local_when_failed in remote compaction runner.
LeslieKid Oct 1, 2024
b3ef9a5
make endpoint formatted.
LeslieKid Oct 1, 2024
f07dcd2
fix fmt.
LeslieKid Oct 1, 2024
9 changes: 8 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
@@ -24,7 +24,7 @@ license = "Apache-2.0"
 [workspace]
 resolver = "2"
 # In alphabetical order
-members = [
+members = [
     "horaectl",
     "integration_tests",
     "integration_tests/sdk/rust",
@@ -33,7 +33,7 @@ members = [
     "src/catalog",
     "src/catalog_impls",
     "src/cluster",
-    "src/common_types",
+    "src/common_types",
     "src/components/alloc_tracker",
     "src/components/arena",
     "src/components/arrow_ext",
@@ -101,7 +101,7 @@ thiserror = "1"
 bytes_ext = { path = "src/components/bytes_ext" }
 catalog = { path = "src/catalog" }
 catalog_impls = { path = "src/catalog_impls" }
-horaedbproto = { git = "https://github.com/apache/incubator-horaedb-proto.git", rev = "a5874d9fedee32ab1292252c4eb6defc4f6e245a" }
+horaedbproto = { git = "https://github.com/apache/incubator-horaedb-proto.git", rev = "fac8564e6e3d50e51daa2af6eb905e747f3191b0" }
 codec = { path = "src/components/codec" }
 chrono = "0.4"
 clap = { version = "4.5.1", features = ["derive"] }
6 changes: 6 additions & 0 deletions src/analytic_engine/Cargo.toml
@@ -49,6 +49,7 @@ async-trait = { workspace = true }
atomic_enum = { workspace = true }
base64 = { workspace = true }
bytes_ext = { workspace = true }
cluster = { workspace = true }
codec = { workspace = true }
common_types = { workspace = true }
datafusion = { workspace = true }
@@ -66,17 +67,20 @@ logger = { workspace = true }
lru = { workspace = true }
macros = { workspace = true }
message_queue = { workspace = true }
meta_client = { workspace = true }
metric_ext = { workspace = true }
object_store = { workspace = true }
parquet = { workspace = true }
parquet_ext = { workspace = true }
prometheus = { workspace = true }
prost = { workspace = true }
remote_engine_client = { workspace = true }
reqwest = { workspace = true }
router = { workspace = true }
runtime = { workspace = true }
sampling_cache = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
size_ext = { workspace = true }
skiplist = { path = "../components/skiplist" }
smallvec = { workspace = true }
@@ -87,7 +91,9 @@ tempfile = { workspace = true, optional = true }
thiserror = { workspace = true }
time_ext = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }
trace_metric = { workspace = true }
url = "2.2"
wal = { workspace = true }
xorfilter-rs = { workspace = true }

91 changes: 84 additions & 7 deletions src/analytic_engine/src/compaction/mod.rs
@@ -20,15 +20,17 @@
 use std::{collections::HashMap, fmt, str::FromStr, sync::Arc};

 use common_types::COMPACTION_STRATEGY;
+use generic_error::{BoxError, GenericError};
+use macros::define_result;
 use serde::{Deserialize, Serialize};
 use size_ext::ReadableSize;
-use snafu::{ensure, Backtrace, GenerateBacktrace, ResultExt, Snafu};
+use snafu::{ensure, Backtrace, GenerateBacktrace, OptionExt, ResultExt, Snafu};
 use time_ext::TimeUnit;
 use tokio::sync::oneshot;

 use crate::{
     compaction::picker::{CommonCompactionPicker, CompactionPickerRef},
-    sst::file::{FileHandle, Level},
+    sst::file::{FileHandle, FileMeta, FilePurgeQueue, Level},
     table::data::TableDataRef,
 };

@@ -72,8 +74,22 @@ pub enum Error {
     },
     #[snafu(display("Invalid compaction option value, err: {}", error))]
     InvalidOption { error: String, backtrace: Backtrace },
+
+    #[snafu(display("Empty file meta.\nBacktrace:\n{}", backtrace))]
+    EmptyFileMeta { backtrace: Backtrace },
+
+    #[snafu(display("Failed to convert file meta, err:{}", source))]
+    ConvertFileMeta { source: GenericError },
+
+    #[snafu(display("Empty purge queue.\nBacktrace:\n{}", backtrace))]
+    EmptyPurgeQueue { backtrace: Backtrace },
+
+    #[snafu(display("Failed to convert level, err:{}", source))]
+    ConvertLevel { source: GenericError },
 }

+define_result!(Error);
+
 #[derive(Debug, Clone, Copy, Deserialize, Default, PartialEq, Serialize)]
 pub enum CompactionStrategy {
     #[default]
@@ -145,7 +161,7 @@ impl CompactionStrategy {
     pub(crate) fn parse_from(
         value: &str,
         options: &HashMap<String, String>,
-    ) -> Result<CompactionStrategy, Error> {
+    ) -> Result<CompactionStrategy> {
         match value.trim().to_lowercase().as_str() {
             DEFAULT_STRATEGY => Ok(CompactionStrategy::Default),
             STC_STRATEGY => Ok(CompactionStrategy::SizeTiered(
@@ -182,7 +198,7 @@ impl CompactionStrategy {
 }

 impl SizeTieredCompactionOptions {
-    pub(crate) fn validate(&self) -> Result<(), Error> {
+    pub(crate) fn validate(&self) -> Result<()> {
         ensure!(
             self.bucket_high > self.bucket_low,
             InvalidOption {
@@ -215,7 +231,7 @@ impl SizeTieredCompactionOptions {

     pub(crate) fn parse_from(
         options: &HashMap<String, String>,
-    ) -> Result<SizeTieredCompactionOptions, Error> {
+    ) -> Result<SizeTieredCompactionOptions> {
         let mut opts = SizeTieredCompactionOptions::default();
         if let Some(v) = options.get(BUCKET_LOW_KEY) {
             opts.bucket_low = v.parse().context(ParseFloat {
@@ -278,7 +294,7 @@ impl TimeWindowCompactionOptions {
         );
     }

-    pub(crate) fn validate(&self) -> Result<(), Error> {
+    pub(crate) fn validate(&self) -> Result<()> {
         if !Self::valid_timestamp_unit(self.timestamp_resolution) {
             return InvalidOption {
                 error: format!(
@@ -294,7 +310,7 @@ impl TimeWindowCompactionOptions {

     pub(crate) fn parse_from(
         options: &HashMap<String, String>,
-    ) -> Result<TimeWindowCompactionOptions, Error> {
+    ) -> Result<TimeWindowCompactionOptions> {
         let mut opts = TimeWindowCompactionOptions {
             size_tiered: SizeTieredCompactionOptions::parse_from(options)?,
             ..Default::default()
@@ -326,6 +342,67 @@ pub struct CompactionInputFiles {
     pub output_level: Level,
 }

+impl TryFrom<horaedbproto::compaction_service::CompactionInputFiles> for CompactionInputFiles {
+    type Error = Error;
+
+    fn try_from(value: horaedbproto::compaction_service::CompactionInputFiles) -> Result<Self> {
+        let level: Level = value.level.try_into().box_err().context(ConvertLevel)?;
+        let output_level: Level = value
+            .output_level
+            .try_into()
+            .box_err()
+            .context(ConvertLevel)?;
+
+        let mut files: Vec<FileHandle> = Vec::with_capacity(value.files.len());
+        for file in value.files {
+            let meta: FileMeta = file
+                .meta
+                .context(EmptyFileMeta)?
+                .try_into()
+                .box_err()
+                .context(ConvertFileMeta)?;
+
+            let purge_queue: FilePurgeQueue = file.purge_queue.context(EmptyPurgeQueue)?.into();
+
+            files.push({
+                let handle = FileHandle::new(meta, purge_queue);
+                handle.set_being_compacted(file.being_compacted);
+                handle
+            });
+        }
+
+        Ok(CompactionInputFiles {
+            level,
+            files,
+            output_level,
+        })
+    }
+}
+
+impl From<CompactionInputFiles> for horaedbproto::compaction_service::CompactionInputFiles {
+    fn from(value: CompactionInputFiles) -> Self {
+        let mut files = Vec::with_capacity(value.files.len());
+        for file in value.files {
+            let handle = horaedbproto::compaction_service::FileHandle {
+                meta: Some(file.meta().into()),
+                purge_queue: Some(horaedbproto::compaction_service::FilePurgeQueue {
+                    space_id: file.space_id(),
+                    table_id: file.table_id().into(),
+                }),
+                being_compacted: file.being_compacted(),
+                metrics: Some(horaedbproto::compaction_service::SstMetrics {}),
+            };
+            files.push(handle);
+        }
+
+        Self {
+            level: value.level.as_u32(),
+            files,
+            output_level: value.output_level.as_u32(),
+        }
+    }
+}
+
 #[derive(Debug, Default, Clone)]
 pub struct ExpiredFiles {
     /// Level of the expired files.
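
The two `impl` blocks added to `compaction/mod.rs` follow a common shape for protobuf-backed types. The wire-to-domain direction is fallible (`TryFrom`) because message fields arrive as `Option` and numeric levels may be out of range. The domain-to-wire direction is infallible (`From`). The signature changes from `Result<T, Error>` to `Result<T>` in the same file rely on `define_result!(Error)`, which is assumed here to expand to a module-local alias. Below is a minimal, self-contained sketch of that pattern using only the standard library. The `proto` module, the reduced `FileMeta`/`FileHandle` fields, and the `Level::MAX` bound are hypothetical stand-ins, not the real `horaedbproto` or `analytic_engine` definitions. Plain `ok_or` replaces snafu's `context` selectors.

```rust
use std::convert::TryFrom;
use std::fmt;

/// Hypothetical stand-ins for the prost-generated messages; nested
/// message fields are `Option`, as they are in generated code.
pub mod proto {
    #[derive(Debug, Clone, PartialEq)]
    pub struct FileMeta {
        pub id: u64,
        pub size: u64,
    }

    #[derive(Debug, Clone, PartialEq)]
    pub struct FileHandle {
        pub meta: Option<FileMeta>,
        pub being_compacted: bool,
    }

    #[derive(Debug, Clone, PartialEq)]
    pub struct CompactionInputFiles {
        pub level: u32,
        pub files: Vec<FileHandle>,
        pub output_level: u32,
    }
}

#[derive(Debug, PartialEq)]
pub enum Error {
    EmptyFileMeta,
    ConvertLevel { value: u32 },
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::EmptyFileMeta => write!(f, "Empty file meta"),
            Error::ConvertLevel { value } => write!(f, "Failed to convert level, value:{}", value),
        }
    }
}

impl std::error::Error for Error {}

/// Assumption: this alias is what `define_result!(Error)` provides.
pub type Result<T> = std::result::Result<T, Error>;

/// Simplified level type; the upper bound is an assumption for the sketch.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Level(u16);

impl Level {
    pub const MAX: u16 = 1;

    pub fn as_u32(&self) -> u32 {
        u32::from(self.0)
    }
}

impl TryFrom<u32> for Level {
    type Error = Error;

    fn try_from(value: u32) -> Result<Self> {
        match u16::try_from(value) {
            Ok(v) if v <= Level::MAX => Ok(Level(v)),
            _ => Err(Error::ConvertLevel { value }),
        }
    }
}

/// Simplified domain-side handle (no purge queue, no interior mutability).
#[derive(Debug, Clone, PartialEq)]
pub struct FileHandle {
    pub id: u64,
    pub size: u64,
    pub being_compacted: bool,
}

#[derive(Debug, Clone, PartialEq)]
pub struct CompactionInputFiles {
    pub level: Level,
    pub files: Vec<FileHandle>,
    pub output_level: Level,
}

impl TryFrom<proto::CompactionInputFiles> for CompactionInputFiles {
    type Error = Error;

    fn try_from(value: proto::CompactionInputFiles) -> Result<Self> {
        let level = Level::try_from(value.level)?;
        let output_level = Level::try_from(value.output_level)?;

        let mut files = Vec::with_capacity(value.files.len());
        for file in value.files {
            // A missing nested message is a decode error, not a default.
            let meta = file.meta.ok_or(Error::EmptyFileMeta)?;
            files.push(FileHandle {
                id: meta.id,
                size: meta.size,
                being_compacted: file.being_compacted,
            });
        }

        Ok(Self { level, files, output_level })
    }
}

impl From<CompactionInputFiles> for proto::CompactionInputFiles {
    fn from(value: CompactionInputFiles) -> Self {
        let files = value
            .files
            .into_iter()
            .map(|f| proto::FileHandle {
                meta: Some(proto::FileMeta { id: f.id, size: f.size }),
                being_compacted: f.being_compacted,
            })
            .collect();

        Self {
            level: value.level.as_u32(),
            files,
            output_level: value.output_level.as_u32(),
        }
    }
}
```

In this sketch a well-formed message round-trips through `CompactionInputFiles::try_from` and back through `proto::CompactionInputFiles::from` unchanged, while a handle with `meta: None` or an out-of-range level is rejected with a specific error variant instead of being silently defaulted.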
1 change: 1 addition & 0 deletions src/analytic_engine/src/compaction/runner/local_runner.rs
@@ -45,6 +45,7 @@ use crate::{
 const MAX_RECORD_BATCHES_IN_FLIGHT_WHEN_COMPACTION_READ: usize = 64;

 /// Executor carrying for actual compaction work
+#[derive(Clone)]
 pub struct LocalCompactionRunner {
     runtime: Arc<Runtime>,
     scan_options: ScanOptions,