Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

timely::CommunicationConfig::Cluster: Syscall param socketcall.sendto(msg) points to uninitialised byte(s) #399

Open
dreraic opened this issue Jul 12, 2021 · 4 comments

Comments

@dreraic
Copy link

dreraic commented Jul 12, 2021

Hello,

I am experimenting with timely dataflow, and adapted one of the examples from the docs to use a timely::CommunicationConfig::Cluster.

main.rs:

use clap::{App, Arg};
use timely;
use timely::dataflow::operators::{Input, Inspect};

pub fn main() {
    let matches = App::new("example")
        .arg(
            Arg::with_name("address")
                .short("a")
                .long("address")
                .takes_value(true)
                .multiple(true)
                .required(true),
        )
        .arg(
            Arg::with_name("index")
                .short("i")
                .long("index")
                .takes_value(true)
                .required(true),
        )
        .get_matches();

    let process = matches.value_of("index").unwrap().parse::<usize>().unwrap();
    let mut addresses = Vec::new();
    for a in matches.values_of("address").unwrap() {
        addresses.push(a.to_string());
    }

    let config = timely::Config {
        communication: timely::CommunicationConfig::Cluster {
            threads: 1,
            process: process,
            addresses: addresses,
            report: true,
            log_fn: Box::new(move |_communication_setup| None),
        },
        worker: timely::worker::Config::default(),
    };

    timely::execute(config, move |worker| {
        // add an input and base computation off of it
        let mut input = worker.dataflow(|scope| {
            let (input, stream) = scope.new_input();
            stream.inspect(|x| println!("hello {:?}", x));
            input
        });

        // introduce input, advance computation
        for round in 0..10 {
            input.send(round);
            input.advance_to(round + 1);
            worker.step();
        }
    })
    .expect("Timely execute error");
}

Cargo.toml:

[package]
name = "timely-cluster"
version = "0.1.0"
edition = "2018"

[dependencies]
timely = "0.12.0"
clap = "2.33.3"

[[bin]]
name = "timely-cluster"
path = "main.rs"

When I execute it with valgrind,

cargo build && \
(valgrind target/debug/timely-cluster --address 127.0.0.1:10001 --address 127.0.0.1:10002 --index 0 & \
valgrind target/debug/timely-cluster --address 127.0.0.1:10001 --address 127.0.0.1:10002 --index 1)

I see errors like this:

==9372== Thread 2 timely:send-1:
==9372== Syscall param socketcall.sendto(msg) points to uninitialised byte(s)
==9372==    at 0x4989C5C: send (in /usr/lib/libc-2.33.so)
==9372==    by 0x433FB6: send (net.rs:646)
==9372==    by 0x433FB6: std::net::udp::UdpSocket::send (udp.rs:665)
==9372==    by 0x292218: std::io::buffered::bufwriter::BufWriter<W>::flush_buf (bufwriter.rs:167)
==9372==    by 0x2943A6: <std::io::buffered::bufwriter::BufWriter<W> as std::io::Write>::flush (bufwriter.rs:635)
==9372==    by 0x290349: timely_communication::allocator::zero_copy::tcp::send_loop (tcp.rs:151)
==9372==    by 0x2E644C: timely_communication::allocator::zero_copy::initialize::initialize_networking_from_sockets::{{closure}} (initialize.rs:103)
==9372==    by 0x28BB2F: std::sys_common::backtrace::__rust_begin_short_backtrace (backtrace.rs:125)
==9372==    by 0x2B8B8C: std::thread::Builder::spawn_unchecked::{{closure}}::{{closure}} (mod.rs:476)
==9372==    by 0x2A5690: <std::panic::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once (panic.rs:347)
==9372==    by 0x292B78: std::panicking::try::do_call (panicking.rs:401)
==9372==    by 0x297C3C: __rust_try (in /home/timely-cluster/target/debug/timely-cluster)
==9372==    by 0x2929B0: std::panicking::try (panicking.rs:365)
==9372==  Address 0x4be864c is 108 bytes inside a block of size 65,536 alloc'd
==9372==    at 0x483E7C5: malloc (vg_replace_malloc.c:380)
==9372==    by 0x2F83CB: alloc::alloc::alloc (alloc.rs:86)
==9372==    by 0x2F8456: alloc::alloc::Global::alloc_impl (alloc.rs:166)
==9372==    by 0x2F8709: <alloc::alloc::Global as core::alloc::Allocator>::allocate (alloc.rs:226)
==9372==    by 0x2F5B2C: alloc::raw_vec::RawVec<T,A>::allocate_in (raw_vec.rs:203)
==9372==    by 0x2F5F9C: alloc::raw_vec::RawVec<T,A>::with_capacity_in (raw_vec.rs:142)
==9372==    by 0x2F63C3: alloc::vec::Vec<T,A>::with_capacity_in (mod.rs:603)
==9372==    by 0x2F6326: alloc::vec::Vec<T>::with_capacity (mod.rs:464)
==9372==    by 0x291D3A: std::io::buffered::bufwriter::BufWriter<W>::with_capacity (bufwriter.rs:115)
==9372==    by 0x29009C: timely_communication::allocator::zero_copy::tcp::send_loop (tcp.rs:133)
==9372==    by 0x2E644C: timely_communication::allocator::zero_copy::initialize::initialize_networking_from_sockets::{{closure}} (initialize.rs:103)
==9372==    by 0x28BB2F: std::sys_common::backtrace::__rust_begin_short_backtrace (backtrace.rs:125)

