Skip to content
This repository has been archived by the owner on Jan 24, 2020. It is now read-only.

changed my name from stern0 to kaskerd and upgraded grpc dependency #7

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@

* Go [![GoDoc](https://img.shields.io/badge/godoc-reference-5272B4.svg?style=flat-square)](https://godoc.org/github.com/sandglass/sandglass-client/go/sg)

* Node.js (Thanks to [@stern0](https://github.com/stern0))
* Node.js (Thanks to [@kaskerd](https://github.com/kaskerd))
15 changes: 7 additions & 8 deletions node/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ const sgproto = require('@sandglass/grpc')
const grpc = require('grpc')

const Consumer = require('./consumer')
const { internal } = require('./util')

/**
* @typedef {Object} Client
Expand All @@ -18,7 +17,7 @@ module.exports = class Client {
* @param {String} address
*/
constructor(address) {
internal(this).client = new sgproto.BrokerService(`:${address}`, grpc.credentials.createInsecure())
this.client = new sgproto.BrokerService(`:${address}`, grpc.credentials.createInsecure())
}

/**
Expand All @@ -30,7 +29,7 @@ module.exports = class Client {
async createTopic(params) {
return new Promise((resolve, reject) => {

internal(this).client.CreateTopic(params, (err, resp) => {
this.client.CreateTopic(params, (err, resp) => {
if (err) return reject(err)
return resolve(resp)
})
Expand All @@ -45,7 +44,7 @@ module.exports = class Client {
async listPartitions(topic) {
return new Promise((resolve, reject) => {

internal(this).client.GetTopic({ name: topic }, (err, resp) => {
this.client.GetTopic({ name: topic }, (err, resp) => {
if (err) return reject(err)
return resolve(resp)
})
Expand All @@ -62,7 +61,7 @@ module.exports = class Client {
async produceMessage(topic, partition, msg) {
return new Promise((resolve, reject) => {

internal(this).client.Produce({
this.client.Produce({
topic: topic,
partition: partition,
messages: msg,
Expand All @@ -87,7 +86,7 @@ module.exports = class Client {
meta.add('topic', topic)
meta.add('partition', partition)

internal(this).client.ProduceMessagesStream(meta, (err, resp) => {
this.client.ProduceMessagesStream(meta, (err, resp) => {
if (err) return reject(err)
return resolve(resp)
})
Expand All @@ -103,14 +102,14 @@ module.exports = class Client {
* @param {String} name
*/
async newConsumer(topic, partition, group, name) {
return new Consumer(internal(this).client, topic, partition, group, name)
return new Consumer(this.client, topic, partition, group, name)
}

/**
* Close client
*/
async close() {
return grpc.closeClient(this)
await grpc.closeClient(this)
}

}
54 changes: 1 addition & 53 deletions node/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module.exports = class Consumer {
/**
* Sandglass consumer
*
* @param {Object} client
* @param {Object} client sandglass Client instance
* @param {String} topic
* @param {String} partition
* @param {String} group
Expand Down Expand Up @@ -90,56 +90,4 @@ module.exports = class Consumer {
})
}

/**
*
* @param {Array} offsets
*/
async acknowledgeMessages(offsets) {

if (typeof offsets === 'undefined') throw new Error(`offsets must be defined`)
if (offsets.length === 0) throw new Error(`offsets should not be emty`)
if (Array.isArray(offsets) === false) throw new Error(`offsets must be an array`)

return new Promise((resolve, reject) => {

internal(this).client.AcknowledgeMessages({
topic: internal(this).topic,
partition: internal(this).partition,
consumerGroupName: internal(this).group,
consumerName: internal(this).name,
offset: offsets,
},
(err, resp) => {
if (err) return reject(err)
return resolve(resp)
})
})
}

/**
*
* @param {Object} msg
*/
async commit(msg) {

if (typeof msg.offset === 'undefined') throw new Error(`offset must be defined`)
if (msg.offset.length === 0) throw new Error(`offset should not be emty`)
if (Array.isArray(msg.offset) === false) throw new Error(`offset must be an array`)

return new Promise((resolve, reject) => {

internal(this).client.Commit({
topic: internal(this).topic,
partition: internal(this).partition,
consumerGroupName: internal(this).group,
consumerName: internal(this).name,
offset: msg.offset,
},
(err, resp) => {
if (err) return reject(err)
return resolve(resp)
})
})
}

}
15 changes: 3 additions & 12 deletions node/example/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,9 @@ execSync(`sandctl produce futura '{"dest" : "[email protected]"}' -n 10`)
async function consume(topic) {

try {

const partitions = await client.listPartitions(topic)

const consumer = await client.newConsumer(
topic,
partitions[Object.keys(partitions)[0]],
'group2',
'consumer2'
)

const partition = partitions.partitions[0]
const consumer = await client.newConsumer(topic, partition, 'group1', 'consumer1')
const stream = await consumer.consume()

return stream
Expand All @@ -53,6 +46,4 @@ consume(topic)
})
.catch(err => console.log(`there was an error ${err}`))

// const start = process.hrtime()

// const end = process.hrtime(start)
client.close()
14 changes: 14 additions & 0 deletions node/example/produce.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
const Client = require('../client')

const client = new Client('7170')

const topic = 'futura'
const msg = Buffer.from('Hello, Sandglass!', 'hex')

client.produceMessage(topic, '', { value: msg })
.then(res => {
let offsets = res.offsets
let value = offsets[0].toString('hex')
console.log(value)
})
.catch(err => console.log(err))
Loading