Update avro and csv sinks
imDema committed Jun 13, 2024
1 parent 9b3a476 commit 089c384
Showing 6 changed files with 214 additions and 133 deletions.
3 changes: 2 additions & 1 deletion examples/avro_rw.rs
@@ -41,11 +41,12 @@ fn main() {

source
.inspect(|e| eprintln!("{e:?}"))
.repartition_by(Replication::Unlimited, |x| (x.num % 5).into())
.map(|mut e| {
e.num *= 2;
e
})
.write_avro(opts.output);
.write_avro_seq(opts.output);

ctx.execute_blocking();
}
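
Two things change in this example: the stream is now spread across replicas by key before the sink, and the sink call switches from `write_avro` to `write_avro_seq`. The old `AvroSink` only supported a single writer (its `Clone` impl panics with "replication should be 1" later in this commit), so once the stream is repartitioned each replica needs its own output file, which is what the `_seq` variant provides. A tiny plain-Rust sketch of the routing rule introduced by the new `repartition_by` call (made-up `num` values, no renoir types; records that share a key land on the same replica and therefore in the same numbered file):

```rust
fn main() {
    // Same key rule as the example: equal `num % 5` -> same replica -> same output file.
    let nums = [3, 8, 13, 4, 9, 20];
    for num in nums {
        println!("num = {num:2} -> partition key {}", num % 5);
    }
}
```
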
15 changes: 3 additions & 12 deletions examples/csv_write_read.rs
@@ -1,9 +1,6 @@
use renoir::prelude::*;

fn main() {
tracing_subscriber::fmt::fmt()
.with_max_level(tracing::Level::DEBUG)
.init();
let conf = RuntimeConfig::local(4).unwrap();

let ctx = StreamContext::new(conf.clone());
@@ -13,22 +10,16 @@ fn main() {
eprintln!("Writing to {}", dir_path.display());

// Write to multiple files in parallel
let mut path = dir_path.clone();
let path = dir_path.clone();
ctx.stream_par_iter(0..100)
.map(|i| (i, format!("{i:08x}")))
.write_csv(
move |i| {
path.push(format!("{i:03}.csv"));
path
},
false,
);
.write_csv_seq(path, false);

ctx.execute_blocking();

let ctx = StreamContext::new(conf);
let mut path = dir_path;
path.push("001.csv");
path.push("0001.csv");
ctx.stream_csv::<(i32, String)>(path)
.for_each(|t| println!("{t:?}"));

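The example now relies on the sink's sequential naming instead of building per-replica file names by hand, which is why the read-back step opens `0001.csv` rather than the `001.csv` name the old closure produced. A minimal sketch of the updated flow, assuming `write_csv_seq` numbers its outputs the same way the Avro sink documents below (`0000.csv`, `0001.csv`, ...); the temporary-directory setup is an assumption, since the code that builds `dir_path` lies outside this hunk:

```rust
use renoir::prelude::*;

fn main() {
    let conf = RuntimeConfig::local(4).unwrap();
    let ctx = StreamContext::new(conf.clone());

    // Assumption: the real example builds dir_path differently; any writable directory works.
    let dir_path = std::env::temp_dir().join("renoir_csv_example");
    std::fs::create_dir_all(&dir_path).unwrap();

    // With 4 local replicas this should produce numbered files such as
    // 0000.csv, 0001.csv, ... inside dir_path, one per replica.
    ctx.stream_par_iter(0..100)
        .map(|i| (i, format!("{i:08x}")))
        .write_csv_seq(dir_path.clone(), false);

    ctx.execute_blocking();

    // Read one of the generated files back with a fresh context.
    let ctx = StreamContext::new(conf);
    ctx.stream_csv::<(i32, String)>(dir_path.join("0001.csv"))
        .for_each(|t| println!("{t:?}"));

    ctx.execute_blocking();
}
```
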
162 changes: 83 additions & 79 deletions src/operator/sink/avro.rs
@@ -1,113 +1,87 @@
use apache_avro::{AvroSchema, Schema, Writer};
use serde::Serialize;
use std::fmt::Display;
use std::fs::File;
use std::io::BufWriter;
use std::io::{BufWriter, Write};
use std::marker::PhantomData;
use std::path::PathBuf;

use crate::block::{BlockStructure, OperatorKind, OperatorStructure};
use crate::operator::{Operator, StreamElement};
use crate::block::NextStrategy;
use crate::operator::{ExchangeData, Operator};
use crate::scheduler::ExecutionMetadata;
use crate::Stream;
use crate::{CoordUInt, Replication, Stream};

use super::writer::{sequential_path, WriteOperator, WriterOperator};

// #[derive(Debug)]
pub struct AvroSink<Op>
where
Op: Operator,
{
prev: Op,
path: PathBuf,
/// Writer used to produce the Avro output file.
pub struct AvroSink<T> {
_t: PhantomData<T>,
writer: Option<BufWriter<File>>,
schema: Schema,
}

impl<Op> AvroSink<Op>
impl<T> AvroSink<T>
where
Op: Operator,
Op::Out: AvroSchema,
T: AvroSchema + Serialize,
{
pub fn new<P: Into<PathBuf>>(prev: Op, path: P) -> Self {
pub fn new() -> Self {
Self {
path: path.into(),
prev,
_t: PhantomData,
writer: None,
schema: Op::Out::get_schema(),
schema: T::get_schema(),
}
}
}

impl<Op> Display for AvroSink<Op>
impl<T: Serialize> WriteOperator<T> for AvroSink<T>
where
Op: Operator,
T: AvroSchema + Serialize + Send,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{} -> AvroSink<{}>",
self.prev,
std::any::type_name::<Op::Out>()
)
type Destination = PathBuf;

fn write(&mut self, items: &mut impl Iterator<Item = T>) {
let w = self.writer.as_mut().unwrap();
let mut w = Writer::new(&self.schema, w);
for item in items {
w.append_ser(item).expect("failed to write to avro");
}
w.flush().unwrap();
}
}

impl<Op> Operator for AvroSink<Op>
where
Op: Operator,
Op::Out: AvroSchema + Serialize,
{
type Out = ();
fn flush(&mut self) { /* already flushes in write */
}

fn setup(&mut self, metadata: &mut ExecutionMetadata) {
self.prev.setup(metadata);
fn finalize(&mut self) {
if let Some(mut w) = self.writer.take() {
w.flush().unwrap();
}
}

fn setup(&mut self, destination: Self::Destination) {
let file = File::options()
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(&self.path)
.open(&destination)
.unwrap_or_else(|err| {
panic!(
"AvroSource: error while opening file {:?}: {:?}",
self.path, err
destination, err
)
});

let buf_writer = BufWriter::new(file);
self.writer = Some(buf_writer);
}

fn next(&mut self) -> StreamElement<()> {
let writer = self.writer.as_mut().unwrap();
let mut w = Writer::new(&self.schema, writer);
loop {
match self.prev.next() {
StreamElement::Item(t) | StreamElement::Timestamped(t, _) => {
// w.extend_ser(values)
w.append_ser(t).expect("failed to write to avro");
}
el => {
w.flush().unwrap();
return el.map(|_| ());
}
}
}
}

fn structure(&self) -> BlockStructure {
let mut operator = OperatorStructure::new::<Op::Out, _>("AvroSink");
operator.kind = OperatorKind::Sink;
self.prev.structure().add_operator(operator)
}
}

impl<Op> Clone for AvroSink<Op>
where
Op: Operator,
{
impl<T> Clone for AvroSink<T> {
fn clone(&self) -> Self {
panic!("AvroSink cannot be cloned, replication should be 1");
Self {
_t: PhantomData,
writer: None,
schema: self.schema.clone(),
}
}
}
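
The rewrite above is the core of the commit: `AvroSink` no longer implements `Operator` itself, it only implements the `WriteOperator` trait from `super::writer`, and the generic `WriterOperator` supplies the operator plumbing (setup from metadata, driving the items, block structure). To show the shape of that contract, here is a hypothetical writer for plain text lines following the same lifecycle; the trait definition below is a simplified local copy of the signatures visible in this hunk, not the crate's actual `writer.rs` (which may carry extra bounds or items), and `LinesSink` itself is not part of the commit:

```rust
use std::fs::File;
use std::io::{BufWriter, Write};
use std::marker::PhantomData;
use std::path::PathBuf;

// Simplified stand-in for the trait in src/operator/sink/writer.rs,
// reduced to the methods that appear in this diff.
trait WriteOperator<T> {
    type Destination;
    fn setup(&mut self, destination: Self::Destination);
    fn write(&mut self, items: &mut impl Iterator<Item = T>);
    fn flush(&mut self);
    fn finalize(&mut self);
}

// Hypothetical format-specific writer: one text line per item.
struct LinesSink<T> {
    _t: PhantomData<T>,
    writer: Option<BufWriter<File>>,
}

impl<T> LinesSink<T> {
    fn new() -> Self {
        Self { _t: PhantomData, writer: None }
    }
}

impl<T: std::fmt::Display + Send> WriteOperator<T> for LinesSink<T> {
    type Destination = PathBuf;

    fn setup(&mut self, destination: Self::Destination) {
        // The wrapping operator decides the per-replica destination; the writer just opens it.
        let file = File::create(&destination)
            .unwrap_or_else(|e| panic!("LinesSink: error while opening file {destination:?}: {e:?}"));
        self.writer = Some(BufWriter::new(file));
    }

    fn write(&mut self, items: &mut impl Iterator<Item = T>) {
        let w = self.writer.as_mut().unwrap();
        for item in items {
            writeln!(w, "{item}").expect("failed to write line");
        }
    }

    fn flush(&mut self) {
        if let Some(w) = self.writer.as_mut() {
            w.flush().unwrap();
        }
    }

    fn finalize(&mut self) {
        self.flush();
    }
}

fn main() {
    // Exercise the lifecycle directly, in the order the wrapping operator would drive it.
    let mut sink: LinesSink<String> = LinesSink::new();
    sink.setup(std::env::temp_dir().join("lines_sink_demo.txt"));
    sink.write(&mut vec!["hello".to_string(), "world".to_string()].into_iter());
    sink.finalize();
}
```

Wiring such a writer into a stream would then mirror the new `write_avro` below: construct it inside `add_operator` and hand it to `WriterOperator::new(prev, writer, make_destination)`.
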

@@ -116,21 +90,51 @@ where
Op: 'static,
Op::Out: AvroSchema + Serialize,
{
/// Apply the given function to all the elements of the stream, consuming the stream.
pub fn write_avro<F: FnOnce(CoordUInt) -> PathBuf + Clone + Send + 'static>(
self,
make_path: F,
) {
let make_destination = |metadata: &ExecutionMetadata| (make_path)(metadata.global_id);

self.add_operator(|prev| {
let writer = AvroSink::new();
WriterOperator::new(prev, writer, make_destination)
})
.finalize_block();
}

/// Write the output to Avro files. An Avro file is created for each replica of the current block.
/// A file with a numerical suffix is created according to the path passed as a parameter.
///
/// ## Example
/// + If the input is a directory, numbered files will be created as output.
/// + If the input is a file name, the basename will be kept as a prefix and numbers will
///   be added as a suffix while keeping the same extension for the output.
///
/// ```
/// # use renoir::{StreamContext, RuntimeConfig};
/// # use renoir::operator::source::IteratorSource;
/// # let mut env = StreamContext::new_local();
/// let s = env.stream_iter(0..5).group_by(|&n| n % 2);
/// s.for_each(|(key, n)| println!("Item: {} has key {}", n, key));
/// ## Example
///
/// env.execute_blocking();
/// ```
pub fn write_avro<P: Into<PathBuf>>(self, path: P) {
self.add_operator(|prev| AvroSink::new(prev, path))
/// + `template_path`: `/data/renoir/output.avro` -> `/data/renoir/output0000.avro`, `/data/renoir/output0001.avro` ...
/// + `template_path`: `/data/renoir/` -> `/data/renoir/0000.avro`, `/data/renoir/0001.avro` ...
pub fn write_avro_seq(self, template_path: PathBuf) {
self.add_operator(|prev| {
let writer = AvroSink::new();
WriterOperator::new(prev, writer, |m| sequential_path(template_path, m))
})
.finalize_block();
}
}

impl<Op: Operator> Stream<Op>
where
Op: 'static,
Op::Out: AvroSchema + ExchangeData,
{
pub fn write_avro_one<P: Into<PathBuf>>(self, path: P) {
let path = path.into();
self.repartition(Replication::One, NextStrategy::only_one())
.add_operator(|prev| {
let writer = AvroSink::new();
WriterOperator::new(prev, writer, move |_| path)
})
.finalize_block();
}
}
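
Taken together, the file now exposes three ways to write Avro output: `write_avro`, which lets the caller map each replica's global id to a path; `write_avro_seq`, which derives numbered files from a single template path; and `write_avro_one`, which repartitions to a single replica and writes one file. A hedged usage sketch, assuming a record type deriving `AvroSchema`/`Serialize` and illustrative output paths (neither is part of the commit):

```rust
use apache_avro::AvroSchema;
use renoir::prelude::*;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;

// Assumed record type; the derive requires apache_avro's `derive` feature.
#[derive(Clone, Debug, Serialize, Deserialize, AvroSchema)]
struct Record {
    num: i32,
}

fn main() {
    let ctx = StreamContext::new(RuntimeConfig::local(4).unwrap());
    std::fs::create_dir_all("out").unwrap();

    // write_avro: the closure receives the replica's global id and picks the path.
    ctx.stream_par_iter(0..100)
        .map(|num| Record { num })
        .write_avro(|id| PathBuf::from(format!("out/by-id-{id:04}.avro")));

    // write_avro_seq: numbered files derived from one template path,
    // e.g. out/records.avro -> out/records0000.avro, out/records0001.avro, ...
    ctx.stream_par_iter(0..100)
        .map(|num| Record { num })
        .write_avro_seq(PathBuf::from("out/records.avro"));

    // write_avro_one: everything funneled to a single replica and a single file.
    ctx.stream_par_iter(0..100)
        .map(|num| Record { num })
        .write_avro_one("out/all-records.avro");

    ctx.execute_blocking();
}
```
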