Extend source to enable read from remote storage
yjshen committed Aug 10, 2021
1 parent 0125451 commit 0821202
Showing 20 changed files with 1,001 additions and 507 deletions.
4 changes: 3 additions & 1 deletion ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -159,7 +159,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
             LogicalPlanBuilder::scan_parquet_with_name(
                 &scan.path,
                 projection,
-                24,
+                create_datafusion_context_concurrency(24),
                 &scan.table_name,
             )? //TODO concurrency
             .build()
@@ -1100,6 +1100,8 @@ impl TryInto<Field> for &protobuf::Field {
     }
 }
 
+use crate::utils::create_datafusion_context_concurrency;
 use datafusion::physical_plan::datetime_expressions::to_timestamp;
 use datafusion::physical_plan::{aggregates, windows};
 use datafusion::prelude::{
     array, date_part, date_trunc, length, lower, ltrim, md5, rtrim, sha224, sha256,
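Note: plan deserialization now passes an ExecutionContext (carrying the concurrency) into the builder instead of a bare usize. A minimal sketch of the new call shape, with a hypothetical path and table name and no projection:

    use ballista_core::utils::create_datafusion_context_concurrency;
    use datafusion::error::Result;
    use datafusion::logical_plan::{LogicalPlan, LogicalPlanBuilder};

    fn scan_example() -> Result<LogicalPlan> {
        // 24 mirrors the hard-coded concurrency above; making it
        // configurable is still a TODO in this commit.
        LogicalPlanBuilder::scan_parquet_with_name(
            "/data/lineitem.parquet", // hypothetical path
            None,                     // no projection
            create_datafusion_context_concurrency(24),
            "lineitem",               // hypothetical table name
        )?
        .build()
    }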
15 changes: 10 additions & 5 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -29,11 +29,13 @@ use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
 use crate::serde::protobuf::ShuffleReaderPartition;
 use crate::serde::scheduler::PartitionLocation;
 use crate::serde::{from_proto_binary_op, proto_error, protobuf};
+use crate::utils::create_datafusion_context_concurrency;
 use crate::{convert_box_required, convert_required, into_required};
 use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
 use datafusion::catalog::catalog::{
     CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
 };
+use datafusion::datasource::object_store::ObjectStoreRegistry;
 use datafusion::execution::context::{
     ExecutionConfig, ExecutionContextState, ExecutionProps,
 };
@@ -129,14 +131,13 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
             }
             PhysicalPlanType::ParquetScan(scan) => {
                 let projection = scan.projection.iter().map(|i| *i as usize).collect();
-                let filenames: Vec<&str> =
-                    scan.filename.iter().map(|s| s.as_str()).collect();
-                Ok(Arc::new(ParquetExec::try_from_files(
-                    &filenames,
+                let path: &str = scan.filename[0].as_str();
+                Ok(Arc::new(ParquetExec::try_from_path(
+                    path,
                     Some(projection),
                     None,
                     scan.batch_size as usize,
-                    scan.num_partitions as usize,
+                    create_datafusion_context_concurrency(scan.num_partitions as usize),
                     None,
                 )?))
             }
@@ -614,13 +615,17 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {
 
         let catalog_list =
             Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;
+
+        let object_store_registry = Arc::new(ObjectStoreRegistry::new());
+
         let ctx_state = ExecutionContextState {
             catalog_list,
             scalar_functions: Default::default(),
             var_provider: Default::default(),
             aggregate_functions: Default::default(),
             config: ExecutionConfig::new(),
             execution_props: ExecutionProps::new(),
+            object_store_registry,
         };
 
         let fun_expr = functions::create_physical_fun(
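Note: every deserialized physical expression is resolved against a locally built ExecutionContextState, which now must carry an object store registry. A minimal sketch of assembling that state with exactly the fields used above:

    use std::sync::Arc;
    use datafusion::catalog::catalog::{CatalogList, MemoryCatalogList};
    use datafusion::datasource::object_store::ObjectStoreRegistry;
    use datafusion::execution::context::{
        ExecutionConfig, ExecutionContextState, ExecutionProps,
    };

    fn make_ctx_state() -> ExecutionContextState {
        ExecutionContextState {
            catalog_list: Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>,
            scalar_functions: Default::default(),
            var_provider: Default::default(),
            aggregate_functions: Default::default(),
            config: ExecutionConfig::new(),
            execution_props: ExecutionProps::new(),
            // New in this commit: resolves paths to registered object stores.
            object_store_registry: Arc::new(ObjectStoreRegistry::new()),
        }
    }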
2 changes: 1 addition & 1 deletion ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -259,7 +259,7 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
             let filenames = exec
                 .partitions()
                 .iter()
-                .flat_map(|part| part.filenames().to_owned())
+                .flat_map(|part| part.filenames())
                 .collect();
             Ok(protobuf::PhysicalPlanNode {
                 physical_plan_type: Some(PhysicalPlanType::ParquetScan(
5 changes: 5 additions & 0 deletions ballista/rust/core/src/utils.rs
@@ -252,6 +252,11 @@ pub fn create_datafusion_context(
     ExecutionContext::with_config(config)
 }
 
+/// Create a DataFusion context with the concurrency that Ballista expects
+pub fn create_datafusion_context_concurrency(concurrency: usize) -> ExecutionContext {
+    ExecutionContext::with_concurrency(concurrency)
+}
+
 pub struct BallistaQueryPlanner {
     scheduler_url: String,
     config: BallistaConfig,
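Note: the helper is a thin wrapper over ExecutionContext::with_concurrency, so table providers that used to take max_concurrency: usize can take a context instead (see the ParquetTable call sites in tpch.rs and flight_server.rs below). A hedged usage sketch with a hypothetical table root, using num_cpus as flight_server.rs does:

    use std::sync::Arc;
    use ballista_core::utils::create_datafusion_context_concurrency;
    use datafusion::datasource::parquet::ParquetTable;
    use datafusion::error::Result;

    fn open_table() -> Result<Arc<ParquetTable>> {
        let ctx = create_datafusion_context_concurrency(num_cpus::get());
        // "/data/lineitem" is a hypothetical table root.
        Ok(Arc::new(ParquetTable::try_new("/data/lineitem", ctx)?))
    }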
28 changes: 12 additions & 16 deletions ballista/rust/scheduler/src/lib.rs
@@ -85,7 +85,8 @@ use self::state::{ConfigBackendClient, SchedulerState};
 use ballista_core::config::BallistaConfig;
 use ballista_core::execution_plans::ShuffleWriterExec;
 use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
-use datafusion::physical_plan::parquet::ParquetExec;
+use ballista_core::utils::create_datafusion_context_concurrency;
+use datafusion::datasource::parquet::ParquetRootDesc;
 use datafusion::prelude::{ExecutionConfig, ExecutionContext};
 use std::time::{Instant, SystemTime, UNIX_EPOCH};
 
@@ -285,24 +286,19 @@ impl SchedulerGrpc for SchedulerServer {
 
         match file_type {
             FileType::Parquet => {
-                let parquet_exec =
-                    ParquetExec::try_from_path(&path, None, None, 1024, 1, None)
-                        .map_err(|e| {
-                            let msg = format!("Error opening parquet files: {}", e);
-                            error!("{}", msg);
-                            tonic::Status::internal(msg)
-                        })?;
+                let ctx = create_datafusion_context_concurrency(1);
+                let parquet_desc = ParquetRootDesc::new(&path, ctx).map_err(|e| {
+                    let msg = format!("Error opening parquet files: {}", e);
+                    error!("{}", msg);
+                    tonic::Status::internal(msg)
+                })?;
 
                 //TODO include statistics and any other info needed to reconstruct ParquetExec
                 Ok(Response::new(GetFileMetadataResult {
-                    schema: Some(parquet_exec.schema().as_ref().into()),
-                    partitions: parquet_exec
-                        .partitions()
-                        .iter()
-                        .map(|part| FilePartitionMetadata {
-                            filename: part.filenames().to_vec(),
-                        })
-                        .collect(),
+                    schema: Some(parquet_desc.schema().as_ref().into()),
+                    partitions: vec![FilePartitionMetadata {
+                        filename: vec![path],
+                    }],
                 }))
             }
             //TODO implement for CSV
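Note: the scheduler now asks ParquetRootDesc for file metadata instead of constructing a throwaway ParquetExec. A minimal sketch of that lookup, assuming (as the .as_ref() call above suggests) that schema() returns an Arc-wrapped SchemaRef:

    use ballista_core::utils::create_datafusion_context_concurrency;
    use datafusion::arrow::datatypes::SchemaRef;
    use datafusion::datasource::parquet::ParquetRootDesc;
    use datafusion::error::Result;

    fn parquet_schema_for(path: &str) -> Result<SchemaRef> {
        // Concurrency 1 is enough for a metadata-only lookup.
        let ctx = create_datafusion_context_concurrency(1);
        let desc = ParquetRootDesc::new(path, ctx)?;
        Ok(desc.schema())
    }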
5 changes: 4 additions & 1 deletion benchmarks/src/bin/tpch.rs
@@ -475,7 +475,10 @@ fn get_table(
         }
         "parquet" => {
             let path = format!("{}/{}", path, table);
-            Ok(Arc::new(ParquetTable::try_new(&path, max_concurrency)?))
+            Ok(Arc::new(ParquetTable::try_new(
+                &path,
+                ExecutionContext::with_concurrency(max_concurrency),
+            )?))
         }
         other => {
             unimplemented!("Invalid file format '{}'", other);
6 changes: 5 additions & 1 deletion datafusion-examples/examples/flight_server.rs
@@ -65,7 +65,11 @@ impl FlightService for FlightServiceImpl {
     ) -> Result<Response<SchemaResult>, Status> {
         let request = request.into_inner();
 
-        let table = ParquetTable::try_new(&request.path[0], num_cpus::get()).unwrap();
+        let table = ParquetTable::try_new(
+            &request.path[0],
+            ExecutionContext::with_concurrency(num_cpus::get()),
+        )
+        .unwrap();
 
         let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
         let schema_result = SchemaAsIpc::new(table.schema().as_ref(), &options).into();
7 changes: 5 additions & 2 deletions datafusion/src/datasource/csv.rs
@@ -40,12 +40,14 @@ use std::string::String;
 use std::sync::{Arc, Mutex};
 
 use crate::datasource::datasource::Statistics;
+use crate::datasource::local::LocalFileSystem;
+use crate::datasource::object_store::ObjectStore;
 use crate::datasource::{Source, TableProvider};
 use crate::error::{DataFusionError, Result};
 use crate::logical_plan::Expr;
 use crate::physical_plan::csv::CsvExec;
 pub use crate::physical_plan::csv::CsvReadOptions;
-use crate::physical_plan::{common, ExecutionPlan};
+use crate::physical_plan::ExecutionPlan;
 
 /// Represents a CSV file with a provided schema
 pub struct CsvFile {
@@ -64,7 +66,8 @@ impl CsvFile {
         let schema = Arc::new(match options.schema {
             Some(s) => s.clone(),
             None => {
-                let filenames = common::build_file_list(&path, options.file_extension)?;
+                let filenames = LocalFileSystem
+                    .list_all_files(path.as_str(), options.file_extension)?;
                 if filenames.is_empty() {
                     return Err(DataFusionError::Plan(format!(
                         "No files found at {path} with file extension {file_extension}",
6 changes: 4 additions & 2 deletions datafusion/src/datasource/json.rs
@@ -30,14 +30,15 @@ use crate::{
     datasource::{Source, TableProvider},
     error::{DataFusionError, Result},
     physical_plan::{
-        common,
         json::{NdJsonExec, NdJsonReadOptions},
         ExecutionPlan,
     },
 };
 use arrow::{datatypes::SchemaRef, json::reader::infer_json_schema_from_seekable};
 
 use super::datasource::Statistics;
+use crate::datasource::local::LocalFileSystem;
+use crate::datasource::object_store::ObjectStore;
 
 trait SeekRead: Read + Seek {}
 
@@ -57,7 +58,8 @@ impl NdJsonFile {
         let schema = if let Some(schema) = options.schema {
             schema
         } else {
-            let filenames = common::build_file_list(path, options.file_extension)?;
+            let filenames =
+                LocalFileSystem.list_all_files(path, options.file_extension)?;
             if filenames.is_empty() {
                 return Err(DataFusionError::Plan(format!(
                     "No files found at {path} with file extension {file_extension}",
126 changes: 126 additions & 0 deletions datafusion/src/datasource/local.rs
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Object store that represents the Local File System.
+
+use crate::datasource::object_store::{ObjectReader, ObjectStore};
+use crate::error::DataFusionError;
+use crate::error::Result;
+use crate::parquet::file::reader::{ChunkReader, Length};
+use std::any::Any;
+use std::fs;
+use std::fs::{metadata, File};
+use std::io::Read;
+use std::sync::Arc;
+
+#[derive(Debug)]
+/// Local File System as Object Store.
+pub struct LocalFileSystem;
+
+impl ObjectStore for LocalFileSystem {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn list_all_files(&self, path: &str, ext: &str) -> Result<Vec<String>> {
+        list_all(path, ext)
+    }
+
+    fn get_reader(&self, file_path: &str) -> Result<Arc<dyn ObjectReader>> {
+        let file = File::open(file_path)?;
+        let reader = LocalFSObjectReader::new(file)?;
+        Ok(Arc::new(reader))
+    }
+}
+
+struct LocalFSObjectReader {
+    file: File,
+}
+
+impl LocalFSObjectReader {
+    fn new(file: File) -> Result<Self> {
+        Ok(Self { file })
+    }
+}
+
+impl ObjectReader for LocalFSObjectReader {
+    fn get_reader(&self, start: u64, length: usize) -> Box<dyn Read> {
+        Box::new(FileSegmentReader::new(
+            self.file.try_clone().unwrap(),
+            start,
+            length,
+        ))
+    }
+
+    fn length(&self) -> u64 {
+        self.file.len()
+    }
+}
+
+struct FileSegmentReader {
+    file: File,
+    start: u64,
+    length: usize,
+}
+
+impl FileSegmentReader {
+    fn new(file: File, start: u64, length: usize) -> Self {
+        Self {
+            file,
+            start,
+            length,
+        }
+    }
+}
+
+impl Read for FileSegmentReader {
+    fn read(&mut self, buf: &mut [u8]) -> std::result::Result<usize, std::io::Error> {
+        let mut file_source = self.file.get_read(self.start, self.length)?;
+        file_source.read(buf)
+    }
+}
+
+fn list_all(root_path: &str, ext: &str) -> Result<Vec<String>> {
+    let mut filenames: Vec<String> = Vec::new();
+    list_all_files(root_path, &mut filenames, ext)?;
+    Ok(filenames)
+}
+
+/// Recursively build a list of files in a directory with a given extension with an accumulator list
+fn list_all_files(dir: &str, filenames: &mut Vec<String>, ext: &str) -> Result<()> {
+    let metadata = metadata(dir)?;
+    if metadata.is_file() {
+        if dir.ends_with(ext) {
+            filenames.push(dir.to_string());
+        }
+    } else {
+        for entry in fs::read_dir(dir)? {
+            let entry = entry?;
+            let path = entry.path();
+            if let Some(path_name) = path.to_str() {
+                if path.is_dir() {
+                    list_all_files(path_name, filenames, ext)?;
+                } else if path_name.ends_with(ext) {
+                    filenames.push(path_name.to_string());
+                }
+            } else {
+                return Err(DataFusionError::Plan("Invalid path".to_string()));
+            }
+        }
+    }
+    Ok(())
+}
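Note: LocalFileSystem is the first ObjectStore implementation; the CSV and JSON sources above already route file listing through it, and remote stores are expected to implement the same trait. A hedged usage sketch with a hypothetical directory:

    use std::io::Read;
    use datafusion::datasource::local::LocalFileSystem;
    use datafusion::datasource::object_store::ObjectStore;
    use datafusion::error::Result;

    fn peek_first_parquet_file() -> Result<Vec<u8>> {
        // Recursively collect *.parquet files under a hypothetical root.
        let files = LocalFileSystem.list_all_files("/data/tbl", ".parquet")?;

        // Random-access read of a byte range: the access pattern a
        // remote object store must also support.
        let object_reader = LocalFileSystem.get_reader(&files[0])?;
        let mut segment = object_reader.get_reader(0, 4096);
        let mut buf = vec![0u8; 4096];
        let n = segment.read(&mut buf)?;
        buf.truncate(n);
        Ok(buf)
    }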