Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
gangliao committed Mar 20, 2021
2 parents 24ce3e3 + 7f07a66 commit fbefe75
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 215 deletions.
2 changes: 1 addition & 1 deletion src/function/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "cloud-function"
name = "function"
version = "0.1.0"
description = "The generic cloud functions for serverless computation."
authors = [ "Gang Liao <[email protected]>" ]
Expand Down
9 changes: 7 additions & 2 deletions src/function/src/aws/lambda.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,12 @@ fn invoke_async_functions(ctx: &ExecutionContext, batches: &mut Vec<RecordBatch>
loop {
let request = InvokeAsyncRequest {
function_name: next_func.clone(),
invoke_args: Payload::to_bytes(&[batches.pop().unwrap()], uuid.clone()),
invoke_args: Payload::to_vec(
&[batches.pop().unwrap()],
uuid.clone(),
Encoding::default(),
)
.into(),
};

if let Ok(reponse) = block_on(client.invoke_async(request)) {
Expand Down Expand Up @@ -204,7 +209,7 @@ async fn payload_handler(ctx: &mut ExecutionContext, event: Value) -> Result<Val
ctx.feed_one_source(&vec![batches]);
let batches = ctx.execute().await?;

Ok(Payload::to_value(&batches, uuid))
Ok(Payload::to_value(&batches, uuid, Encoding::default()))
}

async fn handler(event: Value, _: Context) -> Result<Value> {
Expand Down
2 changes: 1 addition & 1 deletion src/runtime/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl ExecutionContext {
Encoding::Snappy | Encoding::Lz4 | Encoding::Zstd => {
let encoded: Vec<u8> = serde_json::to_vec(&self).unwrap();
serde_json::to_string(&CloudEnvironment {
context: encoding.compress(&encoded),
context: encoding.compress(encoded),
encoding,
})
.unwrap()
Expand Down
9 changes: 4 additions & 5 deletions src/runtime/src/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,16 @@ impl Default for Encoding {

impl Encoding {
/// Compress data
pub fn compress(&self, s: &[u8]) -> Vec<u8> {
pub fn compress(&self, s: Vec<u8>) -> Vec<u8> {
match *self {
Encoding::Snappy => {
let mut encoder = snap::raw::Encoder::new();
encoder.compress_vec(&s).unwrap()
}
Encoding::Lz4 => lz4::block::compress(&s, None, true).unwrap(),
Encoding::Zstd => zstd::block::compress(&s, 3).unwrap(),
_ => {
unimplemented!();
}
Encoding::None => s,
_ => unimplemented!(),
}
}

Expand Down Expand Up @@ -149,7 +148,7 @@ mod tests {
let json = serde_json::to_string(&plan).unwrap();

let now = Instant::now();
let en_json = en.compress(&json.as_bytes());
let en_json = en.compress(json.as_bytes().to_vec());
println!("Compression time: {} μs", now.elapsed().as_micros());

let now = Instant::now();
Expand Down
7 changes: 6 additions & 1 deletion src/runtime/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
use crate::config::GLOBALS as globals;
use crate::context::CloudFunction;
use crate::context::ExecutionContext;
use crate::encoding::Encoding;
use crate::error::{Result, SquirtleError};
use crate::payload::{Payload, Uuid};
use arrow::record_batch::RecordBatch;
Expand Down Expand Up @@ -123,7 +124,11 @@ pub trait Executor {
assert_eq!(1, output_partitions.len());
assert_eq!(1, output_partitions[0].len());

Ok(Payload::to_value(&output_partitions[0], Uuid::default()))
Ok(Payload::to_value(
&output_partitions[0],
Uuid::default(),
Encoding::default(),
))
}
}

Expand Down
Loading

0 comments on commit fbefe75

Please sign in to comment.