From 0fc0ff83fc5c58e01a09a053419f811d4460776e Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Thu, 3 Dec 2020 00:49:50 -0500 Subject: [PATCH] bindings: add crate and Go module with Service inter-op mechanism A Service looks like a bidirectional channel, but is one which is synchronously polled to send and receive framed message code and data payloads. It's intended as a fundamental building block for building bidirectional in-process streaming services that interoperate between Go and Rust -- an interface which is a heck of a lot faster than gRPC. It achieves zero-copy semantics by pushing Go []byte buffers down into CGO invocations, and surfacing Rust-owned []byte arenas and Frame payloads. There are some rules callers need to adhere to, such as not scribbling "sent" memory until Poll() is called, and not referencing Rust-owned memory returned by Poll() after _another_ Poll() call is made. It amortizes the CGO overhead by vectorizing dispatch. On the Rust side, some common functions are provided to monomorphize appropriate bindings for an implementation of a Service trait. Lies, damn lies, and benchmarks: This shows a variety of stride patterns (number of sent messages per Poll() call), some of which are deliberately worst case. It's comparing the actual "upper case" service with a no-op service that has the same setup & moving parts, but avoids the actual CGO invocation itself. Comparative benchmarks with equivalent "naive" CGO and "pure" Go Services are included. These are also zero-copy / return owned memory. $ go test ./go/bindings/ -bench='.' 
-benchtime 3s goos: linux goarch: amd64 pkg: github.com/estuary/flow/go/bindings BenchmarkUpperService/cgo-1-24 43781984 79.1 ns/op BenchmarkUpperService/noop-1-24 270172464 13.3 ns/op BenchmarkUpperService/cgo-3-24 48714016 72.2 ns/op BenchmarkUpperService/noop-3-24 368625391 9.70 ns/op BenchmarkUpperService/cgo-4-24 91550871 36.8 ns/op BenchmarkUpperService/noop-4-24 363729646 10.1 ns/op BenchmarkUpperService/cgo-11-24 77817141 45.2 ns/op BenchmarkUpperService/noop-11-24 369495175 9.76 ns/op BenchmarkUpperService/cgo-15-24 83527456 42.2 ns/op BenchmarkUpperService/noop-15-24 382336641 9.84 ns/op BenchmarkUpperService/cgo-17-24 135539218 27.0 ns/op BenchmarkUpperService/noop-17-24 423105061 8.66 ns/op BenchmarkUpperService/cgo-31-24 100000000 32.6 ns/op BenchmarkUpperService/noop-31-24 406740273 9.10 ns/op BenchmarkUpperService/cgo-32-24 151226794 23.7 ns/op BenchmarkUpperService/noop-32-24 423966495 8.67 ns/op BenchmarkUpperService/cgo-63-24 129356660 27.7 ns/op BenchmarkUpperService/noop-63-24 426944920 8.82 ns/op BenchmarkUpperService/cgo-137-24 150434529 23.9 ns/op BenchmarkUpperService/noop-137-24 441497529 8.39 ns/op BenchmarkUpperService/cgo-426-24 155104615 23.3 ns/op BenchmarkUpperService/noop-426-24 452368488 8.30 ns/op BenchmarkUpperServiceNaive-24 34605169 104 ns/op BenchmarkUpperServiceGo-24 278482276 12.8 ns/op PASS ok github.com/estuary/flow/go/bindings 111.130s --- Cargo.lock | 35 +++++ crates/bindings/Cargo.toml | 13 ++ crates/bindings/build.rs | 18 +++ crates/bindings/cbindgen.toml | 1 + crates/bindings/flow_bindings.h | 97 ++++++++++++ crates/bindings/src/lib.rs | 2 + crates/bindings/src/service.rs | 233 +++++++++++++++++++++++++++++ crates/bindings/src/upper_case.rs | 109 ++++++++++++++ go/bindings/service.go | 240 ++++++++++++++++++++++++++++++ go/bindings/service_test.go | 200 +++++++++++++++++++++++++ go/bindings/upper_case.go | 118 +++++++++++++++ 11 files changed, 1066 insertions(+) create mode 100644 crates/bindings/Cargo.toml create mode 
100644 crates/bindings/build.rs create mode 100644 crates/bindings/cbindgen.toml create mode 100644 crates/bindings/flow_bindings.h create mode 100644 crates/bindings/src/lib.rs create mode 100644 crates/bindings/src/service.rs create mode 100644 crates/bindings/src/upper_case.rs create mode 100644 go/bindings/service.go create mode 100644 go/bindings/service_test.go create mode 100644 go/bindings/upper_case.go diff --git a/Cargo.lock b/Cargo.lock index be7370c731..cc658ec9c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -133,6 +133,13 @@ dependencies = [ "which", ] +[[package]] +name = "bindings" +version = "0.0.0" +dependencies = [ + "cbindgen", +] + [[package]] name = "bitflags" version = "1.2.1" @@ -253,6 +260,25 @@ dependencies = [ "yaml-merge-keys", ] +[[package]] +name = "cbindgen" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1df6a11bba1d7cab86c166cecf4cf8acd7d02b7b65924d81b33d27197f22ee35" +dependencies = [ + "clap", + "heck", + "indexmap", + "log", + "proc-macro2", + "quote", + "serde", + "serde_json", + "syn", + "tempfile", + "toml", +] + [[package]] name = "cc" version = "1.0.62" @@ -2882,6 +2908,15 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75cf45bb0bef80604d001caaec0d09da99611b3c0fd39d3080468875cdb65645" +dependencies = [ + "serde", +] + [[package]] name = "tonic" version = "0.3.1" diff --git a/crates/bindings/Cargo.toml b/crates/bindings/Cargo.toml new file mode 100644 index 0000000000..4939b397b9 --- /dev/null +++ b/crates/bindings/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "bindings" +version = "0.0.0" +authors = ["Estuary Technologies, Inc"] +edition = "2018" + +[lib] +crate_type = ["staticlib"] + +[dependencies] + +[build-dependencies] +cbindgen = "*" diff --git a/crates/bindings/build.rs b/crates/bindings/build.rs new file mode 100644 index 0000000000..72d93375d2 --- /dev/null 
+++ b/crates/bindings/build.rs
@@ -0,0 +1,18 @@
+extern crate cbindgen;
+
+use std::env;
+use std::path::PathBuf;
+
+fn main() {
+    let crate_dir = env::var("CARGO_MANIFEST_DIR").unwrap();
+
+    let config = cbindgen::Config::from_file(PathBuf::from(&crate_dir).join("cbindgen.toml"))
+        .expect("failed to parse cbindgen config");
+
+    cbindgen::Builder::new()
+        .with_crate(crate_dir)
+        .with_config(config)
+        .generate()
+        .expect("Unable to generate bindings")
+        .write_to_file("flow_bindings.h");
+}
diff --git a/crates/bindings/cbindgen.toml b/crates/bindings/cbindgen.toml
new file mode 100644
index 0000000000..08094f28fc
--- /dev/null
+++ b/crates/bindings/cbindgen.toml
@@ -0,0 +1 @@
+language = "C"
diff --git a/crates/bindings/flow_bindings.h b/crates/bindings/flow_bindings.h
new file mode 100644
index 0000000000..d23f2a3f43
--- /dev/null
+++ b/crates/bindings/flow_bindings.h
@@ -0,0 +1,97 @@
+#include <stdarg.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+/**
+ * Opaque pointer for a Service instance in the ABI.
+ */
+typedef struct {
+  uint8_t _private[0];
+} ServiceImpl;
+
+/**
+ * Output frame produced by a Service.
+ */
+typedef struct {
+  /**
+   * Service-defined response code.
+   */
+  uint32_t code;
+  /**
+   * Begin data offset into the Channel arena.
+   */
+  uint32_t begin;
+  /**
+   * End data offset into the Channel arena.
+   */
+  uint32_t end;
+} Out;
+
+/**
+ * Channel is shared between CGO and Rust, and holds details
+ * about the language interconnect.
+ */
+typedef struct {
+  ServiceImpl *svc_impl;
+  uint8_t *arena_ptr;
+  uintptr_t arena_len;
+  uintptr_t arena_cap;
+  Out *out_ptr;
+  uintptr_t out_len;
+  uintptr_t out_cap;
+  uint8_t *err_ptr;
+  uintptr_t err_len;
+  uintptr_t err_cap;
+} Channel;
+
+/**
+ * Input frame produced from CGO, which is a single service invocation.
+ * 16 bytes, or 1/4 of a typical cache line.
+ */
+typedef struct {
+  const uint8_t *data_ptr;
+  uint32_t data_len;
+  uint32_t code;
+} In1;
+
+/**
+ * Four invocations, composed into one struct.
+ * 64 bytes, or one typical cache line.
+ */
+typedef struct {
+  In1 in0;
+  In1 in1;
+  In1 in2;
+  In1 in3;
+} In4;
+
+/**
+ * Sixteen invocations, composed into one struct.
+ * 256 bytes, or four typical cache lines.
+ */
+typedef struct {
+  In4 in0;
+  In4 in1;
+  In4 in2;
+  In4 in3;
+} In16;
+
+Channel *upper_case_create(void);
+
+void upper_case_invoke1(Channel *ch, In1 i);
+
+void upper_case_invoke4(Channel *ch, In4 i);
+
+void upper_case_invoke16(Channel *ch, In16 i);
+
+void upper_case_drop(Channel *ch);
+
+ServiceImpl *create_upper_case_naive(void);
+
+uint32_t upper_case_naive(ServiceImpl *svc,
+                          uint32_t _code,
+                          const uint8_t *in_ptr,
+                          uint32_t in_len,
+                          const uint8_t **out_ptr,
+                          uint32_t *out_len);
diff --git a/crates/bindings/src/lib.rs b/crates/bindings/src/lib.rs
new file mode 100644
index 0000000000..8c73ca689d
--- /dev/null
+++ b/crates/bindings/src/lib.rs
@@ -0,0 +1,2 @@
+pub mod service;
+mod upper_case;
diff --git a/crates/bindings/src/service.rs b/crates/bindings/src/service.rs
new file mode 100644
index 0000000000..97209f7065
--- /dev/null
+++ b/crates/bindings/src/service.rs
@@ -0,0 +1,233 @@
+/// Service is a trait implemented by services which may be called from Go.
+pub trait Service {
+    /// Error type returned by Service invocations.
+    type Error: std::error::Error;
+    /// Create a new instance of the Service.
+    fn create() -> Self;
+    /// Invoke with the given op-code & |data| payload.
+    /// It extends |arena| with any returned []byte data, and pushes output messages onto |out|.
+    fn invoke(
+        &mut self,
+        code: u32,
+        data: &[u8],
+        arena: &mut Vec<u8>,
+        out: &mut Vec<Out>,
+    ) -> Result<(), Self::Error>;
+}
+
+/// Output frame produced by a Service.
+#[repr(C)]
+pub struct Out {
+    /// Service-defined response code.
+    pub code: u32,
+    /// Begin data offset into the Channel arena.
+    pub begin: u32,
+    /// End data offset into the Channel arena.
+    pub end: u32,
+}
+
+/// InN is a variadic input which invokes itself against a Service.
+pub trait InN {
+    fn invoke<S: Service>(
+        self: &Self,
+        svc: &mut S,
+        arena: &mut Vec<u8>,
+        out: &mut Vec<Out>,
+    ) -> Result<(), S::Error>;
+}
+
+/// Input frame produced from CGO, which is a single service invocation.
+/// 16 bytes, or 1/4 of a typical cache line.
+#[repr(C)]
+pub struct In1 {
+    data_ptr: *const u8,
+    data_len: u32,
+    code: u32,
+}
+
+impl InN for In1 {
+    #[inline]
+    fn invoke<S: Service>(
+        self: &Self,
+        svc: &mut S,
+        arena: &mut Vec<u8>,
+        out: &mut Vec<Out>,
+    ) -> Result<(), S::Error> {
+        svc.invoke(
+            self.code,
+            unsafe { std::slice::from_raw_parts(self.data_ptr, self.data_len as usize) },
+            arena,
+            out,
+        )
+    }
+}
+
+/// Four invocations, composed into one struct.
+/// 64 bytes, or one typical cache line.
+#[repr(C)]
+pub struct In4 {
+    in0: In1,
+    in1: In1,
+    in2: In1,
+    in3: In1,
+}
+
+impl InN for In4 {
+    #[inline]
+    fn invoke<S: Service>(
+        self: &Self,
+        svc: &mut S,
+        arena: &mut Vec<u8>,
+        out: &mut Vec<Out>,
+    ) -> Result<(), S::Error> {
+        self.in0.invoke(svc, arena, out)?;
+        self.in1.invoke(svc, arena, out)?;
+        self.in2.invoke(svc, arena, out)?;
+        self.in3.invoke(svc, arena, out)
+    }
+}
+
+/// Sixteen invocations, composed into one struct.
+/// 256 bytes, or four typical cache lines.
+#[repr(C)]
+pub struct In16 {
+    in0: In4,
+    in1: In4,
+    in2: In4,
+    in3: In4,
+}
+
+impl InN for In16 {
+    #[inline]
+    fn invoke<S: Service>(
+        self: &Self,
+        svc: &mut S,
+        arena: &mut Vec<u8>,
+        out: &mut Vec<Out>,
+    ) -> Result<(), S::Error> {
+        self.in0.invoke(svc, arena, out)?;
+        self.in1.invoke(svc, arena, out)?;
+        self.in2.invoke(svc, arena, out)?;
+        self.in3.invoke(svc, arena, out)
+    }
+}
+
+/// Opaque pointer for a Service instance in the ABI.
+#[repr(C)]
+pub struct ServiceImpl {
+    _private: [u8; 0],
+}
+
+/// Channel is shared between CGO and Rust, and holds details
+/// about the language interconnect.
+#[repr(C)]
+pub struct Channel {
+    // Opaque service pointer.
+    svc_impl: *mut ServiceImpl,
+
+    // Output memory arena, exposed to CGO.
+    arena_ptr: *mut u8,
+    arena_len: usize,
+    arena_cap: usize,
+
+    // Output frame codes & arena offsets, exposed to CGO.
+    out_ptr: *mut Out,
+    out_len: usize,
+    out_cap: usize,
+
+    // Final error returned by the Service.
+    err_ptr: *mut u8,
+    err_len: usize,
+    err_cap: usize,
+}
+
+/// Create a new Service instance, wrapped in an owning Channel.
+/// This is intended to be monomorphized by each Service implementation,
+/// and exposed via cbindgen. See the UpperCase service for an example.
+#[inline]
+pub fn create<S: Service>() -> *mut Channel {
+    let svc_impl = Box::new(S::create());
+    let svc_impl = Box::leak(svc_impl) as *mut S as *mut ServiceImpl;
+
+    let ch = Box::new(Channel {
+        svc_impl,
+        arena_ptr: 0 as *mut u8,
+        arena_len: 0,
+        arena_cap: 0,
+        out_ptr: 0 as *mut Out,
+        out_len: 0,
+        out_cap: 0,
+        err_ptr: 0 as *mut u8,
+        err_len: 0,
+        err_cap: 0,
+    });
+    Box::leak(ch)
+}
+
+/// Invoke a Service with one input.
+/// This is intended to be monomorphized by each Service implementation,
+/// and exposed via cbindgen. See the UpperCase service for an example.
+#[inline]
+pub fn invoke<S: Service, I: InN>(ch: *mut Channel, i: I) {
+    let ch = unsafe { &mut *ch };
+
+    if ch.err_cap != 0 {
+        return; // If an error has been set, further invocations are no-ops.
+    }
+
+    let mut arena = unsafe { Vec::<u8>::from_raw_parts(ch.arena_ptr, ch.arena_len, ch.arena_cap) };
+    let mut out = unsafe { Vec::<Out>::from_raw_parts(ch.out_ptr, ch.out_len, ch.out_cap) };
+    let mut err_str = unsafe { String::from_raw_parts(ch.err_ptr, ch.err_len, ch.err_cap) };
+    let svc_impl = unsafe { &mut *(ch.svc_impl as *mut S) };
+
+    let r = i.invoke(svc_impl, &mut arena, &mut out);
+    if let Err(err) = r {
+        // Set terminal error string.
+        err_str = format!("{:?}", err);
+    }
+
+    ch.arena_ptr = arena.as_mut_ptr();
+    ch.arena_cap = arena.capacity();
+    ch.arena_len = arena.len();
+    std::mem::forget(arena);
+
+    ch.out_ptr = out.as_mut_ptr();
+    ch.out_cap = out.capacity();
+    ch.out_len = out.len();
+    std::mem::forget(out);
+
+    ch.err_ptr = err_str.as_mut_ptr();
+    ch.err_cap = err_str.capacity();
+    ch.err_len = err_str.len();
+    std::mem::forget(err_str);
+}
+
+/// Drop a Service and its Channel.
+/// This is intended to be monomorphized by each Service implementation,
+/// and exposed via cbindgen. See the UpperCase service for an example.
+#[inline]
+pub fn drop<S: Service>(ch: *mut Channel) {
+    let Channel {
+        // Opaque service pointer.
+        svc_impl,
+
+        // Output frame codes & arena offsets, exposed to CGO.
+        arena_ptr,
+        arena_len,
+        arena_cap,
+
+        out_ptr,
+        out_len,
+        out_cap,
+
+        err_ptr,
+        err_len,
+        err_cap,
+    } = *unsafe { Box::from_raw(ch) };
+
+    // Drop svc_impl, arena, and out.
+    unsafe { Box::from_raw(svc_impl as *mut S) };
+    unsafe { Vec::<u8>::from_raw_parts(arena_ptr, arena_len, arena_cap) };
+    unsafe { Vec::<Out>::from_raw_parts(out_ptr, out_len, out_cap) };
+    unsafe { String::from_raw_parts(err_ptr, err_len, err_cap) };
+}
diff --git a/crates/bindings/src/upper_case.rs b/crates/bindings/src/upper_case.rs
new file mode 100644
index 0000000000..1190c0d7ed
--- /dev/null
+++ b/crates/bindings/src/upper_case.rs
@@ -0,0 +1,109 @@
+use crate::service::{self, Channel, Service, ServiceImpl};
+use std::io::Write;
+
+/// UpperCase is an example Service that returns its inputs as upper-case
+/// ASCII, along with a running sum of the number of bytes upper-cased
+/// (returned with each response code).
+pub struct UpperCase {
+    sum_length: u32,
+}
+
+impl Service for UpperCase {
+    type Error = std::io::Error;
+
+    fn create() -> Self {
+        Self { sum_length: 0 }
+    }
+
+    fn invoke(
+        &mut self,
+        _code: u32,
+        data: &[u8],
+        arena: &mut Vec<u8>,
+        out: &mut Vec<service::Out>,
+    ) -> Result<(), Self::Error> {
+        if data == b"whoops!" {
+            return Err(std::io::Error::new(std::io::ErrorKind::Other, "whoops!"));
+        }
+
+        let begin = arena.len() as u32;
+        arena.extend(data.iter().map(u8::to_ascii_uppercase));
+        self.sum_length += data.len() as u32;
+
+        out.push(service::Out {
+            code: self.sum_length,
+            begin,
+            end: arena.len() as u32,
+        });
+
+        Ok(())
+    }
+}
+
+// Define cbindgen <=> CGO hooks for driving the UpperCase service.
+
+#[no_mangle]
+pub extern "C" fn upper_case_create() -> *mut Channel {
+    service::create::<UpperCase>()
+}
+#[no_mangle]
+pub extern "C" fn upper_case_invoke1(ch: *mut Channel, i: service::In1) {
+    service::invoke::<UpperCase, _>(ch, i)
+}
+#[no_mangle]
+pub extern "C" fn upper_case_invoke4(ch: *mut Channel, i: service::In4) {
+    service::invoke::<UpperCase, _>(ch, i)
+}
+#[no_mangle]
+pub extern "C" fn upper_case_invoke16(ch: *mut Channel, i: service::In16) {
+    service::invoke::<UpperCase, _>(ch, i)
+}
+#[no_mangle]
+pub extern "C" fn upper_case_drop(ch: *mut Channel) {
+    service::drop::<UpperCase>(ch)
+}
+
+/// UpperCaseNaive is not part of UpperCase's service interface.
+/// It's here for comparative benchmarking with a more traditional CGO call style.
+
+struct UpperCaseNaive {
+    sum_length: u32,
+    arena: Vec<u8>,
+}
+
+#[no_mangle]
+pub extern "C" fn create_upper_case_naive() -> *mut ServiceImpl {
+    Box::leak(Box::new(UpperCaseNaive {
+        sum_length: 0,
+        arena: Vec::new(),
+    })) as *mut UpperCaseNaive as *mut ServiceImpl
+}
+
+#[no_mangle]
+pub extern "C" fn upper_case_naive(
+    svc: *mut ServiceImpl,
+    _code: u32,
+    in_ptr: *const u8,
+    in_len: u32,
+    out_ptr: &mut *const u8,
+    out_len: &mut u32,
+) -> u32 {
+    let svc = unsafe { &mut *(svc as *mut UpperCaseNaive) };
+    let in_ = unsafe { std::slice::from_raw_parts(in_ptr, in_len as usize) };
+
+    svc.arena.clear();
+
+    let code = if in_ == b"whoops!"
{ + let err = std::io::Error::new(std::io::ErrorKind::Other, "whoops!"); + write!(svc.arena, "{:?}", err).unwrap(); + std::u32::MAX + } else { + svc.arena.extend(in_.iter().map(u8::to_ascii_uppercase)); + svc.sum_length += in_.len() as u32; + svc.sum_length + }; + + *out_ptr = svc.arena.as_ptr(); + *out_len = svc.arena.len() as u32; + code +} diff --git a/go/bindings/service.go b/go/bindings/service.go new file mode 100644 index 0000000000..20cb05b30a --- /dev/null +++ b/go/bindings/service.go @@ -0,0 +1,240 @@ +package bindings + +/* +#cgo LDFLAGS: -L${SRCDIR}/../../target/release -lbindings -ldl +#include "../../crates/bindings/flow_bindings.h" +*/ +import "C" + +import ( + "errors" + "fmt" + "reflect" + "runtime" + "unsafe" +) + +// Service is a Go handle to an instantiated service binding. +type Service struct { + ch *C.Channel + frameIn []C.In1 + frameOut []Frame + frameBuf []byte + + invoke1 func(*C.Channel, C.In1) + invoke4 func(*C.Channel, C.In4) + invoke16 func(*C.Channel, C.In16) +} + +// Build a new Service instance. This is to be wrapped by concrete, exported +// Service constructors of this package -- constructors which also handle +// bootstrap and configuration of the Service, map to returned errors, and may +// provide friendlier interfaces than those of Service. +func newService( + create func() *C.Channel, + invoke1 func(*C.Channel, C.In1), + invoke4 func(*C.Channel, C.In4), + invoke16 func(*C.Channel, C.In16), + drop func(*C.Channel), +) *Service { + var ch = create() + + var svc = &Service{ + ch: ch, + frameIn: make([]C.In1, 0, 16), + frameOut: make([]Frame, 0, 16), + frameBuf: make([]byte, 0, 256), + invoke1: invoke1, + invoke4: invoke4, + invoke16: invoke16, + } + runtime.SetFinalizer(svc, func(svc *Service) { + drop(svc.ch) + }) + + return svc +} + +// Frame is a payload which may be passed to and from a Service. +type Frame struct { + // User-defined Code of the Frame. + Code uint32 + // Data payload of the frame. 
+	Data []byte
+}
+
+// Frameable is the interface provided by messages that know how to frame
+// themselves, notably Protobuf messages.
+type Frameable interface {
+	ProtoSize() int
+	MarshalToSizedBuffer(dAtA []byte) (int, error)
+}
+
+// SendBytes to the Service.
+// The sent |data| must not be changed until the next Service Poll().
+func (s *Service) SendBytes(code uint32, data []byte) {
+	var h = (*reflect.SliceHeader)(unsafe.Pointer(&data))
+
+	s.frameIn = append(s.frameIn, C.In1{
+		code:     C.uint32_t(code),
+		data_len: C.uint32_t(h.Len),
+		data_ptr: (*C.uint8_t)(unsafe.Pointer(h.Data)),
+	})
+}
+
+// SendMessage sends the serialization of a Frameable message to the Service.
+func (s *Service) SendMessage(code uint32, m Frameable) error {
+	var n, err = m.MarshalToSizedBuffer(s.ReserveBytes(code, m.ProtoSize()))
+	if err != nil {
+		return err
+	} else if n != 0 {
+		return fmt.Errorf("MarshalToSizedBuffer left unexpected remainder: %d", n)
+	}
+	return nil
+}
+
+// ReserveBytes reserves a length-sized []byte slice which will be
+// sent with the next Service Poll(). Until then, the caller may
+// write into the returned bytes, e.g. in order to serialize a
+// message of prior known size.
+func (s *Service) ReserveBytes(code uint32, length int) []byte {
+	var l = len(s.frameBuf)
+	var c = cap(s.frameBuf)
+
+	if c-l < length {
+		// Grow frameBuf without copying (prior buffers stay pinned by their Frames).
+		// Double once, else fit exactly: also terminates from a zero capacity.
+		if c = 2 * c; c < length {
+			c = length
+		}
+		s.frameBuf, l = make([]byte, 0, c), 0
+	}
+
+	var next = s.frameBuf[0 : l+length]
+	s.SendBytes(code, next[l:])
+	s.frameBuf = next
+
+	return next[l:]
+}
+
+// Poll the Service. On return, all frames sent since the last Poll have been
+// processed, and any response Frames are returned. Poll also returns a memory
+// arena which individual Frames may reference (e.g., by encoding offsets into
+// the returned arena).
+// NOTE: The []byte arena and returned Frame Data is owned by the Service, not Go, +// and is *ONLY* valid until the next call to Poll(). At that point, it may be +// over-written or freed, and attempts to access it may crash the program. +func (s *Service) Poll() ([]byte, []Frame, error) { + // Reset output storage cursors. + // SAFETY: the channel arena and output frames hold only integer types + // (u8 bytes and u32 offsets, respectively), having trivial impl Drops. + s.ch.arena_len = 0 + s.ch.out_len = 0 + + var input = s.frameIn + + // Invoke in strides of 16. + // The compiler is smart enough to omit bounds checks here. + for len(input) >= 16 { + s.invoke16(s.ch, C.In16{ + in0: C.In4{ + in0: input[0], + in1: input[1], + in2: input[2], + in3: input[3], + }, + in1: C.In4{ + in0: input[4], + in1: input[5], + in2: input[6], + in3: input[7], + }, + in2: C.In4{ + in0: input[8], + in1: input[9], + in2: input[10], + in3: input[11], + }, + in3: C.In4{ + in0: input[12], + in1: input[13], + in2: input[14], + in3: input[15], + }, + }) + input = input[16:] + } + // Invoke in strides of 4. + for len(input) >= 4 { + s.invoke4(s.ch, C.In4{ + in0: input[0], + in1: input[1], + in2: input[2], + in3: input[3], + }) + input = input[4:] + } + // Invoke in strides of 1. + for _, in := range input { + s.invoke1(s.ch, in) + } + // All inputs are consumed. Reset. + s.frameIn = s.frameIn[:0] + s.frameBuf = s.frameBuf[:0] + + // During invocations, ch.arena_*, ch.out_*, and ch.err_* slices were updated. + // Obtain zero-copy access to each of them. 
+ var arena []byte + var chOut []C.Out + var chErr []byte + + var arenaHeader = (*reflect.SliceHeader)(unsafe.Pointer(&arena)) + var chOutHeader = (*reflect.SliceHeader)(unsafe.Pointer(&chOut)) + var chErrHeader = (*reflect.SliceHeader)(unsafe.Pointer(&chErr)) + + arenaHeader.Cap = int(s.ch.arena_cap) + arenaHeader.Len = int(s.ch.arena_len) + arenaHeader.Data = uintptr(unsafe.Pointer(s.ch.arena_ptr)) + + chOutHeader.Cap = int(s.ch.out_cap) + chOutHeader.Len = int(s.ch.out_len) + chOutHeader.Data = uintptr(unsafe.Pointer(s.ch.out_ptr)) + + chErrHeader.Cap = int(s.ch.err_cap) + chErrHeader.Len = int(s.ch.err_len) + chErrHeader.Data = uintptr(unsafe.Pointer(s.ch.err_ptr)) + + // We must copy raw C.Out instances to our Go-side |frameOut|. + + // First grow it, if required. + if c := cap(s.frameOut); c < len(chOut) { + for c < len(chOut) { + c = c << 1 + } + s.frameOut = make([]Frame, len(chOut), c) + } else { + s.frameOut = s.frameOut[:len(chOut)] + } + + for i, o := range chOut { + // This avoids the bounds check into |arena| which would otherwise be done, + // if Go slicing were used. Equivalent to `arena[o.begin:o.end]`. + var data []byte + var dataHeader = (*reflect.SliceHeader)(unsafe.Pointer(&data)) + dataHeader.Cap = int(o.end - o.begin) + dataHeader.Len = int(o.end - o.begin) + dataHeader.Data = uintptr(unsafe.Pointer(s.ch.arena_ptr)) + uintptr(o.begin) + + s.frameOut[i] = Frame{ + Code: uint32(o.code), + Data: data, + } + } + + var err error + if len(chErr) != 0 { + err = errors.New(string(chErr)) + } + + return arena, s.frameOut, err +} diff --git a/go/bindings/service_test.go b/go/bindings/service_test.go new file mode 100644 index 0000000000..c138ef0200 --- /dev/null +++ b/go/bindings/service_test.go @@ -0,0 +1,200 @@ +package bindings + +import ( + "bytes" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" +) + +// frameableString implements the Frameable interface. 
+type frameableString string + +func (m frameableString) ProtoSize() int { return len(m) } +func (m frameableString) MarshalToSizedBuffer(b []byte) (int, error) { + copy(b, m) + return 0, nil +} + +func TestUpperServiceFunctional(t *testing.T) { + var svc = newUpperCase() + + // Cover frameBuf growing. + svc.frameBuf = make([]byte, 0, 1) + + svc.SendBytes(1, []byte("hello")) + svc.SendMessage(2, frameableString("world")) + var arena, responses, err = svc.Poll() + + assert.Equal(t, []byte("HELLOWORLD"), arena) + assert.Equal(t, []Frame{ + {Data: []byte("HELLO"), Code: 5}, + {Data: []byte("WORLD"), Code: 10}, + }, responses) + assert.NoError(t, err) + + svc.SendMessage(3, frameableString("bye")) + arena, responses, err = svc.Poll() + + assert.Equal(t, []byte("BYE"), arena) + assert.Equal(t, []Frame{ + {Data: []byte("BYE"), Code: 13}, + }, responses) + assert.NoError(t, err) + + // Trigger an error, and expect it's plumbed through. + svc.SendBytes(6, []byte("whoops!")) + _, _, err = svc.Poll() + assert.EqualError(t, err, "Custom { kind: Other, error: \"whoops!\" }") +} + +func TestNoOpServiceFunctional(t *testing.T) { + var svc = newNoOpService() + + svc.SendBytes(1, []byte("hello")) + svc.SendBytes(2, []byte("world")) + + var arena, responses, err = svc.Poll() + assert.Empty(t, arena) + assert.Equal(t, []Frame{{}, {}}, responses) + assert.NoError(t, err) + + svc.SendBytes(3, []byte("bye")) + + arena, responses, err = svc.Poll() + assert.Empty(t, arena) + assert.Equal(t, []Frame{{}}, responses) + assert.NoError(t, err) +} + +func TestUpperServiceWithStrides(t *testing.T) { + var svc = newUpperCase() + + for i := 0; i != 4; i++ { + var given = []byte("abcd0123efghijklm456nopqrstuvwxyz789") + var expect = bytes.Repeat([]byte("ABCD0123EFGHIJKLM456NOPQRSTUVWXYZ789"), 2) + + svc.SendBytes(1, nil) + for b := 0; b != len(given); b += 2 { + svc.SendBytes(2, given[b:b+2]) + } + svc.SendBytes(3, nil) + + for b := 0; b != len(given); b += 1 { + svc.SendBytes(4, given[b:b+1]) + } 
+ svc.SendBytes(5, nil) + + var got []byte + var _, responses, err = svc.Poll() + assert.NoError(t, err) + + for _, r := range responses { + got = append(got, r.Data...) + } + assert.Equal(t, expect, got) + assert.Equal(t, len(given)*2*(i+1), int(responses[len(responses)-1].Code)) + } +} + +func TestUpperServiceNaive(t *testing.T) { + var svc = newUpperCaseNaive() + + var code, data, err = svc.invoke(123, []byte("hello")) + assert.NoError(t, err) + assert.Equal(t, 5, int(code)) + assert.Equal(t, data, []byte("HELLO")) + + var given = []byte("abcd0123efghijklm456nopqrstuvwxyz789") + var expect = []byte("ABCD0123EFGHIJKLM456NOPQRSTUVWXYZ789") + + code, data, err = svc.invoke(456, given) + assert.NoError(t, err) + assert.Equal(t, 5+len(given), int(code)) + assert.Equal(t, expect, data) + + _, _, err = svc.invoke(789, []byte("whoops!")) + assert.EqualError(t, err, "Custom { kind: Other, error: \"whoops!\" }") +} + +func TestUpperServiceGo(t *testing.T) { + var svc = newUpperCaseGo() + + var code, data, err = svc.invoke(123, []byte("hello")) + assert.NoError(t, err) + assert.Equal(t, 5, int(code)) + assert.Equal(t, data, []byte("HELLO")) + + var given = []byte("abcd0123efghijklm456nopqrstuvwxyz789") + var expect = []byte("ABCD0123EFGHIJKLM456NOPQRSTUVWXYZ789") + + code, data, err = svc.invoke(456, given) + assert.NoError(t, err) + assert.Equal(t, 5+len(given), int(code)) + assert.Equal(t, expect, data) + + _, _, err = svc.invoke(789, []byte("whoops!")) + assert.EqualError(t, err, "whoops!") +} + +func BenchmarkUpperService(b *testing.B) { + var strides = []int{ + 1, // Worst case. + 3, // Almost worst case: 3 separate invocations. + 4, // Single 4-stride invocation. 
+ 11, // 4 + 4 + 1 + 1 + 1 + 15, // 4 + 4 + 4 + 1 + 1 + 1 + 17, // 16 + 1 + 31, // 16 + 4 + 4 + 4 + 1 + 1 + 1 + 32, // 16 + 16 + 63, // 16 + 16 + 16 + 4 + 4 + 4 + 1 + 1 + 1 + 137, + 426, + } + var input = []byte("hello world") + + for _, stride := range strides { + b.Run("cgo-"+strconv.Itoa(stride), func(b *testing.B) { + var svc = newUpperCase() + + for i := 0; i != b.N; i++ { + if i%stride == 0 && i > 0 { + svc.Poll() + } + svc.SendBytes(0, input) + } + var _, _, _ = svc.Poll() + }) + + b.Run("noop-"+strconv.Itoa(stride), func(b *testing.B) { + var svc = newNoOpService() + + for i := 0; i != b.N; i++ { + if i%stride == 0 && i > 0 { + svc.Poll() + } + svc.SendBytes(0, input) + } + var _, _, _ = svc.Poll() + }) + } +} + +func BenchmarkUpperServiceNaive(b *testing.B) { + var svc = newUpperCaseNaive() + var input = []byte("hello world") + + for i := 0; i != b.N; i++ { + _, _, _ = svc.invoke(123, input) + } +} + +func BenchmarkUpperServiceGo(b *testing.B) { + var svc = newUpperCaseGo() + var input = []byte("hello world") + + for i := 0; i != b.N; i++ { + _, _, _ = svc.invoke(123, input) + } +} diff --git a/go/bindings/upper_case.go b/go/bindings/upper_case.go new file mode 100644 index 0000000000..4466dd483d --- /dev/null +++ b/go/bindings/upper_case.go @@ -0,0 +1,118 @@ +package bindings + +// #include "../../crates/bindings/flow_bindings.h" +import "C" +import ( + "bytes" + "errors" + "math" + "reflect" + "unsafe" +) + +// newUpperCase is a testing Service that upper-cases each input Frame, +// and returns the running sum length of its inputs via its response +// Frame Code. 
+func newUpperCase() *Service { + return newService( + func() *C.Channel { return C.upper_case_create() }, + func(ch *C.Channel, in C.In1) { C.upper_case_invoke1(ch, in) }, + func(ch *C.Channel, in C.In4) { C.upper_case_invoke4(ch, in) }, + func(ch *C.Channel, in C.In16) { C.upper_case_invoke16(ch, in) }, + func(ch *C.Channel) { C.upper_case_drop(ch) }, + ) +} + +// newNoOpService is a testing Service that doesn't invoke into CGO, +// but still produces an (empty) output Frame for each input. +func newNoOpService() *Service { + return newService( + func() *C.Channel { + var ch = (*C.Channel)(C.calloc(C.sizeof_Channel, 1)) + ch.out_ptr = (*C.Out)(C.calloc(C.sizeof_Out, 512)) + + ch.out_cap = 512 + ch.out_len = 0 + + return ch + }, + // Increment output cursors, so that we build Frames for each input, + // but don't actually invoke CGO. + func(ch *C.Channel, in C.In1) { ch.out_len += 1 }, + func(ch *C.Channel, in C.In4) { ch.out_len += 4 }, + func(ch *C.Channel, in C.In16) { ch.out_len += 16 }, + func(ch *C.Channel) { + C.free(unsafe.Pointer(ch.out_ptr)) + C.free(unsafe.Pointer(ch)) + }, + ) +} + +// upperCaseNaive is an alternative, non-Service implementation which +// does the same task using a more typical CGO invocation pattern. +// It too avoids copies and returned Rust-owned memory after each call. +// It's here for comparative benchmarking. 
+type upperCaseNaive struct { + svc *C.ServiceImpl +} + +func newUpperCaseNaive() upperCaseNaive { + return upperCaseNaive{ + svc: C.create_upper_case_naive(), + } +} + +func (s *upperCaseNaive) invoke(codeIn uint32, input []byte) (uint32, []byte, error) { + var h = (*reflect.SliceHeader)(unsafe.Pointer(&input)) + var out_len C.uint32_t + var out_ptr *C.uint8_t + + var codeOut = C.upper_case_naive( + s.svc, + C.uint32_t(codeIn), + (*C.uint8_t)(unsafe.Pointer(h.Data)), + C.uint32_t(h.Len), + &out_ptr, + &out_len) + + var output []byte + h = (*reflect.SliceHeader)(unsafe.Pointer(&output)) + h.Cap = int(out_len) + h.Len = int(out_len) + h.Data = uintptr(unsafe.Pointer(out_ptr)) + + var err error + if codeOut == math.MaxUint32 { + err = errors.New(string(output)) + } + return uint32(codeOut), output, err +} + +// upperCaseGo is yet another pure-Go implementation, for comparative benchmarking. +type upperCaseGo struct { + invoke func(uint32, []byte) (uint32, []byte, error) +} + +func newUpperCaseGo() upperCaseGo { + var sumLength uint32 + var arena []byte + + // Use a closure to force dynamic dispatch / prevent inlining. + var fn = func(codeIn uint32, input []byte) (uint32, []byte, error) { + if bytes.Equal(input, []byte("whoops!")) { + return 0, nil, errors.New("whoops!") + } + + arena = append(arena[:0], input...) + sumLength += uint32(len(input)) + + for i, b := range arena { + if b >= 'a' && b <= 'z' { + arena[i] = b - 'a' + 'A' + } + } + return sumLength, arena, nil + } + + return upperCaseGo{invoke: fn} +}