From e6090a8d5b224e5c2640bf22866eb6ac30a797f5 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Sat, 12 Aug 2023 16:44:44 +0900 Subject: [PATCH] feat(mito): Write wal and memtable (#2135) * feat: hold wal entry in RegionWriteCtx * feat: entry id and committed sequence * feat: write to wal * feat: write memtable * feat: fill missing columns * feat: validate write request * feat: more validation to write request * chore: fix typos * feat: remove init and validate rows in new() * style: fix clippy --- src/mito2/src/engine.rs | 21 +- src/mito2/src/error.rs | 22 +- src/mito2/src/memtable.rs | 2 +- src/mito2/src/memtable/key_values.rs | 9 +- src/mito2/src/memtable/version.rs | 5 + src/mito2/src/proto_util.rs | 52 ++++ src/mito2/src/region.rs | 12 +- src/mito2/src/region/version.rs | 35 ++- src/mito2/src/request.rs | 413 ++++++++++++++++++++++++--- src/mito2/src/worker.rs | 18 +- src/mito2/src/worker/handle_write.rs | 182 ++++++++++-- 11 files changed, 679 insertions(+), 92 deletions(-) diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index f772795afa53..30e6deefd358 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -20,12 +20,12 @@ mod tests; use std::sync::Arc; use object_store::ObjectStore; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use store_api::logstore::LogStore; use store_api::storage::RegionId; use crate::config::MitoConfig; -use crate::error::{RecvSnafu, Result}; +use crate::error::{RecvSnafu, RegionNotFoundSnafu, Result}; use crate::request::{ CloseRequest, CreateRequest, OpenRequest, RegionRequest, RequestBody, WriteRequest, }; @@ -90,12 +90,17 @@ impl MitoEngine { } /// Write to a region. - pub async fn write_region(&self, write_request: WriteRequest) -> Result<()> { - write_request.validate()?; - - // TODO(yingwen): Fill default values. - // We need to fill default values before writing it to WAL so we can get - // the same default value after reopening the region. 
+ pub async fn write_region(&self, mut write_request: WriteRequest) -> Result<()> { + let region = self + .inner + .workers + .get_region(write_request.region_id) + .context(RegionNotFoundSnafu { + region_id: write_request.region_id, + })?; + let metadata = region.metadata(); + + write_request.fill_missing_columns(&metadata)?; self.inner .handle_request_body(RequestBody::Write(write_request)) diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index e5778a9c2bfa..df8aa441a117 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::any::Any; +use std::sync::Arc; use common_datasource::compression::CompressionType; use common_error::ext::{BoxedError, ErrorExt}; @@ -185,15 +186,10 @@ pub enum Error { /// An error type to indicate that schema is changed and we need /// to fill default values again. - #[snafu(display( - "Need to fill default value to column {} of region {}", - column, - region_id - ))] + #[snafu(display("Need to fill default value for region {}", region_id))] FillDefault { region_id: RegionId, - column: String, - // The error is for retry purpose so we don't need a location. + // The error is for internal use so we don't need a location. }, #[snafu(display( @@ -260,10 +256,21 @@ pub enum Error { location: Location, source: BoxedError, }, + + // Shared error for each writer in the write group. + #[snafu(display("Failed to write region, source: {}", source))] + WriteGroup { source: Arc }, } pub type Result = std::result::Result; +impl Error { + /// Returns true if we need to fill default value for a region. + pub(crate) fn is_fill_default(&self) -> bool { + matches!(self, Error::FillDefault { .. }) + } +} + impl ErrorExt for Error { fn status_code(&self) -> StatusCode { use Error::*; @@ -296,6 +303,7 @@ impl ErrorExt for Error { | EncodeWal { .. } | DecodeWal { .. } => StatusCode::Internal, WriteBuffer { source, .. } => source.status_code(), + WriteGroup { source, .. 
} => source.status_code(), } } diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index e23a8daa8cd3..8b830a233b98 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use crate::error::Result; -use crate::memtable::key_values::KeyValues; +pub use crate::memtable::key_values::KeyValues; use crate::metadata::RegionMetadataRef; /// Id for memtables. diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs index 4ea70c3eae76..dbd8e12b9157 100644 --- a/src/mito2/src/memtable/key_values.rs +++ b/src/mito2/src/memtable/key_values.rs @@ -190,11 +190,12 @@ mod tests { use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use greptime_proto::v1; - use greptime_proto::v1::{value, ColumnDataType, Value}; + use greptime_proto::v1::ColumnDataType; use store_api::storage::RegionId; use super::*; use crate::metadata::{ColumnMetadata, RegionMetadataBuilder}; + use crate::proto_util::i64_value; const TS_NAME: &str = "ts"; const START_SEQ: SequenceNumber = 100; @@ -290,12 +291,6 @@ mod tests { } } - fn i64_value(data: i64) -> Value { - Value { - value: Some(value::Value::I64Value(data)), - } - } - fn check_key_values(kvs: &KeyValues, num_rows: usize, keys: &[i64], ts: i64, values: &[i64]) { assert_eq!(num_rows, kvs.num_rows()); let mut expect_seq = START_SEQ; diff --git a/src/mito2/src/memtable/version.rs b/src/mito2/src/memtable/version.rs index 0fdc6d07c674..f769da498b53 100644 --- a/src/mito2/src/memtable/version.rs +++ b/src/mito2/src/memtable/version.rs @@ -37,4 +37,9 @@ impl MemtableVersion { immutables: vec![], } } + + /// Returns the mutable memtable. 
+ pub(crate) fn mutable(&self) -> &MemtableRef { + &self.mutable + } } diff --git a/src/mito2/src/proto_util.rs b/src/mito2/src/proto_util.rs index 6884dff3604d..d2de210c8330 100644 --- a/src/mito2/src/proto_util.rs +++ b/src/mito2/src/proto_util.rs @@ -120,6 +120,56 @@ pub(crate) fn to_proto_value(value: Value) -> Option { Some(proto_value) } +/// Returns the [ColumnDataType] of the value. +/// +/// If value is null, returns `None`. +pub(crate) fn proto_value_type(value: &v1::Value) -> Option { + let value_data = value.value.as_ref()?; + let value_type = match value_data { + v1::value::Value::I8Value(_) => ColumnDataType::Int8, + v1::value::Value::I16Value(_) => ColumnDataType::Int16, + v1::value::Value::I32Value(_) => ColumnDataType::Int32, + v1::value::Value::I64Value(_) => ColumnDataType::Int64, + v1::value::Value::U8Value(_) => ColumnDataType::Uint8, + v1::value::Value::U16Value(_) => ColumnDataType::Uint16, + v1::value::Value::U32Value(_) => ColumnDataType::Uint32, + v1::value::Value::U64Value(_) => ColumnDataType::Uint64, + v1::value::Value::F32Value(_) => ColumnDataType::Float32, + v1::value::Value::F64Value(_) => ColumnDataType::Float64, + v1::value::Value::BoolValue(_) => ColumnDataType::Boolean, + v1::value::Value::BinaryValue(_) => ColumnDataType::Binary, + v1::value::Value::StringValue(_) => ColumnDataType::String, + v1::value::Value::DateValue(_) => ColumnDataType::Date, + v1::value::Value::DatetimeValue(_) => ColumnDataType::Datetime, + v1::value::Value::TsSecondValue(_) => ColumnDataType::TimestampSecond, + v1::value::Value::TsMillisecondValue(_) => ColumnDataType::TimestampMillisecond, + v1::value::Value::TsMicrosecondValue(_) => ColumnDataType::TimestampMicrosecond, + v1::value::Value::TsNanosecondValue(_) => ColumnDataType::TimestampNanosecond, + v1::value::Value::TimeSecondValue(_) => ColumnDataType::TimeSecond, + v1::value::Value::TimeMillisecondValue(_) => ColumnDataType::TimeMillisecond, + v1::value::Value::TimeMicrosecondValue(_) => 
ColumnDataType::TimeMicrosecond, + v1::value::Value::TimeNanosecondValue(_) => ColumnDataType::TimeNanosecond, + }; + Some(value_type) +} + +// TODO(yingwen): Support conversion in greptime-proto. +/// Creates value for i64. +#[cfg(test)] +pub(crate) fn i64_value(data: i64) -> v1::Value { + v1::Value { + value: Some(v1::value::Value::I64Value(data)), + } +} + +/// Creates value for timestamp millis. +#[cfg(test)] +pub(crate) fn ts_ms_value(data: i64) -> v1::Value { + v1::Value { + value: Some(v1::value::Value::TsMillisecondValue(data)), + } +} + /// Convert [ConcreteDataType] to [ColumnDataType]. pub(crate) fn to_column_data_type(data_type: &ConcreteDataType) -> Option { let column_data_type = match data_type { @@ -186,3 +236,5 @@ fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType false } } + +// TODO(yingwen): Tests. diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 2be5ddf2478c..b57bcbfd1d1d 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -25,7 +25,8 @@ use store_api::storage::RegionId; use crate::error::Result; use crate::manifest::manager::RegionManifestManager; -use crate::region::version::{VersionControlRef, VersionRef}; +use crate::metadata::RegionMetadataRef; +use crate::region::version::VersionControlRef; /// Type to store region version. pub type VersionNumber = u32; @@ -40,7 +41,7 @@ pub(crate) struct MitoRegion { pub(crate) region_id: RegionId, /// Version controller for this region. - version_control: VersionControlRef, + pub(crate) version_control: VersionControlRef, /// Manager to maintain manifest for this region. manifest_manager: RegionManifestManager, } @@ -57,9 +58,10 @@ impl MitoRegion { Ok(()) } - /// Returns current version of the region. - pub(crate) fn version(&self) -> VersionRef { - self.version_control.current() + /// Returns current metadata of the region. 
+ pub(crate) fn metadata(&self) -> RegionMetadataRef { + let version_data = self.version_control.current(); + version_data.version.metadata.clone() } } diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 54fe29df3ca8..7e1f61f7476e 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -23,39 +23,56 @@ //! Reason: data may be flushed/compacted and some data with old sequence may be removed //! and became invisible between step 1 and 2, so need to acquire version at first. -use std::sync::Arc; +use std::sync::{Arc, RwLock}; -use arc_swap::ArcSwap; use store_api::storage::SequenceNumber; use crate::memtable::version::{MemtableVersion, MemtableVersionRef}; use crate::memtable::MemtableRef; use crate::metadata::RegionMetadataRef; use crate::sst::version::{SstVersion, SstVersionRef}; +use crate::wal::EntryId; -/// Controls version of in memory metadata for a region. +/// Controls metadata and sequence numbers for a region. +/// +/// It manages metadata in a copy-on-write fashion. Any modification to a region's metadata +/// will generate a new [Version]. #[derive(Debug)] pub(crate) struct VersionControl { - /// Latest version. - version: ArcSwap, + data: RwLock, } impl VersionControl { /// Returns a new [VersionControl] with specific `version`. pub(crate) fn new(version: Version) -> VersionControl { VersionControl { - version: ArcSwap::new(Arc::new(version)), + data: RwLock::new(VersionControlData { + version: Arc::new(version), + committed_sequence: 0, + last_entry_id: 0, + }), } } - /// Returns current [Version]. - pub(crate) fn current(&self) -> VersionRef { - self.version.load_full() + /// Returns current copy of data. + pub(crate) fn current(&self) -> VersionControlData { + self.data.read().unwrap().clone() } } pub(crate) type VersionControlRef = Arc; +/// Data of [VersionControl]. +#[derive(Debug, Clone)] +pub(crate) struct VersionControlData { + /// Latest version. 
+ pub(crate) version: VersionRef, + /// Sequence number of last committed data. + pub(crate) committed_sequence: SequenceNumber, + /// Last WAL entry Id. + pub(crate) last_entry_id: EntryId, +} + /// Static metadata of a region. #[derive(Clone, Debug)] pub(crate) struct Version { diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 7a303027909e..4e5b3fa7fb04 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -18,7 +18,7 @@ use std::collections::HashMap; use std::time::Duration; use common_base::readable_size::ReadableSize; -use greptime_proto::v1::{ColumnDataType, ColumnSchema, Rows}; +use greptime_proto::v1::{ColumnDataType, ColumnSchema, Rows, Value}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{ColumnId, CompactionStrategy, OpType, RegionId}; use tokio::sync::oneshot::{self, Receiver, Sender}; @@ -27,8 +27,8 @@ use crate::config::DEFAULT_WRITE_BUFFER_SIZE; use crate::error::{CreateDefaultSnafu, FillDefaultSnafu, InvalidRequestSnafu, Result}; use crate::metadata::{ColumnMetadata, RegionMetadata}; use crate::proto_util::{ - is_column_type_value_eq, is_semantic_type_eq, to_column_data_type, to_proto_semantic_type, - to_proto_value, + is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_column_data_type, + to_proto_semantic_type, to_proto_value, }; /// Options that affect the entire region. @@ -100,32 +100,58 @@ pub struct WriteRequest { pub rows: Rows, /// Map column name to column index in `rows`. name_to_index: HashMap, + /// Whether each column has null. + has_null: Vec, } impl WriteRequest { - /// Returns a new request. - pub fn new(region_id: RegionId, op_type: OpType, rows: Rows) -> WriteRequest { - let name_to_index = rows - .schema - .iter() - .enumerate() - .map(|(index, column)| (column.column_name.clone(), index)) - .collect(); - WriteRequest { + /// Creates a new request. + /// + /// Returns `Err` if `rows` are invalid. 
+ pub fn new(region_id: RegionId, op_type: OpType, rows: Rows) -> Result { + let mut name_to_index = HashMap::with_capacity(rows.schema.len()); + for (index, column) in rows.schema.iter().enumerate() { + ensure!( + name_to_index + .insert(column.column_name.clone(), index) + .is_none(), + InvalidRequestSnafu { + region_id, + reason: format!("duplicate column {}", column.column_name), + } + ); + } + + let mut has_null = vec![false; rows.schema.len()]; + for row in &rows.rows { + ensure!( + row.values.len() == rows.schema.len(), + InvalidRequestSnafu { + region_id, + reason: format!( + "row has {} columns but schema has {}", + row.values.len(), + rows.schema.len() + ), + } + ); + + for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() { + validate_proto_value(region_id, value, column_schema)?; + + if value.value.is_none() { + has_null[i] = true; + } + } + } + + Ok(WriteRequest { region_id, op_type, rows, name_to_index, - } - } - - /// Validate the request. - pub(crate) fn validate(&self) -> Result<()> { - // - checks whether the request is too large. - // - checks whether each row in rows has the same schema. - // - checks whether each column match the schema in Rows. - // - checks rows don't have duplicate columns. - unimplemented!() + has_null, + }) } /// Get column index by name. @@ -133,7 +159,7 @@ impl WriteRequest { self.name_to_index.get(name).copied() } - /// Checks schema of rows. + /// Checks schema of rows is compatible with schema of the region. /// /// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault) /// error. 
@@ -156,10 +182,12 @@ impl WriteRequest { InvalidRequestSnafu { region_id, reason: format!( - "column {} expect type {:?}, given: {:?}({})", + "column {} expect type {:?}, given: {}({})", column.column_schema.name, column.column_schema.data_type, - ColumnDataType::from_i32(input_col.datatype), + ColumnDataType::from_i32(input_col.datatype) + .map(|v| v.as_str_name()) + .unwrap_or("Unknown"), input_col.datatype, ) } @@ -171,14 +199,27 @@ impl WriteRequest { InvalidRequestSnafu { region_id, reason: format!( - "column {} has semantic type {:?}, given: {:?}({})", + "column {} has semantic type {:?}, given: {}({})", column.column_schema.name, column.semantic_type, - greptime_proto::v1::SemanticType::from_i32(input_col.semantic_type), + greptime_proto::v1::SemanticType::from_i32(input_col.semantic_type) + .map(|v| v.as_str_name()) + .unwrap_or("Unknown"), input_col.semantic_type ), } ); + + // Check nullable. + // Safety: `rows_columns` ensures this column exists. + let has_null = self.has_null[self.name_to_index[&column.column_schema.name]]; + ensure!( + !has_null || column.column_schema.is_nullable(), + InvalidRequestSnafu { + region_id, + reason: format!("column {} is not null", column.column_schema.name), + } + ); } else { // For columns not in rows, checks whether they have default value. ensure!( @@ -190,11 +231,7 @@ impl WriteRequest { } ); - return FillDefaultSnafu { - region_id, - column: &column.column_schema.name, - } - .fail(); + return FillDefaultSnafu { region_id }.fail(); } } @@ -278,6 +315,30 @@ impl WriteRequest { } } +/// Validate proto value schema. 
+pub(crate) fn validate_proto_value( + region_id: RegionId, + value: &Value, + column_schema: &ColumnSchema, +) -> Result<()> { + if let Some(value_type) = proto_value_type(value) { + ensure!( + value_type as i32 == column_schema.datatype, + InvalidRequestSnafu { + region_id, + reason: format!( + "column {} has type {:?}, but schema has type {:?}", + column_schema.column_name, + value_type, + ColumnDataType::from_i32(column_schema.datatype) + ), + } + ); + } + + Ok(()) +} + /// Sender and write request. pub(crate) struct SenderWriteRequest { /// Result sender. @@ -362,3 +423,291 @@ impl RequestBody { } } } + +#[cfg(test)] +mod tests { + use datatypes::prelude::ConcreteDataType; + use greptime_proto::v1::{Row, SemanticType}; + + use super::*; + use crate::error::Error; + use crate::metadata::RegionMetadataBuilder; + use crate::proto_util::{i64_value, ts_ms_value}; + + fn new_column_schema( + name: &str, + data_type: ColumnDataType, + semantic_type: SemanticType, + ) -> ColumnSchema { + ColumnSchema { + column_name: name.to_string(), + datatype: data_type as i32, + semantic_type: semantic_type as i32, + } + } + + fn check_invalid_request(err: &Error, expect: &str) { + if let Error::InvalidRequest { + region_id: _, + reason, + location: _, + } = err + { + assert_eq!(reason, expect); + } else { + panic!("Unexpected error {err}") + } + } + + #[test] + fn test_write_request_duplicate_column() { + let rows = Rows { + schema: vec![ + new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag), + new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![], + }; + + let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap_err(); + check_invalid_request(&err, "duplicate column c0"); + } + + #[test] + fn test_valid_write_request() { + let rows = Rows { + schema: vec![ + new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag), + new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { 
+ values: vec![i64_value(1), i64_value(2)], + }], + }; + + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + assert_eq!(0, request.column_index_by_name("c0").unwrap()); + assert_eq!(1, request.column_index_by_name("c1").unwrap()); + assert_eq!(None, request.column_index_by_name("c2")); + } + + #[test] + fn test_write_request_column_num() { + let rows = Rows { + schema: vec![ + new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag), + new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { + values: vec![i64_value(1), i64_value(2), i64_value(3)], + }], + }; + + let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap_err(); + check_invalid_request(&err, "row has 3 columns but schema has 2"); + } + + fn new_region_metadata() -> RegionMetadata { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1), 1); + builder + .push_column_metadata(ColumnMetadata { + column_schema: datatypes::schema::ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: crate::metadata::SemanticType::Timestamp, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: datatypes::schema::ColumnSchema::new( + "k0", + ConcreteDataType::int64_datatype(), + true, + ), + semantic_type: crate::metadata::SemanticType::Tag, + column_id: 2, + }) + .primary_key(vec![2]); + builder.build().unwrap() + } + + #[test] + fn test_check_schema() { + let rows = Rows { + schema: vec![ + new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + ), + new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { + values: vec![ts_ms_value(1), i64_value(2)], + }], + }; + let metadata = new_region_metadata(); + + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + request.check_schema(&metadata).unwrap(); + } + + #[test] + fn 
test_column_type() { + let rows = Rows { + schema: vec![ + new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp), + new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { + values: vec![i64_value(1), i64_value(2)], + }], + }; + let metadata = new_region_metadata(); + + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.check_schema(&metadata).unwrap_err(); + check_invalid_request(&err, "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)"); + } + + #[test] + fn test_semantic_type() { + let rows = Rows { + schema: vec![ + new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Tag, + ), + new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { + values: vec![ts_ms_value(1), i64_value(2)], + }], + }; + let metadata = new_region_metadata(); + + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.check_schema(&metadata).unwrap_err(); + check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)"); + } + + #[test] + fn test_column_nullable() { + let rows = Rows { + schema: vec![ + new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + ), + new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { + values: vec![Value { value: None }, i64_value(2)], + }], + }; + let metadata = new_region_metadata(); + + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.check_schema(&metadata).unwrap_err(); + check_invalid_request(&err, "column ts is not null"); + } + + #[test] + fn test_column_default() { + let rows = Rows { + schema: vec![new_column_schema( + "k0", + ColumnDataType::Int64, + SemanticType::Tag, + )], + rows: vec![Row { + values: vec![i64_value(1)], + }], + }; + let metadata = 
new_region_metadata(); + + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.check_schema(&metadata).unwrap_err(); + check_invalid_request(&err, "missing column ts"); + } + + #[test] + fn test_unknown_column() { + let rows = Rows { + schema: vec![ + new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + ), + new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), + new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { + values: vec![ts_ms_value(1), i64_value(2), i64_value(3)], + }], + }; + let metadata = new_region_metadata(); + + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.check_schema(&metadata).unwrap_err(); + check_invalid_request(&err, r#"unknown columns: ["k1"]"#); + } + + #[test] + fn test_fill_missing_columns() { + let rows = Rows { + schema: vec![new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + )], + rows: vec![Row { + values: vec![ts_ms_value(1)], + }], + }; + let metadata = new_region_metadata(); + + let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.check_schema(&metadata).unwrap_err(); + assert!(err.is_fill_default()); + request.fill_missing_columns(&metadata).unwrap(); + + let expect_rows = Rows { + schema: vec![ + new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + ), + new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { + values: vec![ts_ms_value(1), Value { value: None }], + }], + }; + assert_eq!(expect_rows, request.rows); + } + + #[test] + fn test_no_default() { + let rows = Rows { + schema: vec![new_column_schema( + "k0", + ColumnDataType::Int64, + SemanticType::Tag, + )], + rows: vec![Row { + values: vec![i64_value(1)], + }], + }; + let metadata = 
new_region_metadata(); + + let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.fill_missing_columns(&metadata).unwrap_err(); + check_invalid_request(&err, "column ts does not have default value"); + } +} diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 2ba836666006..a26d657a1652 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -37,7 +37,7 @@ use tokio::sync::{mpsc, Mutex}; use crate::config::MitoConfig; use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu}; use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef}; -use crate::region::{RegionMap, RegionMapRef}; +use crate::region::{MitoRegionRef, RegionMap, RegionMapRef}; use crate::request::{RegionRequest, RequestBody, SenderWriteRequest, WorkerRequest}; use crate::wal::Wal; @@ -133,6 +133,13 @@ impl WorkerGroup { self.worker(region_id).is_region_exists(region_id) } + /// Returns region of specific `region_id`. + /// + /// This method should not be public. + pub(crate) fn get_region(&self, region_id: RegionId) -> Option { + self.worker(region_id).get_region(region_id) + } + /// Get worker for specific `region_id`. fn worker(&self, region_id: RegionId) -> &RegionWorker { let mut hasher = DefaultHasher::new(); @@ -252,6 +259,11 @@ impl RegionWorker { fn is_region_exists(&self, region_id: RegionId) -> bool { self.regions.is_region_exists(region_id) } + + /// Returns region of specific `region_id`. + fn get_region(&self, region_id: RegionId) -> Option { + self.regions.get_region(region_id) + } } impl Drop for RegionWorker { @@ -285,7 +297,7 @@ struct RegionWorkerLoop { memtable_builder: MemtableBuilderRef, } -impl RegionWorkerLoop { +impl RegionWorkerLoop { /// Starts the worker loop. 
async fn run(&mut self) { info!("Start region worker thread {}", self.id); @@ -353,7 +365,9 @@ impl RegionWorkerLoop { self.handle_ddl_requests(ddl_requests).await; } +} +impl RegionWorkerLoop { /// Takes and handles all ddl requests. async fn handle_ddl_requests(&mut self, ddl_requests: Vec) { if ddl_requests.is_empty() { diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index cc6b599ef6cf..8ddad90c11c6 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -15,26 +15,64 @@ //! Handling write requests. use std::collections::{hash_map, HashMap}; +use std::mem; +use std::sync::Arc; -use greptime_proto::v1::mito::Mutation; +use greptime_proto::v1::mito::{Mutation, WalEntry}; +use snafu::ResultExt; +use store_api::logstore::LogStore; +use store_api::storage::{RegionId, SequenceNumber}; use tokio::sync::oneshot::Sender; -use crate::error::{RegionNotFoundSnafu, Result}; +use crate::error::{Error, RegionNotFoundSnafu, Result, WriteGroupSnafu}; +use crate::memtable::KeyValues; +use crate::metadata::RegionMetadata; use crate::proto_util::to_proto_op_type; -use crate::region::version::VersionRef; +use crate::region::version::{VersionControlData, VersionRef}; use crate::region::MitoRegionRef; -use crate::request::SenderWriteRequest; +use crate::request::{SenderWriteRequest, WriteRequest}; +use crate::wal::{EntryId, WalWriter}; use crate::worker::RegionWorkerLoop; -impl RegionWorkerLoop { +impl RegionWorkerLoop { /// Takes and handles all write requests. pub(crate) async fn handle_write_requests(&mut self, write_requests: Vec) { if write_requests.is_empty() { return; } + let mut region_ctxs = self.prepare_region_write_ctx(write_requests); + + // Write to WAL. 
+ let mut wal_writer = self.wal.writer(); + for region_ctx in region_ctxs.values_mut() { + if let Err(e) = region_ctx.add_wal_entry(&mut wal_writer).map_err(Arc::new) { + region_ctx.set_error(e); + } + } + if let Err(e) = wal_writer.write_to_wal().await.map_err(Arc::new) { + // Failed to write wal. + for mut region_ctx in region_ctxs.into_values() { + region_ctx.set_error(e.clone()); + } + return; + } + + // Write to memtables. + for mut region_ctx in region_ctxs.into_values() { + region_ctx.write_memtable(); + } + } +} + +impl RegionWorkerLoop { + /// Validates and groups requests by region. + fn prepare_region_write_ctx( + &self, + write_requests: Vec, + ) -> HashMap { let mut region_ctxs = HashMap::new(); - for sender_req in write_requests { + for mut sender_req in write_requests { let region_id = sender_req.request.region_id; // Checks whether the region exists. if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) { @@ -53,9 +91,8 @@ impl RegionWorkerLoop { let region_ctx = region_ctxs.get_mut(&region_id).unwrap(); // Checks whether request schema is compatible with region schema. - if let Err(e) = sender_req - .request - .check_schema(&region_ctx.version.metadata) + if let Err(e) = + maybe_fill_missing_columns(&mut sender_req.request, &region_ctx.version.metadata) { send_result(sender_req.sender, Err(e)); @@ -66,8 +103,24 @@ impl RegionWorkerLoop { region_ctx.push_sender_request(sender_req); } - todo!() + region_ctxs + } +} + +/// Checks the schema and fill missing columns. +fn maybe_fill_missing_columns(request: &mut WriteRequest, metadata: &RegionMetadata) -> Result<()> { + if let Err(e) = request.check_schema(metadata) { + if e.is_fill_default() { + // TODO(yingwen): Add metrics for this case. + // We need to fill default value again. The write request may be a request + // sent before changing the schema. + request.fill_missing_columns(metadata)?; + } else { + return Err(e); + } } + + Ok(()) } /// Send result to the request. 
@@ -78,39 +131,126 @@ fn send_result(sender: Option>>, res: Result<()>) { } } +/// Notifier to notify write result on drop. +struct WriteNotify { + /// Error to send to the waiter. + err: Option>, + /// Sender to send write result to the waiter for this mutation. + sender: Option>>, +} + +impl WriteNotify { + /// Creates a new notify from the `sender`. + fn new(sender: Option>>) -> WriteNotify { + WriteNotify { err: None, sender } + } + + /// Send result to the waiter. + fn notify_result(&mut self) { + let Some(sender) = self.sender.take() else { + return; + }; + if let Some(err) = &self.err { + // Try to send the error to waiters. + let _ = sender.send(Err(err.clone()).context(WriteGroupSnafu)); + } else { + // Send success result. + let _ = sender.send(Ok(())); + } + } +} + +impl Drop for WriteNotify { + fn drop(&mut self) { + self.notify_result(); + } +} + /// Context to keep region metadata and buffer write requests. struct RegionWriteCtx { /// Region to write. region: MitoRegionRef, /// Version of the region while creating the context. version: VersionRef, - /// Valid mutations. - mutations: Vec, - /// Result senders. + /// Next sequence number to write. + /// + /// The context assigns a unique sequence number for each row. + next_sequence: SequenceNumber, + /// Next entry id of WAL to write. + next_entry_id: EntryId, + /// Valid WAL entry to write. + /// + /// We keep [WalEntry] instead of mutations to avoid taking mutations + /// out of the context to construct the wal entry when we write to the wal. + wal_entry: WalEntry, + /// Notifiers to send write results to waiters. /// - /// The sender is 1:1 map to the mutation in `mutations`. - senders: Vec>>>, + /// The i-th notify is for i-th mutation. + notifiers: Vec, } impl RegionWriteCtx { /// Returns an empty context. 
fn new(region: MitoRegionRef) -> RegionWriteCtx { - let version = region.version(); + let VersionControlData { + version, + committed_sequence, + last_entry_id, + } = region.version_control.current(); RegionWriteCtx { region, version, - mutations: Vec::new(), - senders: Vec::new(), + next_sequence: committed_sequence + 1, + next_entry_id: last_entry_id + 1, + wal_entry: WalEntry::default(), + notifiers: Vec::new(), } } /// Push [SenderWriteRequest] to the context. fn push_sender_request(&mut self, sender_req: SenderWriteRequest) { - self.mutations.push(Mutation { + let num_rows = sender_req.request.rows.rows.len() as u64; + + self.wal_entry.mutations.push(Mutation { op_type: to_proto_op_type(sender_req.request.op_type) as i32, - sequence: 0, // TODO(yingwen): Set sequence. + sequence: self.next_sequence, rows: Some(sender_req.request.rows), }); - self.senders.push(sender_req.sender); + // Notifiers are 1:1 map to mutations. + self.notifiers.push(WriteNotify::new(sender_req.sender)); + + // Increase sequence number. + self.next_sequence += num_rows; + } + + /// Encode and add WAL entry to the writer. + fn add_wal_entry(&self, wal_writer: &mut WalWriter) -> Result<()> { + wal_writer.add_entry(self.region.region_id, self.next_entry_id, &self.wal_entry) + } + + /// Sets error and marks all write operations are failed. + fn set_error(&mut self, err: Arc) { + // Set error for all notifiers + for notify in &mut self.notifiers { + notify.err = Some(err.clone()); + } + } + + /// Consumes mutations and writes them into mutable memtable. + fn write_memtable(&mut self) { + debug_assert_eq!(self.notifiers.len(), self.wal_entry.mutations.len()); + + let mutable = self.version.memtables.mutable(); + // Takes mutations from the wal entry. + let mutations = mem::take(&mut self.wal_entry.mutations); + for (mutation, notify) in mutations.into_iter().zip(&mut self.notifiers) { + // Write mutation to the memtable. 
+ let Some(kvs) = KeyValues::new(&self.version.metadata, mutation) else { + continue; + }; + if let Err(e) = mutable.write(&kvs) { + notify.err = Some(Arc::new(e)); + } + } } }