From 7c72932ac7f7547b0189cb7cdc72ca39ce94426d Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Fri, 31 May 2024 12:38:45 +0200 Subject: [PATCH] Fix the sys_inbox table --- crates/partition-store/src/inbox_table/mod.rs | 26 +++--- crates/storage-api/src/inbox_table/mod.rs | 6 -- .../storage-query-datafusion/src/inbox/mod.rs | 3 + .../storage-query-datafusion/src/inbox/row.rs | 5 +- .../src/inbox/table.rs | 5 +- .../src/inbox/tests.rs | 89 +++++++++++++++++++ 6 files changed, 110 insertions(+), 24 deletions(-) create mode 100644 crates/storage-query-datafusion/src/inbox/tests.rs diff --git a/crates/partition-store/src/inbox_table/mod.rs b/crates/partition-store/src/inbox_table/mod.rs index f76cf277c..91be0cfd3 100644 --- a/crates/partition-store/src/inbox_table/mod.rs +++ b/crates/partition-store/src/inbox_table/mod.rs @@ -10,7 +10,7 @@ use crate::keys::{define_table_key, KeyKind, TableKey}; use crate::TableKind::Inbox; -use crate::{RocksDBTransaction, StorageAccess}; +use crate::{PartitionStore, RocksDBTransaction, StorageAccess}; use crate::{TableScan, TableScanIterationDecision}; use bytestring::ByteString; use futures::Stream; @@ -113,8 +113,20 @@ impl<'a> InboxTable for RocksDBTransaction<'a> { }, )) } +} + +fn decode_inbox_key_value(k: &[u8], mut v: &[u8]) -> Result { + let key = InboxKey::deserialize_from(&mut Cursor::new(k))?; + let sequence_number = *key.sequence_number_ok_or()?; + + let inbox_entry = StorageCodec::decode::(&mut v) + .map_err(|error| StorageError::Generic(error.into()))?; + + Ok(SequenceNumberInboxEntry::new(sequence_number, inbox_entry)) +} - fn all_inboxes( +impl PartitionStore { + pub fn all_inboxes( &mut self, range: RangeInclusive, ) -> impl Stream> + Send { @@ -128,16 +140,6 @@ impl<'a> InboxTable for RocksDBTransaction<'a> { } } -fn decode_inbox_key_value(k: &[u8], mut v: &[u8]) -> Result { - let key = InboxKey::deserialize_from(&mut Cursor::new(k))?; - let sequence_number = *key.sequence_number_ok_or()?; - - let inbox_entry = StorageCodec::decode::(&mut v) - .map_err(|error| StorageError::Generic(error.into()))?; - - Ok(SequenceNumberInboxEntry::new(sequence_number, inbox_entry)) -} - #[cfg(test)] mod tests { use crate::inbox_table::InboxKey; diff --git a/crates/storage-api/src/inbox_table/mod.rs b/crates/storage-api/src/inbox_table/mod.rs index 0d2d9f414..f7ea906f5 100644 --- a/crates/storage-api/src/inbox_table/mod.rs +++ b/crates/storage-api/src/inbox_table/mod.rs @@ -14,7 +14,6 @@ use restate_types::identifiers::{InvocationId, PartitionKey, ServiceId, WithPart use restate_types::message::MessageIndex; use restate_types::state_mut::ExternalStateMutation; use std::future::Future; -use std::ops::RangeInclusive; #[derive(Debug, Clone, PartialEq)] pub enum InboxEntry { @@ -106,9 +105,4 @@ pub trait InboxTable { &mut self, service_id: &ServiceId, ) -> impl Stream> + Send; - - fn all_inboxes( - &mut self, - range: RangeInclusive, - ) -> impl Stream> + Send; } diff --git a/crates/storage-query-datafusion/src/inbox/mod.rs b/crates/storage-query-datafusion/src/inbox/mod.rs index dca1c728e..f614fc33f 100644 --- a/crates/storage-query-datafusion/src/inbox/mod.rs +++ b/crates/storage-query-datafusion/src/inbox/mod.rs @@ -13,3 +13,6 @@ mod schema; mod table; pub(crate) use table::register_self; + +#[cfg(test)] +mod tests; diff --git a/crates/storage-query-datafusion/src/inbox/row.rs b/crates/storage-query-datafusion/src/inbox/row.rs index 15c2a1b28..b60c1f5c5 100644 --- a/crates/storage-query-datafusion/src/inbox/row.rs +++ b/crates/storage-query-datafusion/src/inbox/row.rs @@ -19,17 +19,16 @@ pub(crate) fn append_inbox_row( output: &mut String, inbox_entry: SequenceNumberInboxEntry, ) { + let mut row = builder.row(); + let SequenceNumberInboxEntry { inbox_sequence_number, inbox_entry, } = inbox_entry; if let InboxEntry::Invocation(service_id, invocation_id) = inbox_entry { - let mut row = builder.row(); row.partition_key(invocation_id.partition_key()); - row.service_name(&service_id.service_name); - row.service_key(&service_id.key); if row.is_id_defined() { diff --git a/crates/storage-query-datafusion/src/inbox/table.rs b/crates/storage-query-datafusion/src/inbox/table.rs index 4226f4b8d..114ba2933 100644 --- a/crates/storage-query-datafusion/src/inbox/table.rs +++ b/crates/storage-query-datafusion/src/inbox/table.rs @@ -18,7 +18,7 @@ use futures::{Stream, StreamExt}; use tokio::sync::mpsc::Sender; use restate_partition_store::{PartitionStore, PartitionStoreManager}; -use restate_storage_api::inbox_table::{InboxTable, SequenceNumberInboxEntry}; +use restate_storage_api::inbox_table::SequenceNumberInboxEntry; use restate_storage_api::StorageError; use restate_types::identifiers::PartitionKey; @@ -54,8 +54,7 @@ impl ScanLocalPartition for InboxScanner { range: RangeInclusive, projection: SchemaRef, ) { - let mut transaction = partition_store.transaction(); - let rows = transaction.all_inboxes(range); + let rows = partition_store.all_inboxes(range); for_each_state(projection, tx, rows).await; } } diff --git a/crates/storage-query-datafusion/src/inbox/tests.rs b/crates/storage-query-datafusion/src/inbox/tests.rs new file mode 100644 index 000000000..016cd481f --- /dev/null +++ b/crates/storage-query-datafusion/src/inbox/tests.rs @@ -0,0 +1,89 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::mocks::*; +use crate::row; +use datafusion::arrow::array::{LargeStringArray, UInt64Array}; +use datafusion::arrow::record_batch::RecordBatch; +use futures::StreamExt; +use googletest::all; +use googletest::prelude::{assert_that, eq}; +use restate_core::TaskCenterBuilder; +use restate_storage_api::inbox_table::{InboxEntry, InboxTable, SequenceNumberInboxEntry}; +use restate_storage_api::Transaction; +use restate_types::identifiers::{InvocationId, InvocationUuid, ServiceId, WithPartitionKey}; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_inbox() { + let tc = TaskCenterBuilder::default() + .default_runtime_handle(tokio::runtime::Handle::current()) + .build() + .expect("task_center builds"); + let mut engine = tc + .run_in_scope("mock-query-engine", None, MockQueryEngine::create()) + .await; + + let mut tx = engine.partition_store().transaction(); + let service_id = ServiceId::mock_random(); + let invocation_id_1 = + InvocationId::from_parts(service_id.partition_key(), InvocationUuid::new()); + tx.put_inbox_entry( + &service_id, + SequenceNumberInboxEntry { + inbox_sequence_number: 0, + inbox_entry: InboxEntry::Invocation(service_id.clone(), invocation_id_1), + }, + ) + .await; + let invocation_id_2 = + InvocationId::from_parts(service_id.partition_key(), InvocationUuid::new()); + tx.put_inbox_entry( + &service_id, + SequenceNumberInboxEntry { + inbox_sequence_number: 1, + inbox_entry: InboxEntry::Invocation(service_id.clone(), invocation_id_2), + }, + ) + .await; + tx.commit().await.unwrap(); + + let records = engine + .execute("SELECT * FROM sys_inbox ORDER BY sequence_number") + .await + .unwrap() + .collect::>>() + .await + .remove(0) + .unwrap(); + + assert_that!( + records, + all!( + row!( + 0, + { + "id" => LargeStringArray: eq(invocation_id_1.to_string()), + "sequence_number" => UInt64Array: eq(0), + "service_name" => LargeStringArray: eq(service_id.service_name.to_string()), + "service_key" => LargeStringArray: eq(service_id.key.to_string()), + } + ), + row!( + 1, + { + "id" => LargeStringArray: eq(invocation_id_2.to_string()), + "sequence_number" => UInt64Array: eq(1), + "service_name" => LargeStringArray: eq(service_id.service_name.to_string()), + "service_key" => LargeStringArray: eq(service_id.key.to_string()), + } + ) + ) + ); +}