Skip to content

Commit

Permalink
feat(mito): Write wal and memtable (#2135)
Browse files Browse the repository at this point in the history
* feat: hold wal entry in RegionWriteCtx

* feat: entry id and commited sequence

* feat: write to wal

* feat: write memtable

* feat: fill missing columns

* feat: validate write request

* feat: more validation to write request

* chore: fix typos

* feat: remove init and validate rows in new()

* style: fix clippy
  • Loading branch information
evenyag authored Aug 12, 2023
1 parent b62e643 commit e6090a8
Show file tree
Hide file tree
Showing 11 changed files with 679 additions and 92 deletions.
21 changes: 13 additions & 8 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ mod tests;
use std::sync::Arc;

use object_store::ObjectStore;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;

use crate::config::MitoConfig;
use crate::error::{RecvSnafu, Result};
use crate::error::{RecvSnafu, RegionNotFoundSnafu, Result};
use crate::request::{
CloseRequest, CreateRequest, OpenRequest, RegionRequest, RequestBody, WriteRequest,
};
Expand Down Expand Up @@ -90,12 +90,17 @@ impl MitoEngine {
}

/// Write to a region.
pub async fn write_region(&self, write_request: WriteRequest) -> Result<()> {
write_request.validate()?;

// TODO(yingwen): Fill default values.
// We need to fill default values before writing it to WAL so we can get
// the same default value after reopening the region.
pub async fn write_region(&self, mut write_request: WriteRequest) -> Result<()> {
let region = self
.inner
.workers
.get_region(write_request.region_id)
.context(RegionNotFoundSnafu {
region_id: write_request.region_id,
})?;
let metadata = region.metadata();

write_request.fill_missing_columns(&metadata)?;

self.inner
.handle_request_body(RequestBody::Write(write_request))
Expand Down
22 changes: 15 additions & 7 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::any::Any;
use std::sync::Arc;

use common_datasource::compression::CompressionType;
use common_error::ext::{BoxedError, ErrorExt};
Expand Down Expand Up @@ -185,15 +186,10 @@ pub enum Error {

/// An error type to indicate that schema is changed and we need
/// to fill default values again.
#[snafu(display(
"Need to fill default value to column {} of region {}",
column,
region_id
))]
#[snafu(display("Need to fill default value for region {}", region_id))]
FillDefault {
region_id: RegionId,
column: String,
// The error is for retry purpose so we don't need a location.
// The error is for internal use so we don't need a location.
},

#[snafu(display(
Expand Down Expand Up @@ -260,10 +256,21 @@ pub enum Error {
location: Location,
source: BoxedError,
},

// Shared error for each writer in the write group.
#[snafu(display("Failed to write region, source: {}", source))]
WriteGroup { source: Arc<Error> },
}

pub type Result<T> = std::result::Result<T, Error>;

impl Error {
/// Returns true if we need to fill default value for a region.
pub(crate) fn is_fill_default(&self) -> bool {
matches!(self, Error::FillDefault { .. })
}
}

impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
Expand Down Expand Up @@ -296,6 +303,7 @@ impl ErrorExt for Error {
| EncodeWal { .. }
| DecodeWal { .. } => StatusCode::Internal,
WriteBuffer { source, .. } => source.status_code(),
WriteGroup { source, .. } => source.status_code(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use crate::error::Result;
use crate::memtable::key_values::KeyValues;
pub use crate::memtable::key_values::KeyValues;
use crate::metadata::RegionMetadataRef;

/// Id for memtables.
Expand Down
9 changes: 2 additions & 7 deletions src/mito2/src/memtable/key_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,12 @@ mod tests {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use greptime_proto::v1;
use greptime_proto::v1::{value, ColumnDataType, Value};
use greptime_proto::v1::ColumnDataType;
use store_api::storage::RegionId;

use super::*;
use crate::metadata::{ColumnMetadata, RegionMetadataBuilder};
use crate::proto_util::i64_value;

const TS_NAME: &str = "ts";
const START_SEQ: SequenceNumber = 100;
Expand Down Expand Up @@ -290,12 +291,6 @@ mod tests {
}
}

fn i64_value(data: i64) -> Value {
Value {
value: Some(value::Value::I64Value(data)),
}
}

fn check_key_values(kvs: &KeyValues, num_rows: usize, keys: &[i64], ts: i64, values: &[i64]) {
assert_eq!(num_rows, kvs.num_rows());
let mut expect_seq = START_SEQ;
Expand Down
5 changes: 5 additions & 0 deletions src/mito2/src/memtable/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,9 @@ impl MemtableVersion {
immutables: vec![],
}
}

/// Returns the mutable memtable.
pub(crate) fn mutable(&self) -> &MemtableRef {
&self.mutable
}
}
52 changes: 52 additions & 0 deletions src/mito2/src/proto_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,56 @@ pub(crate) fn to_proto_value(value: Value) -> Option<v1::Value> {
Some(proto_value)
}

/// Returns the [ColumnDataType] of the value.
///
/// If value is null, returns `None`.
pub(crate) fn proto_value_type(value: &v1::Value) -> Option<ColumnDataType> {
let value_data = value.value.as_ref()?;
let value_type = match value_data {
v1::value::Value::I8Value(_) => ColumnDataType::Int8,
v1::value::Value::I16Value(_) => ColumnDataType::Int16,
v1::value::Value::I32Value(_) => ColumnDataType::Int32,
v1::value::Value::I64Value(_) => ColumnDataType::Int64,
v1::value::Value::U8Value(_) => ColumnDataType::Uint8,
v1::value::Value::U16Value(_) => ColumnDataType::Uint16,
v1::value::Value::U32Value(_) => ColumnDataType::Uint32,
v1::value::Value::U64Value(_) => ColumnDataType::Uint64,
v1::value::Value::F32Value(_) => ColumnDataType::Float32,
v1::value::Value::F64Value(_) => ColumnDataType::Float64,
v1::value::Value::BoolValue(_) => ColumnDataType::Boolean,
v1::value::Value::BinaryValue(_) => ColumnDataType::Binary,
v1::value::Value::StringValue(_) => ColumnDataType::String,
v1::value::Value::DateValue(_) => ColumnDataType::Date,
v1::value::Value::DatetimeValue(_) => ColumnDataType::Datetime,
v1::value::Value::TsSecondValue(_) => ColumnDataType::TimestampSecond,
v1::value::Value::TsMillisecondValue(_) => ColumnDataType::TimestampMillisecond,
v1::value::Value::TsMicrosecondValue(_) => ColumnDataType::TimestampMicrosecond,
v1::value::Value::TsNanosecondValue(_) => ColumnDataType::TimestampNanosecond,
v1::value::Value::TimeSecondValue(_) => ColumnDataType::TimeSecond,
v1::value::Value::TimeMillisecondValue(_) => ColumnDataType::TimeMillisecond,
v1::value::Value::TimeMicrosecondValue(_) => ColumnDataType::TimeMicrosecond,
v1::value::Value::TimeNanosecondValue(_) => ColumnDataType::TimeNanosecond,
};
Some(value_type)
}

// TODO(yingwen): Support conversion in greptime-proto.
/// Creates value for i64.
#[cfg(test)]
pub(crate) fn i64_value(data: i64) -> v1::Value {
v1::Value {
value: Some(v1::value::Value::I64Value(data)),
}
}

/// Creates value for timestamp millis.
#[cfg(test)]
pub(crate) fn ts_ms_value(data: i64) -> v1::Value {
v1::Value {
value: Some(v1::value::Value::TsMillisecondValue(data)),
}
}

/// Convert [ConcreteDataType] to [ColumnDataType].
pub(crate) fn to_column_data_type(data_type: &ConcreteDataType) -> Option<ColumnDataType> {
let column_data_type = match data_type {
Expand Down Expand Up @@ -186,3 +236,5 @@ fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType
false
}
}

// TODO(yingwen): Tests.
12 changes: 7 additions & 5 deletions src/mito2/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use store_api::storage::RegionId;

use crate::error::Result;
use crate::manifest::manager::RegionManifestManager;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::metadata::RegionMetadataRef;
use crate::region::version::VersionControlRef;

/// Type to store region version.
pub type VersionNumber = u32;
Expand All @@ -40,7 +41,7 @@ pub(crate) struct MitoRegion {
pub(crate) region_id: RegionId,

/// Version controller for this region.
version_control: VersionControlRef,
pub(crate) version_control: VersionControlRef,
/// Manager to maintain manifest for this region.
manifest_manager: RegionManifestManager,
}
Expand All @@ -57,9 +58,10 @@ impl MitoRegion {
Ok(())
}

/// Returns current version of the region.
pub(crate) fn version(&self) -> VersionRef {
self.version_control.current()
/// Returns current metadata of the region.
pub(crate) fn metadata(&self) -> RegionMetadataRef {
let version_data = self.version_control.current();
version_data.version.metadata.clone()
}
}

Expand Down
35 changes: 26 additions & 9 deletions src/mito2/src/region/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,56 @@
//! Reason: data may be flushed/compacted and some data with old sequence may be removed
//! and became invisible between step 1 and 2, so need to acquire version at first.

use std::sync::Arc;
use std::sync::{Arc, RwLock};

use arc_swap::ArcSwap;
use store_api::storage::SequenceNumber;

use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
use crate::memtable::MemtableRef;
use crate::metadata::RegionMetadataRef;
use crate::sst::version::{SstVersion, SstVersionRef};
use crate::wal::EntryId;

/// Controls version of in memory metadata for a region.
/// Controls metadata and sequence numbers for a region.
///
/// It manages metadata in a copy-on-write fashion. Any modification to a region's metadata
/// will generate a new [Version].
#[derive(Debug)]
pub(crate) struct VersionControl {
/// Latest version.
version: ArcSwap<Version>,
data: RwLock<VersionControlData>,
}

impl VersionControl {
/// Returns a new [VersionControl] with specific `version`.
pub(crate) fn new(version: Version) -> VersionControl {
VersionControl {
version: ArcSwap::new(Arc::new(version)),
data: RwLock::new(VersionControlData {
version: Arc::new(version),
committed_sequence: 0,
last_entry_id: 0,
}),
}
}

/// Returns current [Version].
pub(crate) fn current(&self) -> VersionRef {
self.version.load_full()
/// Returns current copy of data.
pub(crate) fn current(&self) -> VersionControlData {
self.data.read().unwrap().clone()
}
}

pub(crate) type VersionControlRef = Arc<VersionControl>;

/// Data of [VersionControl].
#[derive(Debug, Clone)]
pub(crate) struct VersionControlData {
/// Latest version.
pub(crate) version: VersionRef,
/// Sequence number of last committed data.
pub(crate) committed_sequence: SequenceNumber,
/// Last WAL entry Id.
pub(crate) last_entry_id: EntryId,
}

/// Static metadata of a region.
#[derive(Clone, Debug)]
pub(crate) struct Version {
Expand Down
Loading

0 comments on commit e6090a8

Please sign in to comment.