
api: expose writer flush_buf #333

Closed

wants to merge 1 commit into from

Conversation

@okjodom commented Aug 24, 2023

Useful for scenarios where manually flushing the buffer and keeping the Writer is desirable.
@BurntSushi (Owner)
Please provide a use case. I don't understand why you would want to do this.

@okjodom (Author) commented Aug 24, 2023

Please provide a use case. I don't understand why you would want to do this.

I'm writing a long-running simulator that writes its results to a CSV file. To monitor the simulation's progress, I tail the file. I need to periodically write (complete) results to file without flushing the writer. Flushing the writer and re-instantiating it is not ideal, because then I'd have to buffer the data I want to write elsewhere before serializing it to the writer.

If I have to wait for the writer to auto-flush when the buffer eventually fills up, I'm forced to reduce the buffer size so auto-flush happens more often, on some predictable schedule. This is still not ideal, because estimating a buffer size is imprecise, and auto-flush will most likely cut off the last line written to the file, making the tailed output unusable.

A flush_buf API is perfect because I can control when to write to the file, and eventually flush when I need to close the writer. The only downside I can think of with this pattern is that the buffer might be underutilized (but users can always reduce the allocated buffer size if necessary).

@BurntSushi (Owner)

I need to periodically write (complete) results to file without flushing the writer.

This doesn't make any sense to me?

It sounds to me like you want Writer::flush, which already exists and is public.

Flushing the writer and re-instantiating it

I don't know where "re-instantiating it" is coming from. Calling csv::Writer::flush just flushes the buffer and then the underlying writer. There is no re-instantiation happening. (Although I'm still not sure what that means in this context.)

You might need to provide a minimal code example demonstrating your issue.

You mentioning tail is indeed usually the magic word indicating that you want to call flush. But what's unclear to me is specifically why flush is inappropriate for you.
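
For reference, here's a minimal sketch (the stdout target and record contents are purely illustrative, not taken from your code): flush can be called as often as you like, and the very same Writer keeps accepting records afterwards.

use std::io;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Writer over stdout; any io::Write target behaves the same way.
    let mut wtr = csv::Writer::from_writer(io::stdout());

    for i in 0..3 {
        wtr.write_record(&[format!("row{}", i), "complete line".to_string()])?;
        // Push the buffered bytes to stdout now; the writer stays usable.
        wtr.flush()?;
    }

    // Keep writing through the very same writer after flushing.
    wtr.write_record(&["last", "row"])?;
    wtr.flush()?;
    Ok(())
}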

@okjodom (Author) commented Aug 25, 2023

It sounds to me like you want Writer::flush, which already exists and is public.

This is the opposite of what I want in my scenario. I want to keep the writer, even when I flush the buffer in a controlled manner.

Flushing the writer and re-instantiating it

I don't know where "re-instantiating it" is coming from. Calling csv::Writer::flush just flushes the buffer and then the underlying writer. There is no re-instantiation happening. (Although I'm still not sure what that means in this context.)

By this I meant that if I use csv::Writer::flush, I'd have to create a new Writer, and that is undesirable in my context.

Once I start streaming data and serializing it to the writer, it's desirable that I keep the writer for as long as possible. It's also desirable that I control when to flush the writer's buffer to the file, so I always have complete lines written to the CSV.

@BurntSushi (Owner) commented Aug 25, 2023

keep the writer for as long as possible

csv::Writer::flush doesn't have anything to do with "keeping the writer."

I'm sorry, but I can't keep doing this back and forth. There is some kind of communication barrier between us as I do not understand why you want this. In order to move forward, I think you're going to have to demonstrate why you want this with a minimal code sample.

@okjodom (Author) commented Aug 25, 2023

A scenario like this:

use std::sync::mpsc;
use std::time::SystemTime;

/// receives `Data` (some serde::Serialize type) from a streaming channel,
/// writes to file after every 100 data events,
/// and does this without flushing the writer
fn print_stream(rx: mpsc::Receiver<Data>) -> Result<(), Box<dyn std::error::Error>> {
    let mut writer = csv::WriterBuilder::new().from_path(format!(
        "example_{:?}.csv",
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs()
    ))?;

    let batch_size = 100;
    let mut counter = 1;

    while let Ok(data) = rx.recv() {
        // keep serializing data to the writer, assuming a large buffer
        writer.serialize(data).unwrap();

        // flush the buffer if we have reached our batch size;
        // we don't necessarily wait for the buffer to fill up
        if batch_size == counter {
            // keep the writer alive
            writer.flush_buf().unwrap();
            counter = 1;
        } else {
            counter += 1;
        }
    }

    // flush buffer and writer when the stream is done
    writer.flush().unwrap();

    Ok(())
}

@BurntSushi (Owner)

That isn't minimal because I can't run it. And this part:

            // keep the writer alive
            writer.flush_buf().unwrap();

doesn't make any sense to me. writer.flush() also keeps the writer "alive."

@okjodom (Author) commented Aug 26, 2023

That isn't minimal because I can't run it.

Ack. TIL how to write a minimal reproducible example, though this doesn't seem quite minimal:

use std::{io, error::Error};

use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;

#[derive(Debug, Serialize, Deserialize)]
struct Data {
    foo: String,
    bar: String,
}

#[tokio::main]
async fn main() {
    // data channel
    let (tx, rx) = mpsc::channel(100);

    // generate and stream data
    tokio::spawn(async move {
        loop {
            let data = Data {
                foo: "Hello".into(),
                bar: "World".into(),
            };

            if let Err(_) = tx.send(data).await {
                println!("receiver dropped");
                break;
            }

            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        }
    });

    print_data_stream(rx).await.unwrap()
}

async fn print_data_stream(mut rx: mpsc::Receiver<Data>) -> Result<(), Box<dyn Error>> {
    let mut wtr = csv::Writer::from_writer(io::stdout());

    let batch_size = 100;
    let mut counter = 1;

    while let Some(data) = rx.recv().await {
        wtr.serialize(data)?;

        // flush the buffer if we have reached our batch size
        if batch_size == counter {
            wtr.flush()?;
            // wtr.flush_buf()?; <---- should I be doing this?
            counter = 1;
        } else {
            counter += 1;
        }
    }
    wtr.flush()?;

    Ok(())
}

And this part:

            // keep the writer alive
            writer.flush_buf().unwrap();

doesn't make any sense to me. writer.flush() also keeps the writer "alive."

I see what you mean now. I was able to use Writer::flush() to satisfy my scenario, whereas I was previously convinced I needed Writer::flush_buf.

From the method docs,

"Note that this also flushes the underlying writer."

made it seem like the writer would be "destroyed" on flush(). Still, I should have validated this anyway.

@BurntSushi (Owner) commented Aug 26, 2023

Still, I should have validated this anyways

Yes indeed. Please at least try the suggestion from the maintainer next time.

seemed like the writer would be "destroyed" on flush()

"flush the writer" does not mean "destroy the writer." If the writer were destroyed, then what would happen on the next write? csv::Writer doesn't know how to create a writer at that point. It requires the caller to provide a writer.

"flush the writer" literally just means taking whatever contents are in memory and shoving them down a layer of abstraction.

@BurntSushi closed this Aug 26, 2023