Option to remove quotes in final produced messages #5

Open
ghost opened this issue Oct 17, 2017 · 3 comments

Comments

ghost commented Oct 17, 2017

First of all, thanks for this awesome package. I find it really useful.

I was deploying a pipeline of nodered -> kafka -> telegraf -> influxdb. The pipeline generates some data in nodered and sends it to kafka; telegraf then consumes the data from kafka and sends it to influxdb. However, it did not work, because the messages sent to kafka are surrounded by quotes. I guess this is due to a stringify step inside the JS code. This results in a parsing error, because the parsers do not expect any quotes.

Maybe this could be an additional option, or simply the quotes could be removed.
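
A minimal sketch of the symptom, for illustration only. The sample line and the variable names are assumptions and are not taken from the package or from telegraf; the point is just that passing a string through JSON.stringify before producing it adds a leading and a trailing quote to the bytes the consumer sees.

```javascript
// Hypothetical payload in a line-oriented format (not taken from the issue).
const line = 'weather temperature=21.5';

// What reaches the topic if the string goes through JSON.stringify first.
const asProduced = Buffer.from(JSON.stringify(line)).toString();

// What a downstream parser would presumably expect: the plain string.
const asExpected = Buffer.from(line).toString();

console.log(asProduced); // "weather temperature=21.5"  (quotes included)
console.log(asExpected); // weather temperature=21.5
```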

HannesP commented Dec 22, 2017

I've been struggling with this for a couple of hours, didn't know it was a known issue. Since Kafka by design doesn't require messages to be in a certain format, I think that it should be possible to feed any plain string to the Kafka producer node.

HannesP commented Dec 22, 2017

This is the code in question. If the payload is an object, it's JSON.stringified twice. If it's a string, it's JSON.stringified once, which surrounds it in quotes (e.g. JSON.stringify('foo') === '"foo"'). See my comments on lines 3 and 21. Shouldn't the JSON.stringify call on line 21 be removed? Or am I missing some use case here?

//set the value
if( typeof msg.payload === 'object') {
    value = JSON.stringify(msg.payload); // Both here...
} else {
    value = msg.payload.toString();
}

//set the timestamp
if( (new Date(msg.timestamp)).getTime() > 0 ) {
    timestamp = msg.timestamp;
} else if (msg.timestamp !== undefined) {
    console.log('[rdkafka] WARNING: Ignoring the following invalid timestamp on message:' + msg.timestamp);    
}

if (msg === null || topic === "") {
    util.log("[rdkafka] ignored request to send a NULL message or NULL topic");
} else {
    producer.produce(
      topic,                                // topic
      partition,                            // partition
      new Buffer(JSON.stringify(value)),    // value // ...and here
      key,                                  // key
      timestamp                             // timestamp
    );
}
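
The effect can be reproduced in isolation. The sketch below is not the package's code: encodeAsQuoted mirrors the two serialization steps from the snippet above, and encodeOnce is a hypothetical alternative that drops the second JSON.stringify, which may or may not match what the pending PR does. Both function names are made up for illustration. Buffer.from is used in place of new Buffer, which Node.js has deprecated.

```javascript
// Mirrors the quoted logic: objects are stringified, then everything is
// stringified again right before being wrapped in a Buffer.
function encodeAsQuoted(payload) {
    const value = typeof payload === 'object'
        ? JSON.stringify(payload)
        : payload.toString();
    return Buffer.from(JSON.stringify(value)); // second stringify adds the quotes
}

// Hypothetical alternative: serialize at most once, so plain strings pass
// through unchanged and objects become ordinary JSON.
function encodeOnce(payload) {
    const value = typeof payload === 'object'
        ? JSON.stringify(payload)
        : String(payload);
    return Buffer.from(value);
}

console.log(encodeAsQuoted('foo').toString());    // "foo"
console.log(encodeAsQuoted({ a: 1 }).toString()); // "{\"a\":1}"
console.log(encodeOnce('foo').toString());        // foo
console.log(encodeOnce({ a: 1 }).toString());     // {"a":1}
```

Neither function treats a Buffer payload specially; since typeof reports 'object' for a Buffer, it would be turned into JSON rather than sent as raw bytes.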

HannesP commented Dec 22, 2017

I see now that a PR for exactly this was created on Nov 18. I hope it'll be incorporated soon.