Uninitialized bytes sounds bad to me, and even if this example does not crash, I believe this to be the cause for a segfault I see in a more complex application using timely::Cluster.

(I have checked it with rust 1.53 stable and rust 1.55 nightly)

Am I doing something wrong, or is this a bug?
Thanks.


edit: added braces to valgrind calls

@frankmcsherry
Copy link
Member

I'm sorry, I don't know enough about valgrind to know if there is a problem. The writer that is being flushed is pre-allocated with spare capacity, which means that a fair hunk of its buffer will be uninitialized

    let mut writer = ::std::io::BufWriter::with_capacity(1 << 16, writer);

Subsequently, we only write in byte slices,

    writer.write_all(&bytes[..]).expect("Write failure in send_loop.");

though admittedly they get populated in weird (unsafe) ways.

So, it's possible that there is uninitialized memory on the other end of the pointer passed in, in the allocated region, because writer.flush() is guaranteed to pass a length in that only references initialized data. Or it is possible that uninitialized bytes are coming from bytes, which would be problematic if that is what's happening.

@dreraic
Copy link
Author

dreraic commented Jul 13, 2021

Thanks for your reply. I'm no valgrind expert either, and tend to just adding print statements once it reports something.

I'm not sure whether I understand the first possibility you mentioned. But I believe it is the second one anyway, after I added some of those prints to send_loop:

let thread_name = std::thread::current().name().unwrap().to_string();

for idx in 0..bytes.len() {
    println!("{}  attempt to write byte #{}...", thread_name, idx);
    std::io::stdout().flush().expect("flush error");
    
    println!("{}  write byte #{}: {:?}", thread_name, idx, bytes[idx]); // line 200
    std::io::stdout().flush().expect("flush error");
}

writer.write_all(&bytes[..]).expect("Write failure in send_loop.");

Now this produces outputs like

<snip>
timely:send-0  write byte #107: 128
timely:send-0  attempt to write byte #108...
==24291== Thread 2 timely:send-0:
==24291== Conditional jump or move depends on uninitialised value(s)
==24291==    at 0x46FD5B: fmt_u64 (num.rs:249)
==24291==    by 0x46FD5B: core::fmt::num::imp::<impl core::fmt::Display for u8>::fmt (num.rs:287)
==24291==    by 0x2E5275: core::fmt::num::<impl core::fmt::Debug for u8>::fmt (num.rs:191)
==24291==    by 0x46B70E: core::fmt::write (mod.rs:1094)
==24291==    by 0x448E9D: write_fmt<std::io::stdio::StdoutLock> (mod.rs:1584)
==24291==    by 0x448E9D: <&std::io::stdio::Stdout as std::io::Write>::write_fmt (stdio.rs:657)
==24291==    by 0x449D17: write_fmt (stdio.rs:631)
==24291==    by 0x449D17: print_to<std::io::stdio::Stdout> (stdio.rs:934)
==24291==    by 0x449D17: std::io::stdio::_print (stdio.rs:947)

==24291==    by 0x29B6E3: timely_communication::allocator::zero_copy::tcp::send_loop (tcp.rs:200)

==24291==    by 0x2B1686: timely_communication::allocator::zero_copy::initialize::initialize_networking_from_sockets::{{closure}} (initialize.rs:99)
==24291==    by 0x2B38FF: std::sys_common::backtrace::__rust_begin_short_backtrace (backtrace.rs:125)
==24291==    by 0x2D80BC: std::thread::Builder::spawn_unchecked::{{closure}}::{{closure}} (mod.rs:481)
==24291==    by 0x2CDB10: <std::panic::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once (panic.rs:344)
==24291==    by 0x2F41F8: std::panicking::try::do_call (panicking.rs:379)
==24291==    by 0x2FAC1C: __rust_try (in /home/timely-cluster/target/debug/timely-cluster)
<snip>

meaning that byte 108 of bytes is indeed uninitialized... (in this example, it seems to be always byte 108)

(the line numbers in the stacktrace may differ from yours as my editor did some reformatting)

Full console output: https://pastebin.com/hTtzwgDJ
tcp.rs for line number reference: https://pastebin.com/pJsw7ZQd

@dreraic
Copy link
Author

dreraic commented Aug 16, 2021

I meanwhile found out that using

timely = { version = "0.12.0", features=["bincode"] }

in Cargo.toml resolves both the uninitialized bytes error and the earlier mentioned segfault…

@frankmcsherry
Copy link
Member

I'd recommend using that if it works for you. It uses a more "safe" serialization mechanism. @antiguru took a brief peek (I don't want to speak for him) and saw the uninit bytes in padding, but we haven't gotten to reproducing the error. You mention having a crash; is that something you can share?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants