Skip to content

Commit

Permalink
Avro PoC (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
imDema committed May 24, 2024
1 parent 3497c5e commit 33a4cba
Show file tree
Hide file tree
Showing 11 changed files with 862 additions and 1 deletion.
254 changes: 253 additions & 1 deletion Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ indexmap = "2.2.6"
tracing = { version = "0.1.40", features = ["log"] }
quick_cache = "0.5.1"
dashmap = "5.5.3"
apache-avro = { version = "0.16.0", features = ["derive"] }
dyn-clone = "1.0.17"

[dev-dependencies]
# for the tests
Expand Down
41 changes: 41 additions & 0 deletions examples/avro_rw.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::path::PathBuf;

use apache_avro::AvroSchema;
use clap::Parser;
use renoir::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Debug, Parser)]
struct Options {
#[clap(short,long)]
input: Option<PathBuf>,

#[clap(short,long)]
output: PathBuf,
}

#[derive(Serialize, Deserialize, AvroSchema, Clone, Debug)]
struct InputType {
s: String,
num: u32,
}

fn main() {
let (conf, args) = RuntimeConfig::from_args();
let opts = Options::parse_from(args);
conf.spawn_remote_workers();

let ctx = StreamContext::new(conf.clone());

let source = if let Some(input) = opts.input {
ctx.stream_avro(input).into_boxed()
} else {
ctx.stream_iter((0..100).map(|i| InputType{ s: format!("{i:o}"), num: i })).into_boxed()
};

source.inspect(|e| eprintln!("{e:?}"))
.map(|mut e| { e.num *= 2; e })
.write_avro(opts.output);

ctx.execute_blocking();
}
103 changes: 103 additions & 0 deletions src/operator/boxed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
use std::fmt::Display;

use dyn_clone::DynClone;

use crate::{
block::BlockStructure,
operator::{Data, Operator, StreamElement},
ExecutionMetadata, Stream,
};

pub(crate) trait DynOperator: DynClone {
type Out: Data;
/// Setup the operator chain. This is called before any call to `next` and it's used to
/// initialize the operator. When it's called the operator has already been cloned and it will
/// never be cloned again. Therefore it's safe to store replica-specific metadata inside of it.
///
/// It's important that each operator (except the start of a chain) calls `.setup()` recursively
/// on the previous operators.
fn setup(&mut self, metadata: &mut ExecutionMetadata);

/// Take a value from the previous operator, process it and return it.
fn next(&mut self) -> StreamElement<Self::Out>;

/// A more refined representation of the operator and its predecessors.
fn structure(&self) -> BlockStructure;
}

dyn_clone::clone_trait_object!(<O> DynOperator<Out=O>);

impl<Op> DynOperator for Op
where
Op: Operator,
<Op as Operator>::Out: Clone + Send + 'static,
{
type Out = Op::Out;

fn setup(&mut self, metadata: &mut ExecutionMetadata) {
self.setup(metadata)
}

fn next(&mut self) -> StreamElement<Op::Out> {
self.next()
}

fn structure(&self) -> BlockStructure {
self.structure()
}
}

pub struct BoxedOperator<O> {
pub(crate) op: Box<dyn DynOperator<Out=O> + 'static + Send>,
}

impl<T> Clone for BoxedOperator<T> {
fn clone(&self) -> Self {
Self {
op: self.op.clone(),
}
}
}

impl<T> Display for BoxedOperator<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "BoxedOperator")
}
}

impl<O: Data> BoxedOperator<O> {
pub fn new<Op: Operator<Out=O> + 'static>(op: Op) -> Self {
Self {
op: Box::new(op),
}
}
}

impl<O: Data> Operator for BoxedOperator<O> {
type Out = O;

fn next(&mut self) -> StreamElement<O> {
self.op.next()
}

fn setup(&mut self, metadata: &mut ExecutionMetadata) {
self.op.setup(metadata)
}

fn structure(&self) -> BlockStructure {
self.op.structure()
}
}

impl<Op> Stream<Op>
where
Op: Operator + 'static,
Op::Out: Clone + Send + 'static,
{
/// Erase operator type using dynamic dispatching.
///
/// Use only when strictly necessary as it is decrimental for performance.
pub fn into_boxed(self) -> Stream<BoxedOperator<Op::Out>> {
self.add_operator(|prev| BoxedOperator::new(prev))
}
}
3 changes: 3 additions & 0 deletions src/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use std::fmt::Display;
use std::hash::Hash;
use std::ops::{AddAssign, Div};
use std::path::PathBuf;

use flume::{unbounded, Receiver};
#[cfg(feature = "tokio")]
Expand All @@ -25,6 +26,7 @@ use crate::{BatchMode, KeyedStream, Stream};
#[cfg(feature = "tokio")]
use self::map_async::MapAsync;
use self::map_memo::MapMemo;
use self::sink::avro::AvroSink;
use self::sink::collect::Collect;
use self::sink::collect_channel::CollectChannelSink;
use self::sink::collect_count::CollectCountSink;
Expand Down Expand Up @@ -86,6 +88,7 @@ pub mod source;
mod start;
pub mod window;
mod zip;
mod boxed;

/// Marker trait that all the types inside a stream should implement.
pub trait Data: Clone + Send + 'static {}
Expand Down
166 changes: 166 additions & 0 deletions src/operator/sink/avro.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
use apache_avro::{AvroSchema, Schema, Writer};
use serde::Serialize;
use std::fmt::Display;
use std::fs::File;
use std::io::BufWriter;
use std::marker::PhantomData;
use std::path::PathBuf;

use crate::block::{BlockStructure, OperatorKind, OperatorStructure};
use crate::operator::sink::StreamOutputRef;
use crate::operator::{ExchangeData, Operator, StreamElement};
use crate::scheduler::ExecutionMetadata;
use crate::Stream;

// #[derive(Debug)]
pub struct AvroSink<Op>
where
Op: Operator,
{
prev: Op,
path: PathBuf,
/// Reader used to parse the CSV file.
writer: Option<BufWriter<File>>,
schema: Schema,
}

impl<Op> AvroSink<Op>
where
Op: Operator,
Op::Out: AvroSchema,
{
pub fn new<P: Into<PathBuf>>(prev: Op, path: P) -> Self {
Self {
path: path.into(),
prev,
writer: None,
schema: Op::Out::get_schema(),
}
}
}

impl<Op> Display for AvroSink<Op>
where
Op: Operator,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{} -> AvroSink<{}>",
self.prev,
std::any::type_name::<Op::Out>()
)
}
}

impl<Op> Operator for AvroSink<Op>
where
Op: Operator,
Op::Out: AvroSchema + Serialize,
{
type Out = ();

fn setup(&mut self, metadata: &mut ExecutionMetadata) {
self.prev.setup(metadata);

let file = File::options()
.read(true)
.write(true)
.create(true)
.open(&self.path)
.unwrap_or_else(|err| {
panic!(
"AvroSource: error while opening file {:?}: {:?}",
self.path, err
)
});


let buf_writer = BufWriter::new(file);
self.writer = Some(buf_writer);
}

fn next(&mut self) -> StreamElement<()> {
let writer = self.writer.as_mut().unwrap();
let mut w = Writer::new(&self.schema, writer);
loop {
match self.prev.next() {
StreamElement::Item(t) | StreamElement::Timestamped(t, _) => {
// w.extend_ser(values)
w.append_ser(t).expect("failed to write to avro");
}
el => {
w.flush().unwrap();
return el.map(|_| ());
}
}
}
}

fn structure(&self) -> BlockStructure {
let mut operator = OperatorStructure::new::<Op::Out, _>("AvroSink");
operator.kind = OperatorKind::Sink;
self.prev.structure().add_operator(operator)
}
}

impl<Op> Clone for AvroSink<Op>
where
Op: Operator,
{
fn clone(&self) -> Self {
panic!("AvroSink cannot be cloned, replication should be 1");
}
}

impl<Op: Operator> Stream<Op> where
Op: 'static,
Op::Out: AvroSchema + Serialize
{

/// Apply the given function to all the elements of the stream, consuming the stream.
///
/// ## Example
///
/// ```
/// # use renoir::{StreamContext, RuntimeConfig};
/// # use renoir::operator::source::IteratorSource;
/// # let mut env = StreamContext::new_local();
/// let s = env.stream_iter(0..5).group_by(|&n| n % 2);
/// s.for_each(|(key, n)| println!("Item: {} has key {}", n, key));
///
/// env.execute_blocking();
/// ```
pub fn write_avro<P: Into<PathBuf>>(self, path: P)
{
self.add_operator(|prev| AvroSink::new(prev, path))
.finalize_block();
}
}

// #[cfg(test)]
// mod qtests {
// use std::AvroSinkions::HashSet;

// use crate::config::RuntimeConfig;
// use crate::environment::StreamContext;
// use crate::operator::source;

// #[test]
// fn AvroSink_vec() {
// let env = StreamContext::new(RuntimeConfig::local(4).unwrap());
// let source = source::IteratorSource::new(0..10u8);
// let res = env.stream(source).AvroSink::<Vec<_>>();
// env.execute_blocking();
// assert_eq!(res.get().unwrap(), (0..10).AvroSink::<Vec<_>>());
// }

// #[test]
// fn AvroSink_set() {
// let env = StreamContext::new(RuntimeConfig::local(4).unwrap());
// let source = source::IteratorSource::new(0..10u8);
// let res = env.stream(source).AvroSink::<HashSet<_>>();
// env.execute_blocking();
// assert_eq!(res.get().unwrap(), (0..10).AvroSink::<HashSet<_>>());
// }
// }
1 change: 1 addition & 0 deletions src/operator/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub(super) mod collect_channel;
pub(super) mod collect_count;
pub(super) mod collect_vec;
pub(super) mod for_each;
pub(super) mod avro;

pub(crate) type StreamOutputRef<Out> = Arc<Mutex<Option<Out>>>;

Expand Down
Loading

0 comments on commit 33a4cba

Please sign in to comment.