-
Notifications
You must be signed in to change notification settings - Fork 161
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added the field localCorrelationData to MqttPublish for better flow-handling in reactive-APIs #546
base: master
Are you sure you want to change the base?
Conversation
…andling in reactive-APIs
Thank you for your pull request and welcome to our community. We require contributors to sign our Contributor License Agreement, and we don't seem to have the users @codepitbull on file. In order for us to review and merge your code, please sign our Contributor License Agreement to get yourself added. You'll find the CLA and more information here: https://github.com/hivemq/hivemq-community/blob/master/CONTRIBUTING.adoc#contributor-license-agreement |
src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublish.java
Outdated
Show resolved
Hide resolved
/** | ||
* @return the optional local correlation data of this Publish message. This data is never propagated and kept | ||
* locally for correlation. | ||
*/ | ||
Object localCorrelationData(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This API needs to be discussed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean the usage of object (yep, could replace it with a generic but that would change general signature of the class) or something else?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not like a generic here. But maybe there would be a case for creating a more sophisticated structure like for user properties. Having just plain Object there is not very expandable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@YannickWeber any objections to merging this and then punting the improvement (that may or may not be needed) for a future date? Object
isn't "slick" but it serves the purpose for now without complexity. Plus this isn't a customer requested feature.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I have objections. This is API, API can only change in major versions, therefore we need to be very cautious what we are adding and need to double check for expandability and clarity to make it future-proof.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need to double check for expandability and clarity to make it future-proof
One addition, I agree on this point completely.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with moving cautiously, just to provide some context:
The newly added API is strictly for local correlation when inside a reactive pipeline.
I used Object because it was the least invasive and most flexible in that case.
With some fighting we might turn this into a generic.
We could also generate some id for these local interactions which I think would be a little much since this must never be transferred over the wire.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Of course we can add API in minor versions, but we can only remove methods in major versions. Therefore, I want us to be very careful about API design, as we can't easily correct potential API issues.
In the client world, things aren't as strict since package updates are manual.
I would argue that API stability and caution is of high importance independent of where it is used.
My suggestion would be to not return an Object but rather add an Interface LocalCorrelationData
. So that we can expand the return value easier in the future. But that also does not mitigate the problem that you always have to unsafely cast when using the correlation data object :(
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An unsafe cast can be avoided with an instanceof, which got a lot nicer with recent Java-versions.
From a type-perspective it would be best to introduce a generic, like Ractor-Kafka is doing it (https://github.com/reactor/reactor-kafka/blob/main/src/main/java/reactor/kafka/sender/SenderRecord.java#L30).
Everything without a generic will require an unsafe cast at some point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A final decision would be ideal. Let me know if there are any other thoughts.
/** | ||
* @return the optional local correlation data of this Publish message. This data is never propagated and kept | ||
* locally for correlation. | ||
*/ | ||
Object getLocalCorrelationData(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This API needs to be discussed.
src/main/java/com/hivemq/client/mqtt/mqtt3/message/publish/Mqtt3Publish.java
Outdated
Show resolved
Hide resolved
src/main/java/com/hivemq/client/mqtt/mqtt5/message/publish/Mqtt5Publish.java
Outdated
Show resolved
Hide resolved
src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishView.java
Outdated
Show resolved
Hide resolved
Thank you for your pull request and welcome to our community. We require contributors to sign our Contributor License Agreement, and we don't seem to have the users @codepitbull on file. In order for us to review and merge your code, please sign our Contributor License Agreement to get yourself added. You'll find the CLA and more information here: https://github.com/hivemq/hivemq-community/blob/master/CONTRIBUTING.adoc#contributor-license-agreement |
Thank you for your pull request and welcome to our community. We require contributors to sign our Contributor License Agreement, and we don't seem to have the users @codepitbull on file. In order for us to review and merge your code, please sign our Contributor License Agreement to get yourself added. You'll find the CLA and more information here: https://github.com/hivemq/hivemq-community/blob/master/CONTRIBUTING.adoc#contributor-license-agreement |
@cla-bot u so lazy |
@cla-bot check |
The cla-bot has been summoned, and re-checked this pull request! |
Hi all - let's wrap up this PR this week if possible so it doesn't go stale. |
Motivation
Using the reactive integrations (rxjava2/reactor) results in a situation where after an MqttPublish has been processed a Mqtt5PublishResult is forward in the stream. This object doesn't contain any processing context which makes a few usecases rather unintuitive to implement.
In my case I am receiving from Kafka and want to commit AFTER the message has been sent to HiveMQ. For that I need to get the commit-offset to the end of the stream.
Changes
To support the above mentioned usecase I added a localCorrelationData (similar to what Kafka is doing in their client-lib). This field is never propagated to HiveMQ but can be used down the streams. In my case I store the Kafka-Commit-Offset in there and use it to trigger the actual commit.