
WIP: Copying messages for multicast #283

Closed
wants to merge 4 commits

Conversation

alanconway
Contributor

@alanconway alanconway commented Jan 15, 2020

Copy messages for multicast

StructMessage and EventMessage are intended to be full implementations of the
Message interface that are independent of any transport and can be used to hold
a copy of a message in memory, independently of the life of an incoming transport
message (e.g. for multicast, queuing etc.). CopyMessage copies an incoming
Message; the copy can be read many times and is independent of the original
message lifecycle (e.g. you might Finish() the original before you're done
multicasting the copy, depending on policy).

They are a lot like your MockXMessages, but they are not mocks; they're complete
Message implementations. Possibly there's some code overlap that can be cleaned up.
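The lifecycle decoupling described above can be sketched with toy types. All names below (oneShotMessage, bufferedMessage, copyMessage) are invented for this sketch; the real Message, EventMessage and CopyMessage in the PR are richer:

```go
// Self-contained toy illustrating "copy once, read many, finish the original early".
package main

import (
	"errors"
	"fmt"
)

// oneShotMessage mimics an incoming transport message: readable only once.
type oneShotMessage struct {
	payload []byte
	read    bool
}

func (m *oneShotMessage) Read() ([]byte, error) {
	if m.read {
		return nil, errors.New("already consumed")
	}
	m.read = true
	return m.payload, nil
}

func (m *oneShotMessage) Finish(err error) error { return nil }

// bufferedMessage holds a copy in memory; it can be read any number of times
// and is independent of the original message's lifecycle.
type bufferedMessage struct{ payload []byte }

func (m *bufferedMessage) Read() ([]byte, error) { return m.payload, nil }

// copyMessage drains the incoming message into a bufferedMessage.
func copyMessage(m *oneShotMessage) (*bufferedMessage, error) {
	p, err := m.Read()
	if err != nil {
		return nil, err
	}
	cp := make([]byte, len(p))
	copy(cp, p)
	return &bufferedMessage{payload: cp}, nil
}

func main() {
	in := &oneShotMessage{payload: []byte("hello")}
	buf, err := copyMessage(in)
	if err != nil {
		panic(err)
	}
	_ = in.Finish(nil) // original can be finished before multicasting
	for i := 0; i < 3; i++ {
		p, _ := buf.Read() // the copy is readable many times
		fmt.Println(string(p))
	}
}
```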

Please review the code carefully. I like what you've done to reduce copying, but
there was a lot to take in, so this is a bit rushed.

One question: I intended for two types of message representation - Binary and
Structured. Message.Event() and EventMessage were really just implementations of
the Binary encoding - not meant to be a third representation. I think you could
drop Message.Event() now that you have ToEvent(Message), so bindings only have to
implement the methods Structured() and Binary().
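A rough sketch of what the slimmed-down interface could look like. The encoder interfaces and the eventMessage/recorder types here are reduced stand-ins invented for the sketch, not the real sdk-go signatures:

```go
package main

import "fmt"

// Reduced stand-ins for the real encoder interfaces (assumption: the actual
// sdk-go interfaces take a context and richer attribute types).
type StructuredEncoder interface {
	SetStructuredEvent(format string, body []byte) error
}

type BinaryEncoder interface {
	SetAttribute(name, value string) error
	SetData(body []byte) error
}

// Message drops Event(): bindings implement only Structured() and Binary(),
// and a free function ToEvent(Message) can be built on top of Binary().
type Message interface {
	Structured(StructuredEncoder) error
	Binary(BinaryEncoder) error
	Finish(err error) error
}

// eventMessage is an in-memory Message: its Binary() just replays stored fields.
type eventMessage struct {
	attrs map[string]string
	data  []byte
}

func (m *eventMessage) Structured(StructuredEncoder) error {
	return fmt.Errorf("no structured representation")
}

func (m *eventMessage) Binary(e BinaryEncoder) error {
	for k, v := range m.attrs {
		if err := e.SetAttribute(k, v); err != nil {
			return err
		}
	}
	return e.SetData(m.data)
}

func (m *eventMessage) Finish(error) error { return nil }

// recorder is a BinaryEncoder that captures what a sender would transmit.
type recorder struct {
	attrs map[string]string
	data  []byte
}

func (r *recorder) SetAttribute(name, value string) error {
	r.attrs[name] = value
	return nil
}

func (r *recorder) SetData(body []byte) error { r.data = body; return nil }

func main() {
	var m Message = &eventMessage{attrs: map[string]string{"type": "demo"}, data: []byte("hi")}
	r := &recorder{attrs: map[string]string{}}
	if err := m.Binary(r); err != nil {
		panic(err)
	}
	fmt.Println(r.attrs["type"], string(r.data))
}
```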

Signed-off-by: Alan Conway <[email protected]>

@alanconway
Contributor Author

@slinkydeveloper for you - check it carefully, it was rushed. Probably some overlap with your changes that can be cleaned up.

Member

@slinkydeveloper slinkydeveloper left a comment


Your solution looks good as far as the structured encoding is concerned. However, IMO going through the EventMessage for binary messages is a step backward compared to #270, since all the optimization done for message transfer in the binary -> binary case (using the BinaryEncoder) is neutralized in the "fanout" case. And we still have the problem of the encoders that accept io.Reader as input, eventually re-buffering a read-only buffer for no reason.

Brain dump: Maybe we can implement a solution similar to yours, some kind of union type that every Message implementation should contain to keep the whole event (in case of structured) or the event payload, that can be:

  • buffered or unbuffered
  • can be accessed either as a byte array or as an io.Reader. In the latter case, the message is buffered and that buffer has its own lifecycle and is eventually reused

Then, the Message interface could have an accessor method to retrieve it, so we can create a method called BufferMessage that invokes it and transforms this union type from unbuffered to buffered.

if err != nil {
return nil, err
}
return WithFinish(em, func(err error) { _ = m.Finish(err) }), nil
Member


Binding the original message lifecycle to the copied message is probably what you don't want to do if you want to decouple the lifecycles of the two messages (e.g. as you explained in point 2 of #282 (comment))

Contributor Author


Right, my gut tells me that in cases where you need to buffer for one reason (multi-write) you will probably need to buffer for others (retry after the incoming message is gone etc.), so there's limited value in optimizing the special case of buffer-for-multi-write-only, as it probably won't come up much in reality. I could be wrong, but let's fix that when there's a real use case and the creation of a single extra EventMessage really is the hot spot on the critical path.

@@ -24,6 +24,14 @@ type Format interface {
Unmarshal([]byte, *ce.Event) error
}

// UnknownFormat allows an event with an unknown format string to be forwarded,
// but Marshal() and Unmarshal() will always fail.
type UnknownFormat string
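A sketch of how UnknownFormat could satisfy the Format interface. The Event type stands in for ce.Event, and the exact Format method set (including MediaType()) is an assumption about the real interface:

```go
package main

import "fmt"

// Event stands in for ce.Event in this self-contained sketch.
type Event struct{}

// Format is a reduced stand-in for the interface in the diff above.
type Format interface {
	MediaType() string
	Marshal(*Event) ([]byte, error)
	Unmarshal([]byte, *Event) error
}

// UnknownFormat carries a format string through APIs that take a Format,
// but any attempt to actually encode or decode fails.
type UnknownFormat string

func (u UnknownFormat) MediaType() string { return string(u) }

func (u UnknownFormat) Marshal(*Event) ([]byte, error) {
	return nil, fmt.Errorf("unknown format: %q", string(u))
}

func (u UnknownFormat) Unmarshal([]byte, *Event) error {
	return fmt.Errorf("unknown format: %q", string(u))
}

func main() {
	// The format string is forwarded as-is; encoding and decoding fail.
	var f Format = UnknownFormat("application/vnd.custom+format")
	fmt.Println(f.MediaType())
	if _, err := f.Marshal(&Event{}); err != nil {
		fmt.Println("marshal failed as intended:", err)
	}
}
```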
Member


Why did you add this new format? Any specific reason?

Contributor Author


Your changes pass a Format in places where we don't actually know if we can locally encode/decode it with a Format. Previously I had passed a string in such places, but I like the type hint of passing a Format, so this is a special Format that lets you pass a string through APIs that take a Format but fails if you actually try to encode or decode.

@alanconway
Contributor Author

alanconway commented Jan 20, 2020

Your solution looks good as far as the structured encoding is concerned. However, IMO going through the EventMessage for binary messages is a step backward compared to #270, since all the optimization done for message transfer in the binary -> binary case (using the BinaryEncoder) is neutralized in the "fanout" case.

There's no difference between structured and fanout: in both cases you incur the allocation cost of creating one cached object (EventMessage or StructMessage) using all binary optimizations available. After that you have to write N output messages regardless. If N==1 then avoiding the cache is faster, but for N>1 it may be faster to read the EventMessage than to multi-read the original incoming message. Even if it isn't, the overhead drops off as N gets bigger.

Brain dump: Maybe we can implement a solution similar to yours, some kind of union type that every Message implementation should contain to keep the whole event (in case of structured) or the event payload, that can be:

* buffered or unbuffered

* can be accessed either as a byte array or as an `io.Reader`. In the latter case, the message is buffered and that buffer has its own lifecycle and is eventually reused

Then, the Message interface could have an accessor method to retrieve it, so we can create a method called BufferMessage that invokes it and transforms this union type from unbuffered to buffered.

At this point we'd be trading simplicity for an optimization that might actually be a pessimization in some cases:

  • the binding creates a needless internal copy that is never used.
  • the binding offers a repeatable read method that is less efficient than caching as an EventMessage and reading that.

I'd like to see the binding implement the simplest, fastest "read once" strategy and let the application decide if it needs caching for whatever reason (read many, lifecycle etc.)

I think this is straying into the realms of "premature optimization" - a more complex API can itself have negative performance impact because people who have to spend time learning/implementing a complex API have less time to think about performance.

@slinkydeveloper
Member

but for N>1 it may be faster to read the EventMessage than multi-read the original incoming message.

So are you saying that having a single buffered message, which is read multiple times to dispatch the message to N senders, could actually be slower than copying the input message N times to create one message per sender?

@slinkydeveloper
Member

binding creates needless internal copy that is never used

What copy?

binding offers repeatable read method that is less efficient than caching as EventMessage and reading that.

I get this one; it was one of my original concerns with this design too. Initially I thought that the binding implementer should be responsible for implementing the retry mechanism after the message is read and translated into the "final" request structure (e.g. the HttpRequest). Perhaps now I should reconsider that choice...

I'd like to see the binding implement the simplest, fastest "read once" strategy and let the application decide if it needs caching for whatever reason (read many, lifecycle etc.)

I agree with this concern, perhaps I should reconsider some choices...

@alanconway
Contributor Author

but for N>1 it may be faster to read the EventMessage than multi-read the original incoming message.

So are you saying that having a single buffered message, which is read multiple times to dispatch the message to N senders, could be actually slower than copying the input message N times to create one message per sender?

There are the following operations involved:

  • RI - read from input Message
  • WE - write to EventMessage
  • RE - read from EventMessage
  • WO - write to output message

A) With no intermediate copy the cost is N*(RI+WO)
B) With an intermediate EventMessage copy the cost is: RI + WE + N*(RE+WO)

If N=1 then clearly A < B : (RI+WO) < RI + WE + RE + WO

However in the case N>1 it depends on the relative cost of RI and RE. If it is more expensive to read from the inbound message than from the trivial EventMessage (e.g. inbound read is parsing JSON or YAML, decoding AMQP or whatever) then at some value of N, B will become cheaper than A.
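The trade-off can be made concrete with made-up unit costs. The numbers below are illustrative only, not measurements; the assumption is that reading (parsing) the input is expensive while reading back the in-memory EventMessage is cheap:

```go
package main

import "fmt"

// Illustrative unit weights for the four operations defined above.
const (
	RI = 10.0 // read (parse) the input Message
	WE = 2.0  // write into the EventMessage cache
	RE = 1.0  // read back from the EventMessage
	WO = 3.0  // write one output message
)

// costDirect is strategy A: every output re-reads the input, N*(RI+WO).
func costDirect(n int) float64 { return float64(n) * (RI + WO) }

// costCached is strategy B: read once into EventMessage, then fan out,
// RI + WE + N*(RE+WO).
func costCached(n int) float64 { return RI + WE + float64(n)*(RE+WO) }

func main() {
	for _, n := range []int{1, 2, 3, 5} {
		fmt.Printf("N=%d  A=%.0f  B=%.0f\n", n, costDirect(n), costCached(n))
	}
}
```

With these weights A wins only at N=1; the cached strategy B pulls ahead from N=2 onward, matching the argument above.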

@alanconway
Contributor Author

binding creates needless internal copy that is never used

What copy?

Suppose I have a JSON binding that does a JSON parse every time I read from the Message. I can do multiple reads, but there'll be a parse every time, so for N>1 it will pretty quickly become more expensive than parsing once into an EventMessage and then reading from there N times.

Now you could demand that every binding support efficient read-many - but in that case the JSON binding would have to do the exact equivalent of creating an EventMessage internally on the first read, and worse - the JSONMessage can't know if the copy is needed or not, so it will make one even when we are actually going direct to the outbound message - defeating that direct no-copy optimization.

So I'd prefer the JSONMessage to be a simple JSON parser and let the application decide if it can stream the message directly input-to-output or if it needs to make a copy for lifecycle reasons. The tricky case is when N>1 and the app doesn't need a copy for lifecycle reasons. Nobody knows for sure whether it will be more efficient to make a temp copy or do N direct copies. At this point the optimization benefits are uncertain, so I would opt for simplicity - just always do a copy, it's safer.
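The parse-per-read cost can be illustrated with a toy jsonMessage (a hypothetical binding invented for this sketch, not real sdk-go code) that counts how many times it pays the parse:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jsonMessage re-parses its raw bytes on every read, like the hypothetical
// JSON binding described above.
type jsonMessage struct {
	raw    []byte
	parses int // instrumentation: how many times we paid the parse cost
}

func (m *jsonMessage) Read() (map[string]interface{}, error) {
	m.parses++
	var v map[string]interface{}
	err := json.Unmarshal(m.raw, &v)
	return v, err
}

// cachedMessage is the EventMessage-style alternative: parse once, read many.
type cachedMessage struct{ v map[string]interface{} }

func newCachedMessage(m *jsonMessage) (*cachedMessage, error) {
	v, err := m.Read()
	if err != nil {
		return nil, err
	}
	return &cachedMessage{v: v}, nil
}

func (c *cachedMessage) Read() map[string]interface{} { return c.v }

func main() {
	const n = 5 // fan-out to 5 outputs
	direct := &jsonMessage{raw: []byte(`{"id":"1"}`)}
	for i := 0; i < n; i++ {
		direct.Read() // one parse per output
	}
	src := &jsonMessage{raw: []byte(`{"id":"1"}`)}
	c, _ := newCachedMessage(src) // a single parse
	for i := 0; i < n; i++ {
		c.Read() // cheap map reads
	}
	fmt.Println(direct.parses, src.parses) // direct pays N parses, cached pays 1
}
```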

I get this one, it was one of my original concerns too with this design. Initially i thought that binding implementer should be responsible for implementing the retry mechanism after the message is read and translated to the "final" request structure (eg the HttpRequest). Perhaps now I should reconsider that choice...

If the fan-out is all to the same binding then that makes perfect sense, and you should go direct to HttpMessage. However, as we want to build more interesting event routers, bridges etc., fan-out to multiple different kinds of bindings is sure to come up (e.g. a mix of HTTP1 + HTTP2, different structured encodings, AMQP etc.)

Using EventMessage is a strategy for when you need a binding-neutral, in-memory representation. That won't always be the case (you've pointed out some good examples) but it will definitely come up a lot.
The key is to keep it transparent so that input Message and output Message stay simple and focused on the details of their encoding/protocol. That way you can pair a Message X with a Message Y, where either, both, or neither of X and Y is an EventMessage.

@slinkydeveloper
Member

slinkydeveloper commented Jan 21, 2020

B) With an intermediate EventMessage copy the cost is: RI + WE + N*(RE+WO)

I agree with the idea that we should go through an intermediate buffered message representation, respectively one for structured and one for binary, and that this intermediate representation should be "readable" by different types of Sender at the same time, although I'm worried that Event metadata marshalling/unmarshalling could bring us back to the beginning (but this is mere speculation, I need to test it to be sure). Even if that's the case, probably optimizing EventMessage would be the way to go rather than creating a new intermediate representation (maybe with lazy metadata parsing?).

Anyway, the problem I see in this solution is that, using the actual visitors I've implemented, to read the StructuredMessage the "reader" doesn't receive the []byte directly; it receives an io.Reader over the underlying []byte. So the ops cost is what you explained, while the actual copies (i.e. memory allocations) when the binding creates the final structure directly from a []byte (e.g. AMQPMessage) are 1 (the initial buffering) + N (one for each send). That's the reason why I introduced this kind of "union type" here: https://github.com/cloudevents/sdk-go/pull/282/files#diff-4e71329bf72eef98f921c6239a5e8072R91 . So if the sender's final structure accepts []byte, it just reuses the cached one rather than going through an additional copy.
Hence my question: does this additional copy make sense?

@slinkydeveloper
Member

Anyway, I've opened a ticket to better investigate the various use cases; the current one covers just one use case: #285

@alanconway
Contributor Author

alanconway commented Jan 21, 2020 via email

@slinkydeveloper
Member

OK, so I've run some tests of this branch (your code + minor nits + code to adapt the bench) vs my branch (+ code to adapt to the bench).

Below is part of the results (left is this branch, right is my branch):

Ns/op with parallelism 4 and payload 1kb: [screenshot]

Ns/op with parallelism 4 and payload 8kb: [screenshot]

Allocations with parallelism 4 and payload 1kb: [screenshot]

Allocations with parallelism 4 and payload 8kb: [screenshot]

Apart from the binary-to-structured case, which seems somehow broken (I'll investigate further to see what we can do about it), in the multi-sender case my branch consistently outperforms this branch by 20-30% on both memory allocs and ns/op. But my branch has pooling, so that's something we should consider among other things.

I'll try to move some pieces around, like adding pooling, and get back with more results to see how we can bring the two solutions closer.

@slinkydeveloper
Member

slinkydeveloper commented Jan 27, 2020

@alanconway I've opened a new PR, #292, where, starting from your work here, I add pooling and hide the CopyMessage result message implementations, allowing us to implement it efficiently as we wish. Let me know WDYT.

@n3wscott
Member

close in favor of #292

@n3wscott n3wscott closed this Jan 29, 2020