Skip to content

Commit

Permalink
fix CR
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 committed Sep 11, 2024
1 parent f5a0a67 commit 9ae567f
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 28 deletions.
5 changes: 1 addition & 4 deletions horaedb/metric_engine/src/sst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use crate::types::ObjectStoreRef;

pub struct SSTable {
pub sst_id: u64,
pub storage: ObjectStoreRef,
pub id: u64,
}
39 changes: 18 additions & 21 deletions horaedb/metric_engine/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,31 @@ use crate::{
Result,
};

pub struct CompactContext {}
pub struct WriteContext {}
pub struct ScanContext {}
pub struct WriteRequest {
batch: RecordBatch,
}

pub struct ScanRequest {
range: TimeRange,
predicates: Vec<Predicate>,
/// `None` means all columns.
projections: Option<Vec<usize>>,
}

pub struct CompactRequest {}

/// Time-aware merge storage interface.
#[async_trait]
pub trait TimeMergeStorage {
fn schema(&self) -> Result<&Schema>;

async fn write(&self, ctx: &WriteContext, batch: RecordBatch) -> Result<()>;
async fn write(&self, req: WriteRequest) -> Result<()>;

/// Implementation shoule ensure that the returned stream is sorted by time,
/// from old to latest.
async fn scan(
&self,
ctx: &ScanContext,
range: TimeRange,
predicates: Vec<Predicate>,
projections: Vec<usize>,
) -> Result<SendableRecordBatchStream>;
async fn scan(&self, req: ScanRequest) -> Result<SendableRecordBatchStream>;

async fn compact(&self, ctx: &CompactContext) -> Result<()>;
async fn compact(&self, req: CompactRequest) -> Result<()>;
}

/// TMStorage implementation using cloud object storage.
Expand Down Expand Up @@ -76,21 +79,15 @@ impl TimeMergeStorage for CloudObjectStorage {
todo!()
}

async fn write(&self, ctx: &WriteContext, batch: RecordBatch) -> Result<()> {
async fn write(&self, req: WriteRequest) -> Result<()> {
todo!()
}

async fn scan(
&self,
ctx: &ScanContext,
range: TimeRange,
predicates: Vec<Predicate>,
projections: Vec<usize>,
) -> Result<SendableRecordBatchStream> {
async fn scan(&self, req: ScanRequest) -> Result<SendableRecordBatchStream> {
todo!()
}

async fn compact(&self, ctx: &CompactContext) -> Result<()> {
async fn compact(&self, req: CompactRequest) -> Result<()> {
todo!()
}
}
5 changes: 2 additions & 3 deletions horaedb/metric_engine/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,15 @@ pub enum Value {
pub enum Predicate {
Equal(String, Value),
NotEqual(String, Value),
RegexMatch(String, Value),
NotRegexMatch(String, Value),
RegexMatch(String, Vec<u8>),
NotRegexMatch(String, Vec<u8>),
}

pub type TimeRange = Range<i64>;

pub type ObjectStoreRef = Arc<dyn ObjectStore>;

/// Trait for types that stream [arrow::record_batch::RecordBatch]
// TODO: how to attach TSID hint to RecordBatch?
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
fn schema(&self) -> &Schema;
}
Expand Down

0 comments on commit 9ae567f

Please sign in to comment.