Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some properties can be multiple #60

Merged
merged 2 commits into from
Jul 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ All properties are mandatory.
userProperties: {
'test': 'test'
},
subscriptionIdentifier: 120,
subscriptionIdentifier: 120, // can be an Array in message from broker, if message included in few another subscriptions
contentType: 'test'
}
}
Expand Down
11 changes: 10 additions & 1 deletion parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,16 @@ Parser.prototype._parseProperties = function () {
}
continue
}
result[name] = this._parseByType(constants.propertiesTypes[name])
if (result[name]) {
if (Array.isArray(result[name])) {
result[name].push(this._parseByType(constants.propertiesTypes[name]))
} else {
result[name] = [result[name]]
result[name].push(this._parseByType(constants.propertiesTypes[name]))
}
} else {
result[name] = this._parseByType(constants.propertiesTypes[name])
}
}
return result
}
Expand Down
39 changes: 39 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -909,6 +909,45 @@ testParseGenerate('publish MQTT5 properties', {
116, 101, 115, 116 // Payload (test)
]), { protocolVersion: 5 })

testParseGenerate('publish MQTT5 with multiple same properties', {
cmd: 'publish',
retain: true,
qos: 2,
dup: true,
length: 62,
topic: 'test',
payload: new Buffer('test'),
messageId: 10,
properties: {
payloadFormatIndicator: true,
messageExpiryInterval: 4321,
topicAlias: 100,
responseTopic: 'topic',
correlationData: Buffer.from([1, 2, 3, 4]),
userProperties: {
'test': 'test'
},
subscriptionIdentifier: [120, 121],
contentType: 'test'
}
}, Buffer.from([
61, 62, // Header
0, 4, // Topic length
116, 101, 115, 116, // Topic (test)
0, 10, // Message ID
49, // properties length
1, 1, // payloadFormatIndicator
2, 0, 0, 16, 225, // message expiry interval
35, 0, 100, // topicAlias
8, 0, 5, 116, 111, 112, 105, 99, // response topic
9, 0, 4, 1, 2, 3, 4, // correlationData
38, 0, 4, 116, 101, 115, 116, 0, 4, 116, 101, 115, 116, // userProperties
11, 120, // subscriptionIdentifier
11, 121, // subscriptionIdentifier
3, 0, 4, 116, 101, 115, 116, // content type
116, 101, 115, 116 // Payload (test)
]), { protocolVersion: 5 })

;(function () {
var buffer = new Buffer(2048)
testParseGenerate('2KB publish packet', {
Expand Down
133 changes: 75 additions & 58 deletions writeToStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -859,9 +859,8 @@ function getProperties (stream, properties) {
}
}
var propertiesLength = 0
function getLengthProperty (name) {
function getLengthProperty (name, value) {
var type = protocol.propertiesTypes[name]
var value = properties[name]
var length = 0
switch (type) {
case 'byte': {
Expand Down Expand Up @@ -948,7 +947,15 @@ function getProperties (stream, properties) {
}
if (properties) {
for (var propName in properties) {
var propLength = getLengthProperty(propName)
var propLength = 0
var propValue = properties[propName]
if (Array.isArray(propValue)) {
for (var valueIndex = 0; valueIndex < propValue.length; valueIndex++) {
propLength += getLengthProperty(propName, propValue[valueIndex])
}
} else {
propLength = getLengthProperty(propName, propValue)
}
if (!propLength) return false
propertiesLength += propLength
}
Expand Down Expand Up @@ -982,68 +989,78 @@ function getPropertiesByMaximumPacketSize (stream, properties, opts, length) {
return propertiesData
}

function writeProperty (stream, propName, value) {
var type = protocol.propertiesTypes[propName]
switch (type) {
case 'byte': {
stream.write(Buffer.from([protocol.properties[propName]]))
stream.write(Buffer.from([+value]))
break
}
case 'int8': {
stream.write(Buffer.from([protocol.properties[propName]]))
stream.write(Buffer.from([value]))
break
}
case 'binary': {
stream.write(Buffer.from([protocol.properties[propName]]))
writeStringOrBuffer(stream, value)
break
}
case 'int16': {
stream.write(Buffer.from([protocol.properties[propName]]))
writeNumber(stream, value)
break
}
case 'int32': {
stream.write(Buffer.from([protocol.properties[propName]]))
write4ByteNumber(stream, value)
break
}
case 'var': {
stream.write(Buffer.from([protocol.properties[propName]]))
writeVarByteInt(stream, value)
break
}
case 'string': {
stream.write(Buffer.from([protocol.properties[propName]]))
writeString(stream, value)
break
}
case 'pair': {
Object.getOwnPropertyNames(value).forEach(function (name) {
var currentValue = value[name]
if (Array.isArray(currentValue)) {
currentValue.forEach(function (value) {
stream.write(Buffer.from([protocol.properties[propName]]))
writeStringPair(stream, name.toString(), value.toString())
})
} else {
stream.write(Buffer.from([protocol.properties[propName]]))
writeStringPair(stream, name.toString(), currentValue.toString())
}
})
break
}
default: {
stream.emit('error', new Error('Invalid property ' + propName + ' value: ' + value))
return false
}
}
}

function writeProperties (stream, properties, propertiesLength) {
/* write properties to stream */
writeVarByteInt(stream, propertiesLength)
for (var propName in properties) {
if (properties.hasOwnProperty(propName) && properties[propName] !== null) {
var value = properties[propName]
var type = protocol.propertiesTypes[propName]
switch (type) {
case 'byte': {
stream.write(Buffer.from([protocol.properties[propName]]))
stream.write(Buffer.from([+value]))
break
}
case 'int8': {
stream.write(Buffer.from([protocol.properties[propName]]))
stream.write(Buffer.from([value]))
break
}
case 'binary': {
stream.write(Buffer.from([protocol.properties[propName]]))
writeStringOrBuffer(stream, value)
break
}
case 'int16': {
stream.write(Buffer.from([protocol.properties[propName]]))
writeNumber(stream, value)
break
}
case 'int32': {
stream.write(Buffer.from([protocol.properties[propName]]))
write4ByteNumber(stream, value)
break
}
case 'var': {
stream.write(Buffer.from([protocol.properties[propName]]))
writeVarByteInt(stream, value)
break
}
case 'string': {
stream.write(Buffer.from([protocol.properties[propName]]))
writeString(stream, value)
break
}
case 'pair': {
Object.getOwnPropertyNames(value).forEach(function (name) {
var currentValue = value[name]
if (Array.isArray(currentValue)) {
currentValue.forEach(function (value) {
stream.write(Buffer.from([protocol.properties[propName]]))
writeStringPair(stream, name.toString(), value.toString())
})
} else {
stream.write(Buffer.from([protocol.properties[propName]]))
writeStringPair(stream, name.toString(), currentValue.toString())
}
})
break
}
default: {
stream.emit('error', new Error('Invalid property ' + propName))
return false
if (Array.isArray(value)) {
for (var valueIndex = 0; valueIndex < value.length; valueIndex++) {
writeProperty(stream, propName, value[valueIndex])
}
} else {
writeProperty(stream, propName, value)
}
}
}
Expand Down