diff --git a/integration_tests/Makefile b/integration_tests/Makefile
index e6ce21bdfa..efe7bfb466 100644
--- a/integration_tests/Makefile
+++ b/integration_tests/Makefile
@@ -21,6 +21,7 @@ HORAEDB_DATA_DIR = /tmp/horaedb
 HORAEDB_DATA_DIR_0 = /tmp/horaedb0
 HORAEDB_DATA_DIR_1 = /tmp/horaedb1
 HORAEMETA_DATA_DIR = /tmp/horaemeta
+HORAEDB_DATA_DIR_2 = /tmp/horaedb2
 
 export HORAEDB_TEST_CASE_PATH ?= $(ROOT)/cases/env
 export HORAEDB_TEST_BINARY ?= $(ROOT)/../target/$(MODE)/horaedb-test
@@ -42,13 +43,17 @@ export CLUSTER_HORAEDB_STDOUT_FILE_0 ?= /tmp/horaedb-stdout-0.log
 export CLUSTER_HORAEDB_STDOUT_FILE_1 ?= /tmp/horaedb-stdout-1.log
 export RUST_BACKTRACE=1
 
+# Environment variables for compaction offload
+export HORAEDB_STDOUT_FILE_2 ?= /tmp/horaedb-stdout-2.log
+export HORAEDB_CONFIG_FILE_2 ?= $(ROOT)/config/compaction-offload.toml
+
 # Whether to update related repos.
 # Sometimes (e.g. when debugging locally) we don't want to rebuild the
 # binaries and data, so this can be set to false.
 export UPDATE_REPOS_TO_LATEST ?= true
 
 clean:
-	rm -rf $(HORAEDB_DATA_DIR) $(HORAEDB_DATA_DIR_0) $(HORAEDB_DATA_DIR_1) $(HORAEMETA_DATA_DIR)
+	rm -rf $(HORAEDB_DATA_DIR) $(HORAEDB_DATA_DIR_0) $(HORAEDB_DATA_DIR_1) $(HORAEMETA_DATA_DIR) $(HORAEDB_DATA_DIR_2)
 
 build-meta:
 	./build_meta.sh
@@ -89,6 +94,9 @@ run-local: prepare
 run-cluster: prepare build-meta
 	HORAEDB_ENV_FILTER=cluster $(HORAEDB_TEST_BINARY)
 
+run-compaction-offload: prepare
+	HORAEDB_ENV_FILTER=compaction_offload $(HORAEDB_TEST_BINARY)
+
 run-java:
 	java -version
 	cd sdk/java && MAVEN_OPTS="--add-opens=java.base/java.nio=ALL-UNNAMED" mvn clean compile exec:java
diff --git a/integration_tests/README.md b/integration_tests/README.md
index 3c87cda7d4..a3dc758392 100644
--- a/integration_tests/README.md
+++ b/integration_tests/README.md
@@ -12,6 +12,9 @@ make run-local
 
 # Only cluster env
 make run-cluster
+
+# Only compaction offload env
+make run-compaction-offload
 ```
 
 `horaedb-test` will recursively find all the files ending with `.sql` and run them. Each file will be treated as a case. A file can contain multiple SQLs. When finished, it will report how many cases it ran, and display the diff set if there is any. An example with one case:
diff --git a/integration_tests/cases/env/compaction_offload/compact/compact.result b/integration_tests/cases/env/compaction_offload/compact/compact.result
new file mode 100644
index 0000000000..9f4d91b488
--- /dev/null
+++ b/integration_tests/cases/env/compaction_offload/compact/compact.result
@@ -0,0 +1,110 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
+DROP TABLE IF EXISTS `compact_table1`;
+
+affected_rows: 0
+
+CREATE TABLE `compact_table1` (
+    `timestamp` timestamp NOT NULL,
+    `value` double,
+    `dic` string dictionary,
+    timestamp KEY (timestamp)) ENGINE=Analytic
+WITH(
+    enable_ttl='false',
+    update_mode='OVERWRITE'
+);
+
+affected_rows: 0
+
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+    VALUES (1, 100, "d1"), (2, 200, "d2"), (3, 300, "d3");
+
+affected_rows: 3
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+    VALUES (1, 100, "update_d1"), (2, 200, "update_d2"), (3, 300, "update_d3");
+
+affected_rows: 3
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+    VALUES (4, 400, "d4"), (5, 500, "d5"), (6, 600, "d6");
+
+affected_rows: 3
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+    VALUES (4, 400, "update_d4"), (5, 500, "update_d5"), (6, 600, "update_d6");
+
+affected_rows: 3
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+    VALUES (7, 700, "d7"), (8, 800, "d8"), (9, 900, "d9");
+
+affected_rows: 3
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+    VALUES (7, 700, "update_d7"), (8, 800, "update_d8"), (9, 900, "update_d9");
+
+affected_rows: 3
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+    VALUES (10, 1000, "d10"), (11, 1100, "d11"), (12, 1200, "d12");
+
+affected_rows: 3
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+    VALUES (10, 1000, "update_d10"), (11, 1100, "update_d11"), (12, 1200, "update_d12");
+
+affected_rows: 3
+
+-- trigger manual compaction after flushing the memtable
+-- SQLNESS ARG pre_cmd=flush
+-- SQLNESS ARG pre_cmd=compact
+SELECT
+    *
+FROM
+    `compact_table1`
+ORDER BY
+    `value` ASC;
+
+tsid,timestamp,value,dic,
+UInt64(0),Timestamp(1),Double(100.0),String("update_d1"),
+UInt64(0),Timestamp(2),Double(200.0),String("update_d2"),
+UInt64(0),Timestamp(3),Double(300.0),String("update_d3"),
+UInt64(0),Timestamp(4),Double(400.0),String("update_d4"),
+UInt64(0),Timestamp(5),Double(500.0),String("update_d5"),
+UInt64(0),Timestamp(6),Double(600.0),String("update_d6"),
+UInt64(0),Timestamp(7),Double(700.0),String("update_d7"),
+UInt64(0),Timestamp(8),Double(800.0),String("update_d8"),
+UInt64(0),Timestamp(9),Double(900.0),String("update_d9"),
+UInt64(0),Timestamp(10),Double(1000.0),String("update_d10"),
+UInt64(0),Timestamp(11),Double(1100.0),String("update_d11"),
+UInt64(0),Timestamp(12),Double(1200.0),String("update_d12"),
+
+
+DROP TABLE `compact_table1`;
+
+affected_rows: 0
+
diff --git a/integration_tests/cases/env/compaction_offload/compact/compact.sql b/integration_tests/cases/env/compaction_offload/compact/compact.sql
new file mode 100644
index 0000000000..f0aa46fbb5
--- /dev/null
+++ b/integration_tests/cases/env/compaction_offload/compact/compact.sql
@@ -0,0 +1,76 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
+
+DROP TABLE IF EXISTS `compact_table1`;
+
+CREATE TABLE `compact_table1` (
+    `timestamp` timestamp NOT NULL,
+    `value` double,
+    `dic` string dictionary,
+    timestamp KEY (timestamp)) ENGINE=Analytic
+WITH(
+    enable_ttl='false',
+    update_mode='OVERWRITE'
+);
+
+
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+    VALUES (1, 100, "d1"), (2, 200, "d2"), (3, 300, "d3");
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+    VALUES (1, 100, "update_d1"), (2, 200, "update_d2"), (3, 300, "update_d3");
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+    VALUES (4, 400, "d4"), (5, 500, "d5"), (6, 600, "d6");
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+    VALUES (4, 400, "update_d4"), (5, 500, "update_d5"), (6, 600, "update_d6");
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+    VALUES (7, 700, "d7"), (8, 800, "d8"), (9, 900, "d9");
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+    VALUES (7, 700, "update_d7"), (8, 800, "update_d8"), (9, 900, "update_d9");
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+    VALUES (10, 1000, "d10"), (11, 1100, "d11"), (12, 1200, "d12");
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+    VALUES (10, 1000, "update_d10"), (11, 1100, "update_d11"), (12, 1200, "update_d12");
+
+
+-- trigger manual compaction after flushing the memtable
+-- SQLNESS ARG pre_cmd=flush
+-- SQLNESS ARG pre_cmd=compact
+SELECT
+    *
+FROM
+    `compact_table1`
+ORDER BY
+    `value` ASC;
+
+
+DROP TABLE `compact_table1`;
diff --git a/integration_tests/cases/env/compaction_offload/config.toml b/integration_tests/cases/env/compaction_offload/config.toml
new file mode 100644
index 0000000000..044e6af1a5
--- /dev/null
+++ b/integration_tests/cases/env/compaction_offload/config.toml
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+ +[server] +bind_addr = "127.0.0.1" +http_port = 5440 +grpc_port = 8831 + +[query_engine] +read_parallelism = 8 + +[analytic.wal] +type = "RocksDB" +data_dir = "/tmp/horaedb" + +[analytic.storage] +mem_cache_capacity = '1G' +mem_cache_partition_bits = 0 +disk_cache_dir = "/tmp/horaedb" +disk_cache_capacity = '2G' +disk_cache_page_size = '1M' + +[analytic.storage.object_store] +type = "Local" +data_dir = "/tmp/horaedb" + +[analytic.compaction_mode] +compaction_mode = "Offload" +node_picker = "Local" +endpoint = "127.0.0.1:8831" diff --git a/integration_tests/config/compaction-offload.toml b/integration_tests/config/compaction-offload.toml new file mode 100644 index 0000000000..71c913f87b --- /dev/null +++ b/integration_tests/config/compaction-offload.toml @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[server] +bind_addr = "0.0.0.0" +http_port = 5440 +grpc_port = 8831 +postgresql_port = 5433 + +[logger] +level = "info" + +[tracing] +dir = "/tmp/compaction-offload" + +[analytic.storage.object_store] +type = "Local" +data_dir = "/tmp/compaction-offload" + +[analytic.wal] +type = "RocksDB" +data_dir = "/tmp/compaction-offload" + +[analytic.compaction_mode] +compaction_mode = "Offload" +node_picker = "Local" +endpoint = "127.0.0.1:8831" + +[analytic] +enable_primary_key_sampling = true diff --git a/integration_tests/src/database.rs b/integration_tests/src/database.rs index 2020cd84d7..e598a46ae7 100644 --- a/integration_tests/src/database.rs +++ b/integration_tests/src/database.rs @@ -43,6 +43,9 @@ const CLUSTER_HORAEDB_STDOUT_FILE_0_ENV: &str = "CLUSTER_HORAEDB_STDOUT_FILE_0"; const CLUSTER_HORAEDB_STDOUT_FILE_1_ENV: &str = "CLUSTER_HORAEDB_STDOUT_FILE_1"; const CLUSTER_HORAEDB_HEALTH_CHECK_INTERVAL_SECONDS: usize = 5; +const HORAEDB_STDOUT_FILE_2_ENV: &str = "HORAEDB_STDOUT_FILE_2"; +const HORAEDB_CONFIG_FILE_2_ENV: &str = "HORAEDB_CONFIG_FILE_2"; + const HORAEDB_SERVER_ADDR: &str = "HORAEDB_SERVER_ADDR"; // Used to access HoraeDB by http service. 
@@ -82,6 +85,10 @@ pub struct HoraeDBCluster {
     meta_stable_check_sql: String,
 }
 
+pub struct HoraeDBCompactionOffload {
+    server: HoraeDBServer,
+}
+
 impl HoraeDBServer {
     fn spawn(bin: String, config: String, stdout: String) -> Self {
         let local_ip = local_ip_address::local_ip()
@@ -231,6 +238,29 @@ impl Backend for HoraeDBCluster {
     }
 }
 
+#[async_trait]
+impl Backend for HoraeDBCompactionOffload {
+    fn start() -> Self {
+        let config = env::var(HORAEDB_CONFIG_FILE_2_ENV).expect("Cannot parse horaedb2 config env");
+        let bin = env::var(HORAEDB_BINARY_PATH_ENV).expect("Cannot parse binary path env");
+        let stdout = env::var(HORAEDB_STDOUT_FILE_2_ENV).expect("Cannot parse stdout2 env");
+        Self {
+            server: HoraeDBServer::spawn(bin, config, stdout),
+        }
+    }
+
+    async fn wait_for_ready(&self) {
+        tokio::time::sleep(Duration::from_secs(10)).await
+    }
+
+    fn stop(&mut self) {
+        self.server
+            .server_process
+            .kill()
+            .expect("Failed to kill server");
+    }
+}
+
 pub struct HoraeDB<T> {
     backend: T,
     db_client: Arc<dyn DbClient>,
@@ -264,6 +294,7 @@ impl TryFrom<&str> for Protocol {
 #[derive(Debug, Clone, Copy)]
 enum Command {
     Flush,
+    Compact,
 }
 
 impl TryFrom<&str> for Command {
@@ -272,6 +303,7 @@ fn try_from(s: &str) -> Result<Self, Self::Error> {
         let cmd = match s {
             "flush" => Self::Flush,
+            "compact" => Self::Compact,
             _ => return Err(format!("Unknown command:{s}")),
         };
 
@@ -305,6 +337,12 @@ impl Database for HoraeDB {
                     panic!("Execute flush command failed, err:{e}");
                 }
             }
+            Command::Compact => {
+                println!("Compact table...");
+                if let Err(e) = self.execute_compact().await {
+                    panic!("Execute compact command failed, err:{e}");
+                }
+            }
         }
     }
 
@@ -363,6 +401,19 @@ impl HoraeDB {
         Err(resp.text().await.unwrap_or_else(|e| format!("{e:?}")))
     }
 
+    async fn execute_compact(&self) -> Result<(), String> {
+        // TODO(leslie): Improve code reusability. The following code is similar to
+        // `execute_flush()`.
+        let url = format!("http://{}/debug/compact_table", self.http_client.endpoint);
+        let resp = self.http_client.client.post(url).send().await.unwrap();
+
+        if resp.status() == StatusCode::OK {
+            return Ok(());
+        }
+
+        Err(resp.text().await.unwrap_or_else(|e| format!("{e:?}")))
+    }
+
     async fn execute_influxql(
         query: String,
         http_client: HttpClient,
diff --git a/integration_tests/src/main.rs b/integration_tests/src/main.rs
index 7098712715..e2c63f10c6 100644
--- a/integration_tests/src/main.rs
+++ b/integration_tests/src/main.rs
@@ -21,7 +21,7 @@ use std::{env, fmt::Display, path::Path};
 
 use anyhow::Result;
 use async_trait::async_trait;
-use database::{Backend, HoraeDB};
+use database::{Backend, HoraeDB, HoraeDBCompactionOffload};
 use sqlness::{Database, EnvController, QueryContext, Runner};
 
 use crate::database::{HoraeDBCluster, HoraeDBServer};
@@ -65,6 +65,9 @@ impl EnvController for HoraeDBController {
         let db = match env {
             "local" => Box::new(HoraeDB::<HoraeDBServer>::create().await) as DbRef,
             "cluster" => Box::new(HoraeDB::<HoraeDBCluster>::create().await) as DbRef,
+            "compaction_offload" => {
+                Box::new(HoraeDB::<HoraeDBCompactionOffload>::create().await) as DbRef
+            }
             _ => panic!("invalid env {env}"),
         };
 
@@ -103,6 +106,10 @@ async fn main() -> Result<()> {
         "build_local" => {
             let _ = controller.start("local", None).await;
         }
+        // Just build the compaction offload testing env.
+        "build_compaction_offload" => {
+            let _ = controller.start("compaction_offload", None).await;
+        }
         other => {
             panic!("Unknown run mode:{other}")
         }
diff --git a/src/server/src/http.rs b/src/server/src/http.rs
index 83dad8785d..d31f5adec0 100644
--- a/src/server/src/http.rs
+++ b/src/server/src/http.rs
@@ -247,6 +247,7 @@ impl Service {
             .or(self.admin_block())
             // debug APIs
             .or(self.flush_memtable())
+            .or(self.compact_table())
             .or(self.update_log_level())
             .or(self.profile_cpu())
             .or(self.profile_heap())
@@ -524,6 +525,54 @@ impl Service {
         })
     }
 
+    // POST /debug/compact_table
+    fn compact_table(
+        &self,
+    ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
+        warp::path!("debug" / "compact_table")
+            .and(warp::post())
+            .and(self.with_instance())
+            .and_then(|instance: InstanceRef| async move {
+                let get_all_tables = || {
+                    let mut tables = Vec::new();
+                    for catalog in instance
+                        .catalog_manager
+                        .all_catalogs()
+                        .box_err()
+                        .context(Internal)?
+                    {
+                        for schema in catalog.all_schemas().box_err().context(Internal)? {
+                            for table in schema.all_tables().box_err().context(Internal)? {
+                                tables.push(table);
+                            }
+                        }
+                    }
+                    Result::Ok(tables)
+                };
+                match get_all_tables() {
+                    Ok(tables) => {
+                        let mut failed_tables = Vec::new();
+                        let mut success_tables = Vec::new();
+
+                        for table in tables {
+                            let table_name = table.name().to_string();
+                            if let Err(e) = table.compact().await {
+                                error!("compact {} failed, err:{}", &table_name, e);
+                                failed_tables.push(table_name);
+                            } else {
+                                success_tables.push(table_name);
+                            }
+                        }
+                        let mut result = HashMap::new();
+                        result.insert("success", success_tables);
+                        result.insert("failed", failed_tables);
+                        Ok(reply::json(&result))
+                    }
+                    Err(e) => Err(reject::custom(e)),
+                }
+            })
+    }
+
     // GET /metrics
     fn metrics(
         &self,
diff --git a/src/wal/Cargo.toml b/src/wal/Cargo.toml
index 30a5b00461..f691746d88 100644
--- a/src/wal/Cargo.toml
+++ b/src/wal/Cargo.toml
@@ -27,15 +27,15 @@ workspace = true
 [package.authors]
 workspace = true
 
-[package.edition]
-workspace = true
-
 [dependencies.rocksdb]
 git = "https://github.com/tikv/rust-rocksdb.git"
 rev = "f04f4dd8eacc30e67c24bc2529a6d9c6edb85f8f"
 features = ["portable"]
 optional = true
 
+[package.edition]
+workspace = true
+
 [features]
 wal-message-queue = ["dep:message_queue"]
 wal-table-kv = ["dep:table_kv"]
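---

For anyone reviewing this patch, a minimal sketch of how to exercise the new pieces by hand. The `make` target and the `curl` call follow from the Makefile target and the `POST /debug/compact_table` route added above, with the port taken from `http_port = 5440` in the compaction-offload configs; the sample response only illustrates the `{"success": [...], "failed": [...]}` map built by the handler, and the table names in it are hypothetical.

```bash
# Run only the compaction offload test cases.
cd integration_tests
make run-compaction-offload

# Or trigger compaction of all tables on an already-running server
# through the new debug endpoint:
curl -X POST http://127.0.0.1:5440/debug/compact_table
# Illustrative response shape (actual table names depend on the server):
# {"success":["compact_table1"],"failed":[]}
```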