
Check topic existence on Producer `_produceWrapper()` #24

Closed
mvanbrummen opened this issue Dec 20, 2017 · 5 comments
mvanbrummen commented Dec 20, 2017

Hi,

I am trying out a fairly simple example. I have the following topic and avro key and value schemas created:

test.topic

{
  "subject": "test.topic-key",
  "version": 1,
  "id": 2,
  "schema": "\"string\""
}

{
  "subject": "test.topic-value",
  "version": 1,
  "id": 1,
  "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"test.avro\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}"
}

When I attempt to produce a subsequent message with kafka-avro I get the following error:

Ready to use
A problem occurred when sending our message
Error: invalid "string": undefined
    at throwInvalidError (/mnt/d/dev/lambda-kafka/src/test2/node_modules/avsc/lib/types.js:2688:9)
    at StringType._write (/mnt/d/dev/lambda-kafka/src/test2/node_modules/avsc/lib/types.js:743:5)
    at RecordType.writeUser [as _write] (eval at RecordType._createWriter (/mnt/d/dev/lambda-kafka/src/test2/node_modules/avsc/lib/types.js:2005:10), :4:6)
    at RecordType.Type.encode (/mnt/d/dev/lambda-kafka/src/test2/node_modules/avsc/lib/types.js:294:8)
    at Object.magicByte.toMessageBuffer (/mnt/d/dev/lambda-kafka/src/test2/node_modules/kafka-avro/lib/magic-byte.js:29:18)
    at Ctor.Producer.serialize (/mnt/d/dev/lambda-kafka/src/test2/node_modules/kafka-avro/lib/kafka-producer.js:110:28)
    at Ctor.Producer._produceWrapper (/mnt/d/dev/lambda-kafka/src/test2/node_modules/kafka-avro/lib/kafka-producer.js:94:23)
    at kafkaAvro.getProducer.then.producer (/mnt/d/dev/lambda-kafka/src/test2/handler.js:28:30)
    at tryCatcher (/mnt/d/dev/lambda-kafka/src/test2/node_modules/bluebird/js/release/util.js:16:23)
    at Promise._settlePromiseFromHandler (/mnt/d/dev/lambda-kafka/src/test2/node_modules/bluebird/js/release/promise.js:512:31)
    at Promise._settlePromise (/mnt/d/dev/lambda-kafka/src/test2/node_modules/bluebird/js/release/promise.js:569:18)
    at Promise._settlePromise0 (/mnt/d/dev/lambda-kafka/src/test2/node_modules/bluebird/js/release/promise.js:614:10)
    at Promise._settlePromises (/mnt/d/dev/lambda-kafka/src/test2/node_modules/bluebird/js/release/promise.js:693:18)
    at Async._drainQueue (/mnt/d/dev/lambda-kafka/src/test2/node_modules/bluebird/js/release/async.js:133:16)
    at Async._drainQueues (/mnt/d/dev/lambda-kafka/src/test2/node_modules/bluebird/js/release/async.js:143:10)
    at Immediate.Async.drainQueues (/mnt/d/dev/lambda-kafka/src/test2/node_modules/bluebird/js/release/async.js:17:14)

Here is the code:

const KafkaAvro = require('kafka-avro');

let kafkaAvro = new KafkaAvro({
    kafkaBroker: 'localhost:9092',
    schemaRegistry: 'http://localhost:8081',
});

kafkaAvro.init()
    .then(function() {
        console.log('Ready to use');

        kafkaAvro.getProducer({
                'debug': 'all'
            })
            .then(producer => {
                let topicName = 'test.topic';

                let topic = producer.Topic(topicName, {
                    'request.required.acks': 1
                });

                let value = new Buffer('{ "name": "Beethooven"}');
                let key = '"key"';

                let partition = -1;

                try {
                    producer.produce(topic, partition, value, key);
                } catch (e) {
                    console.error('A problem occurred when sending our message');
                    console.error(e);
                }
            })
    });
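A likely cause, judging from the stack trace: kafka-avro serializes the value itself against the registered record schema, which expects a plain object with a string `name` field, while the snippet passes a `Buffer` of JSON text, on which the `name` property is `undefined`. A minimal sketch of the mismatch (plain Node, avsc not required):

```javascript
// The registered value schema expects an object with a string `name` field.
const asObject = { name: 'Beethooven' };

// The snippet above instead passes a Buffer containing JSON text.
const asBuffer = Buffer.from('{ "name": "Beethooven"}');

// avsc reads the `name` property off whatever value it is handed; on the
// Buffer that property is undefined, hence `invalid "string": undefined`.
console.log(typeof asObject.name); // prints: string
console.log(typeof asBuffer.name); // prints: undefined
```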

Thanks

@thanpolas
Contributor

I suspect this has to do with a not found topic. Double check that the topic is properly created and populated from the Schema Registry.
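For reference, the Schema Registry subjects that kafka-avro looks up follow the default TopicNameStrategy, i.e. `<topic>-key` and `<topic>-value`. A minimal sketch of that naming (the helper name is mine, not part of kafka-avro's API):

```javascript
// Given a topic name, return the Schema Registry subjects that the default
// TopicNameStrategy expects. Hypothetical helper, not kafka-avro API.
function subjectsForTopic(topicName) {
  return [`${topicName}-key`, `${topicName}-value`];
}

// Each subject can then be checked with a GET against the registry's REST
// API, e.g. /subjects/<subject>/versions, which returns 404 if it is missing.
```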

Making a note to check for existence of topic at https://github.com/waldophotos/kafka-avro/blob/master/lib/kafka-producer.js#L91

@thanpolas thanpolas changed the title Existing schema registry entry produces Error: Invalid "string": undefined Check topic existence on Producer `_produceWrapper()` Dec 22, 2017
@jobetdelima

jobetdelima commented Mar 2, 2018

Tried the above code, just changed the topic name to 'node_avro'. My Kafka setup allows auto-topic creation, and I could see that the topic indeed got created. A message got pushed to it that looks like the one below.

However, I didn't see a schema get created in my schema registry. Appreciate any leads.

Raw Message:

{
    "timestamp": -1,
    "partition": 0,
    "key": "\"key\"",
    "offset": 0,
    "topic": "node_avro",
    "value": {
        "type": "Buffer",
        "data": [123, 32, 34, 110, 97, 109, 101, 34, 58, 32, 34, 66, 101, 101, 116, 104, 111, 111, 118, 101, 110, 34, 125]
    }
}

Update: I tried creating the schemas for key and value first before running the above script, and now I can reproduce the issue reported in the original post for this issue.

@jobetdelima

Spent some time debugging this. Looks like the `var pos = type.encode(val, buf, 5);` line is the one that's throwing the error. Hoping this helps.

magicByte.toMessageBuffer = function (val, type, schemaId, optLength) {
  var length = optLength || 1024;
  var buf = new Buffer(length);

  buf[0] = MAGIC_BYTE; // Magic byte.
  buf.writeInt32BE(schemaId, 1);

  var pos = type.encode(val, buf, 5);

  if (pos < 0) {
    // The buffer was too short, we need to resize.
    return magicByte.toMessageBuffer(val, type, schemaId, length - pos);
  }
  return buf.slice(0, pos);
};
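For context, the Confluent wire format that `toMessageBuffer` produces is one magic byte (0x00), a big-endian 4-byte schema id, then the Avro-encoded payload. A minimal sketch of reading that header back (hypothetical helper, not part of kafka-avro):

```javascript
const MAGIC_BYTE = 0;

// Parse the 5-byte Confluent wire-format header from a message buffer and
// return the schema id plus the raw Avro payload. Hypothetical helper.
function parseWireFormat(buf) {
  if (buf.length < 5 || buf[0] !== MAGIC_BYTE) {
    throw new Error('Message does not start with the Confluent magic byte');
  }
  return {
    schemaId: buf.readInt32BE(1), // bytes 1-4: schema id, big-endian
    payload: buf.slice(5),        // remainder: Avro-encoded body
  };
}
```

This matches the buffers in the consumer output further down, e.g. `<Buffer 00 00 00 00 02 ...>` carries schema id 2.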

@thanpolas
Contributor

Thank you for the research @jobetdelima; it will take some time for me to have a look at this, as I no longer use Kafka for my job. If you need this fixed fast, I'd appreciate a PR.

@hackmad

hackmad commented Mar 8, 2018

We got it working this way. Tested with Node v8.5.0 and v9.7.1. The key was to make sure the schema registry had schemas for the topic keys and values named using the convention `<topic>-key` and `<topic>-value`. In this example both are records, but the key can also be a plain string.

producer.js:

const KafkaAvro = require('kafka-avro');
const sleep = require('sleep');

// If running on host machine instead of inside docker, make sure to add
// hostnames 'kafka', 'schema-registry', 'zookeeper', etc to 127.0.0.1
const schemaRegistryUrl = 'http://schema-registry:8081';
const broker = 'kafka:9092';
const topicName = 'avrotest';

// Ensure schema is in registry:
/*
Note that the key is a record so the Kafka Rest Proxy can also work (it doesn't support simple string type)

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\": \"record\", \"name\": \"avrotest_key\", \"fields\": [{\"type\": \"string\", \"name\": \"id\"}]}"}' http://schema-registry:8081/subjects/avrotest-key/versions

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\": \"record\", \"name\": \"avrotest_value\", \"fields\": [{\"type\": \"string\", \"name\": \"id\"}]}"}' http://schema-registry:8081/subjects/avrotest-value/versions
*/

let kafkaAvro = new KafkaAvro({
	kafkaBroker: broker, 
	schemaRegistry: schemaRegistryUrl,
	topics: [topicName],
	fetchAllVersions: true
});
kafkaAvro.init()
	.then(() => {
		console.log('Ready to use');
		return kafkaAvro.getProducer({debug: 'all'}); 
	})
	.then(producer => {
		producer.on('producer error', err => {
			console.log(err);
			process.exit(1);
		});
		
		producer.on('disconnected', arg => {
			console.log('producer disconnected. ' + JSON.stringify(arg));
		});
	
		let topic = producer.Topic(topicName, {'request.required.acks': 1});

		while (true) {
			sleep.msleep(100);
			let key = {id: Math.random().toString()};
			let value = {id: Math.random().toString()};
			let partition = -1;

			console.log(key, value);
			producer.produce(topic, partition, value, key);
		}
	}).catch(err => {
		console.error('A problem occurred');
		console.error(err);
		process.exit(1);
	});

consumer.js:

const KafkaAvro = require('kafka-avro');

// If running on host machine instead of inside docker, make sure to add
// hostnames 'kafka', 'schema-registry', 'zookeeper', etc to 127.0.0.1
const schemaRegistryUrl = 'http://schema-registry:8081';
const broker = 'kafka:9092';
const topicName = 'avrotest';

// Ensure schema is in registry. See producer.js.

let kafkaAvro = new KafkaAvro({
	kafkaBroker: broker, 
	schemaRegistry: schemaRegistryUrl,
	topics: [topicName],
	fetchAllVersions: true
});
kafkaAvro.init()
	.then(() => {
		console.log('Ready to use');
		return kafkaAvro.getConsumer({
			'group.id': 'avrotest',
			'socket.keepalive.enable': true,
			'enable.auto.commit': true,
		});
	})
	.then(consumer => {
		let stream = consumer.getReadStream(topicName, {
			waitInterval: 0
		});
		
		stream.on('error', err => {
			console.log('stream error ' + err);
			process.exit(1);
		});
		
		consumer.on('consumer error', err => {
			console.log(err);
			process.exit(1);
		});
		
		stream.on('data', function(data) {
			console.log(data);
		});
	})
	.catch(err => {
		console.error('A problem occurred');
		console.error(err);
		process.exit(1);
	});

package.json:

{
  "name": "node-kafka-avro",
  "version": "1.0.0",
  "description": "",
  "main": "",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "",
  "license": "ISC",
  "dependencies": {
    "kafka-avro": "^0.8.1",
    "sleep": "^5.1.1"
  }
}

When running node producer.js you should see the following output:

...
...
{ id: '0.3689619598109377' } { id: '0.5356955845449605' }
{ id: '0.2447981324867743' } { id: '0.35424180997972443' }
{ id: '0.7983830046483413' } { id: '0.6968566564547141' }
...
...

When running node consumer.js you should see the following output:

...
...
{ value: <Buffer 00 00 00 00 02 24 30 2e 35 33 35 36 39 35 35 38 34 35 34 34 39 36 30 35>,
  size: 24,
  key: '[object Object]',
  topic: 'avrotest',
  offset: 2101,
  partition: 0,
  parsed: avrotest_value { id: '0.5356955845449605' },
  schemaId: 2 }
{ value: <Buffer 00 00 00 00 02 26 30 2e 33 35 34 32 34 31 38 30 39 39 37 39 37 32 34 34 33>,
  size: 25,
  key: '[object Object]',
  topic: 'avrotest',
  offset: 2102,
  partition: 0,
  parsed: avrotest_value { id: '0.35424180997972443' },
  schemaId: 2 }
{ value: <Buffer 00 00 00 00 02 24 30 2e 36 39 36 38 35 36 36 35 36 34 35 34 37 31 34 31>,
  size: 24,
  key: '[object Object]',
  topic: 'avrotest',
  offset: 2103,
  partition: 0,
  parsed: avrotest_value { id: '0.6968566564547141' },
  schemaId: 2 }
...
...
