Skip to content

Commit

Permalink
Unbreak datafusion
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Apr 30, 2024
1 parent 25e45df commit 8ee4120
Show file tree
Hide file tree
Showing 22 changed files with 703 additions and 433 deletions.
17 changes: 1 addition & 16 deletions crates/partition-store/src/partition_store_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,14 @@ impl PartitionStoreManager {
) -> std::result::Result<Self, RocksError> {
let options = storage_opts.load();

// todo: temporary until we completely remove unpartitioned cf.
let mut ensure_cfs = partition_ids_to_cfs(initial_partition_set);
ensure_cfs.push(CfName::new("data-unpartitioned"));

let db_spec = DbSpecBuilder::new(
DbName::new(DB_NAME),
Owner::PartitionProcessor,
options.data_dir(),
db_options(),
)
.add_cf_pattern(CfPrefixPattern::new(PARTITION_CF_PREFIX), cf_options)
.ensure_column_families(ensure_cfs)
.ensure_column_families(partition_ids_to_cfs(initial_partition_set))
.build_as_optimistic_db();

let manager = RocksDbManager::get();
Expand Down Expand Up @@ -99,17 +95,6 @@ impl PartitionStoreManager {
self.lookup.lock().await.live.get(&partition_id).cloned()
}

/// Builds a [`PartitionStore`] over the `"data-unpartitioned"` column family,
/// sharing this manager's `raw_db` and `rocksdb` handles (both are cloned).
///
/// The store is created with the key range `0..=PartitionKey::MAX - 1`.
/// NOTE(review): the upper bound excludes `PartitionKey::MAX` itself — confirm
/// that this is intentional.
// `non_snake_case` is allowed for the upper-case `REMOVE_ME` suffix; presumably
// the name is deliberately loud to flag this accessor as temporary.
#[allow(non_snake_case)]
pub fn get_legacy_storage_REMOVE_ME(&self) -> PartitionStore {
PartitionStore::new(
self.raw_db.clone(),
self.rocksdb.clone(),
CfName::new("data-unpartitioned"),
// NOTE(review): presumably the partition id of the legacy store — confirm
// against `PartitionStore::new`.
0,
RangeInclusive::new(0, PartitionKey::MAX - 1),
)
}

pub async fn open_partition_store(
&self,
partition_id: PartitionId,
Expand Down
1 change: 1 addition & 0 deletions crates/storage-query-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ default = []
options_schema = ["dep:schemars"]

[dependencies]
restate-core = { workspace = true }
restate-invoker-api = { workspace = true }
restate-partition-store = { workspace = true }
restate-schema-api = { workspace = true, features = ["deployment"] }
Expand Down
61 changes: 51 additions & 10 deletions crates/storage-query-datafusion/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,22 @@
use std::fmt::Debug;
use std::sync::Arc;

use async_trait::async_trait;
use codederror::CodedError;
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::prelude::{SessionConfig, SessionContext};

use restate_core::worker_api::ProcessorsManagerHandle;
use restate_invoker_api::StatusHandle;
use restate_partition_store::{PartitionStore, PartitionStoreManager};
use restate_partition_store::PartitionStoreManager;
use restate_schema_api::deployment::DeploymentResolver;
use restate_schema_api::service::ServiceMetadataResolver;
use restate_types::config::QueryEngineOptions;
use restate_types::errors::GenericError;
use restate_types::identifiers::PartitionId;

use crate::{analyzer, physical_optimizer};

Expand Down Expand Up @@ -72,6 +76,11 @@ pub enum BuildError {
Datafusion(#[from] DataFusionError),
}

/// Asynchronous source of partition ids for the query engine.
///
/// An implementation is passed to `QueryContext::create` as the
/// `partition_selector` and handed, together with the partition store
/// manager, to the partition-key-based table registrations.
///
/// Implementors must be `Send + Sync + Debug + 'static`.
#[async_trait]
pub trait SelectPartitions: Send + Sync + Debug + 'static {
/// Returns a list of partition ids (the "live" partitions, per the method
/// name), or a [`GenericError`] if they cannot be determined.
async fn get_live_partitions(&self) -> Result<Vec<PartitionId>, GenericError>;
}

#[derive(Clone)]
pub struct QueryContext {
datafusion_context: SessionContext,
Expand All @@ -80,8 +89,8 @@ pub struct QueryContext {
impl QueryContext {
pub async fn create(
options: &QueryEngineOptions,
_partition_store_manager: PartitionStoreManager,
rocksdb: PartitionStore,
partition_selector: impl SelectPartitions + Clone,
partition_store_manager: PartitionStoreManager,
status: impl StatusHandle + Send + Sync + Debug + Clone + 'static,
schemas: impl DeploymentResolver
+ ServiceMetadataResolver
Expand All @@ -96,15 +105,40 @@ impl QueryContext {
options.tmp_dir.clone(),
options.query_parallelism(),
);
crate::invocation_status::register_self(&ctx, rocksdb.clone())?;
crate::keyed_service_status::register_self(&ctx, rocksdb.clone())?;
crate::state::register_self(&ctx, rocksdb.clone())?;
crate::journal::register_self(&ctx, rocksdb.clone())?;
crate::invocation_state::register_self(&ctx, status)?;
crate::inbox::register_self(&ctx, rocksdb.clone())?;
crate::deployment::register_self(&ctx, schemas.clone())?;
crate::service::register_self(&ctx, schemas)?;
crate::idempotency::register_self(&ctx, rocksdb)?;
crate::invocation_state::register_self(&ctx, status)?;
// partition-key-based
crate::invocation_status::register_self(
&ctx,
partition_selector.clone(),
partition_store_manager.clone(),
)?;
crate::keyed_service_status::register_self(
&ctx,
partition_selector.clone(),
partition_store_manager.clone(),
)?;
crate::state::register_self(
&ctx,
partition_selector.clone(),
partition_store_manager.clone(),
)?;
crate::journal::register_self(
&ctx,
partition_selector.clone(),
partition_store_manager.clone(),
)?;
crate::inbox::register_self(
&ctx,
partition_selector.clone(),
partition_store_manager.clone(),
)?;
crate::idempotency::register_self(
&ctx,
partition_selector.clone(),
partition_store_manager,
)?;

let ctx = ctx
.datafusion_context
Expand Down Expand Up @@ -173,3 +207,10 @@ impl AsRef<SessionContext> for QueryContext {
&self.datafusion_context
}
}

/// Lets a [`ProcessorsManagerHandle`] be used as the partition selector of the
/// query engine.
#[async_trait]
impl SelectPartitions for ProcessorsManagerHandle {
/// Delegates to `self.get_live_partitions()` and converts its error into a
/// [`GenericError`] via `?`.
async fn get_live_partitions(&self) -> Result<Vec<PartitionId>, GenericError> {
// NOTE(review): this call has the same name as the trait method being
// implemented. It presumably resolves to an inherent
// `ProcessorsManagerHandle::get_live_partitions` (inherent methods take
// precedence over trait methods); if no such inherent method exists this
// would recurse into itself — confirm.
Ok(self.get_live_partitions().await?)
}
}
28 changes: 14 additions & 14 deletions crates/storage-query-datafusion/src/deployment/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,24 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use super::schema::DeploymentBuilder;
use std::fmt::Debug;
use std::sync::Arc;

use crate::context::QueryContext;
use crate::deployment::row::append_deployment_row;
use crate::generic_table::{GenericTableProvider, RangeScanner};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::stream::RecordBatchReceiverStream;
use datafusion::physical_plan::SendableRecordBatchStream;
pub use datafusion_expr::UserDefinedLogicalNode;
use restate_schema_api::deployment::{Deployment, DeploymentResolver};
use restate_types::identifiers::{PartitionKey, ServiceRevision};
use std::fmt::Debug;
use std::ops::RangeInclusive;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;

use restate_schema_api::deployment::{Deployment, DeploymentResolver};
use restate_types::identifiers::ServiceRevision;

use super::schema::DeploymentBuilder;
use crate::context::QueryContext;
use crate::deployment::row::append_deployment_row;
use crate::table_providers::{GenericTableProvider, Scan};

pub(crate) fn register_self(
ctx: &QueryContext,
resolver: impl DeploymentResolver + Send + Sync + Debug + 'static,
Expand All @@ -42,15 +43,14 @@ pub(crate) fn register_self(
/// Newtype wrapper around a value of type `DMR`.
///
/// The scanner trait for the deployment table is implemented for this wrapper
/// when `DMR: DeploymentResolver + Debug + Sync + Send + 'static`.
#[derive(Debug, Clone)]
struct DeploymentMetadataScanner<DMR>(DMR);

/// TODO This trait makes little sense for sys_deployment,
/// but it's fine nevertheless as the caller always uses the full range
impl<DMR: DeploymentResolver + Debug + Sync + Send + 'static> RangeScanner
impl<DMR: DeploymentResolver + Debug + Sync + Send + 'static> Scan
for DeploymentMetadataScanner<DMR>
{
fn scan(
&self,
_range: RangeInclusive<PartitionKey>,
projection: SchemaRef,
_filters: &[Expr],
_limit: Option<usize>,
) -> SendableRecordBatchStream {
let schema = projection.clone();
let mut stream_builder = RecordBatchReceiverStream::builder(projection, 16);
Expand Down
161 changes: 0 additions & 161 deletions crates/storage-query-datafusion/src/generic_table.rs

This file was deleted.

Loading

0 comments on commit 8ee4120

Please sign in to comment.