Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] MQTT: do not JSON-encode retained message bodies (restore #4825) #5050

Merged
merged 11 commits into from
Feb 13, 2024
123 changes: 103 additions & 20 deletions server/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,10 +455,14 @@ type mqttPublish struct {
// When we submit a PUBREL for delivery, we add a "Nmqtt-PubRel" header that
// contains the PI.
const (
mqttNatsHeader = "Nmqtt-Pub"
mqttNatsPubRelHeader = "Nmqtt-PubRel"
mqttNatsHeaderSubject = "Nmqtt-Subject"
mqttNatsHeaderMapped = "Nmqtt-Mapped"
mqttNatsHeader = "Nmqtt-Pub"
mqttNatsRetainedMessageTopic = "Nmqtt-RTopic"
mqttNatsRetainedMessageOrigin = "Nmqtt-ROrigin"
mqttNatsRetainedMessageFlags = "Nmqtt-RFlags"
mqttNatsRetainedMessageSource = "Nmqtt-RSource"
mqttNatsPubRelHeader = "Nmqtt-PubRel"
mqttNatsHeaderSubject = "Nmqtt-Subject"
mqttNatsHeaderMapped = "Nmqtt-Mapped"
)

type mqttParsedPublishNATSHeader struct {
Expand Down Expand Up @@ -1943,9 +1947,9 @@ func (as *mqttAccountSessionManager) processJSAPIReplies(_ *subscription, pc *cl
// Run from various go routines (JS consumer, etc..).
// No lock held on entry.
func (as *mqttAccountSessionManager) processRetainedMsg(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
_, msg := c.msgParts(rmsg)
rm := &mqttRetainedMsg{}
if err := json.Unmarshal(msg, rm); err != nil {
h, m := c.msgParts(rmsg)
rm, err := mqttDecodeRetainedMessage(h, m)
if err != nil {
return
}
// If lastSeq is 0 (nothing to recover, or done doing it) and this is
Expand All @@ -1960,6 +1964,7 @@ func (as *mqttAccountSessionManager) processRetainedMsg(_ *subscription, c *clie
as.handleRetainedMsg(rm.Subject, &mqttRetainedMsgRef{sseq: seq}, rm, false)

// If we were recovering (lastSeq > 0), then check if we are done.
// TODO <>/<> does this work with ReplayImmediate?
levb marked this conversation as resolved.
Show resolved Hide resolved
if as.rrmLastSeq > 0 && seq >= as.rrmLastSeq {
as.rrmLastSeq = 0
close(as.rrmDoneCh)
Expand Down Expand Up @@ -2772,20 +2777,94 @@ func (as *mqttAccountSessionManager) loadRetainedMessages(subjects map[string]st
w.Warnf("failed to load retained message for subject %q: %v", ss[i], err)
continue
}
var rm mqttRetainedMsg
if err := json.Unmarshal(result.Message.Data, &rm); err != nil {
rm, err := mqttDecodeRetainedMessage(result.Message.Header, result.Message.Data)
if err != nil {
w.Warnf("failed to decode retained message for subject %q: %v", ss[i], err)
continue
}

// Add the loaded retained message to the cache, and to the results map.
key := ss[i][len(mqttRetainedMsgsStreamSubject):]
as.setCachedRetainedMsg(key, &rm, false, false)
rms[key] = &rm
as.setCachedRetainedMsg(key, rm, false, false)
rms[key] = rm
}
return rms
}

// Composes a NATS message for a storeable mqttRetainedMsg.
func mqttEncodeRetainedMessage(rm *mqttRetainedMsg) (natsMsg []byte, headerLen int) {
// No need to encode the subject, we can restore it from topic.
l := len(mqttNatsRetainedMessageTopic) + 1 + len(rm.Topic) + 2 // 1 byte for ':', 2 bytes for CRLF
if rm.Origin != "" {
l += len(mqttNatsRetainedMessageOrigin) + 1 + len(rm.Origin) + 2 // 1 byte for ':', 2 bytes for CRLF
}
if rm.Source != "" {
l += len(mqttNatsRetainedMessageSource) + 1 + len(rm.Source) + 2 // 1 byte for ':', 2 bytes for CRLF
}
l = len(mqttNatsRetainedMessageFlags) + 1 + 2 + 2 // 1 byte for ':', 2 bytes for the flags, 2 bytes for CRLF
l += 2 // 2 bytes for the extra CRLF after the header
l += len(rm.Msg)

buf := bytes.NewBuffer(make([]byte, l))

buf.WriteString(hdrLine)

buf.WriteString(mqttNatsRetainedMessageTopic)
buf.WriteByte(':')
buf.WriteString(rm.Topic)
buf.WriteString(_CRLF_)

buf.WriteString(mqttNatsRetainedMessageFlags)
buf.WriteByte(':')
buf.WriteString(strconv.FormatUint(uint64(rm.Flags), 16))
buf.WriteString(_CRLF_)

if rm.Origin != "" {
levb marked this conversation as resolved.
Show resolved Hide resolved
buf.WriteString(mqttNatsRetainedMessageOrigin)
buf.WriteByte(':')
buf.WriteString(rm.Origin)
buf.WriteString(_CRLF_)
}
if rm.Source != "" {
levb marked this conversation as resolved.
Show resolved Hide resolved
buf.WriteString(mqttNatsRetainedMessageSource)
buf.WriteByte(':')
buf.WriteString(rm.Source)
buf.WriteString(_CRLF_)
}

// End of header, finalize
buf.WriteString(_CRLF_)
headerLen = buf.Len()
buf.Write(rm.Msg)
return buf.Bytes(), headerLen
}

func mqttDecodeRetainedMessage(h, m []byte) (*mqttRetainedMsg, error) {
fHeader := getHeader(mqttNatsRetainedMessageFlags, h)
if len(fHeader) > 0 {
flags, err := strconv.ParseUint(string(fHeader), 16, 8)
if err != nil {
return nil, fmt.Errorf("invalid retained message flags: %v", err)
}
topic := getHeader(mqttNatsRetainedMessageTopic, h)
subj, _ := mqttToNATSSubjectConversion(topic, false)
return &mqttRetainedMsg{
Flags: byte(flags),
Subject: string(subj),
Topic: string(topic),
Origin: string(getHeader(mqttNatsRetainedMessageOrigin, h)),
Source: string(getHeader(mqttNatsRetainedMessageSource, h)),
Msg: m,
}, nil
} else {
var rm mqttRetainedMsg
if err := json.Unmarshal(m, &rm); err != nil {
return nil, err
}
return &rm, nil
}
}

// Creates the session stream (limit msgs of 1) for this client ID if it does
// not already exist. If it exists, recover the single record to rebuild the
// state of the session. If there is a session record but this session is not
Expand Down Expand Up @@ -2932,7 +3011,9 @@ func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log *
return err
}

// Unmarshal the message so that we can obtain the subject name.
// Unmarshal the message so that we can obtain the subject name. Do not
// use mqttDecodeRetainedMessage() here because these messages are from
// older versions, and contain the full JSON encoding in payload.
var rmsg mqttRetainedMsg
if err = json.Unmarshal(smsg.Data, &rmsg); err == nil {
// Store the message again, this time with the new per-key subject.
Expand Down Expand Up @@ -4174,25 +4255,27 @@ func (c *client) mqttHandlePubRetain() {
}
} else {
// Spec [MQTT-3.3.1-5]. Store the retained message with its QoS.
//
// When coming from a publish protocol, `pp` is referencing a stack
// variable that itself possibly references the read buffer.
rm := &mqttRetainedMsg{
Origin: asm.jsa.id,
Subject: key,
Topic: string(pp.topic),
Msg: pp.msg,
Msg: pp.msg, // will copy these bytes later as we process rm.
Flags: pp.flags,
Source: c.opts.Username,
}
rmBytes, _ := json.Marshal(rm)
smr, err := asm.jsa.storeMsg(mqttRetainedMsgsStreamSubject+key, -1, rmBytes)
rmBytes, hdr := mqttEncodeRetainedMessage(rm) // will copy the payload bytes
smr, err := asm.jsa.storeMsg(mqttRetainedMsgsStreamSubject+key, hdr, rmBytes)
if err == nil {
// Update the new sequence
// Update the new sequence.
rf := &mqttRetainedMsgRef{
sseq: smr.Sequence,
}
// Add/update the map
asm.handleRetainedMsg(key, rf, rm, true) // will copy the payload bytes if needs to update rmsCache
// Add/update the map. `true` to copy the payload bytes if needs to
// update rmsCache.
asm.handleRetainedMsg(key, rf, rm, true)
} else {
c.mu.Lock()
acc := c.acc
Expand Down Expand Up @@ -4269,8 +4352,8 @@ func (s *Server) mqttCheckPubRetainedPerms() {
if err != nil || jsm == nil {
continue
}
var rm mqttRetainedMsg
if err := json.Unmarshal(jsm.Data, &rm); err != nil {
rm, err := mqttDecodeRetainedMessage(jsm.Header, jsm.Data)
if err != nil {
continue
}
if rm.Source == _EMPTY_ {
Expand Down
78 changes: 78 additions & 0 deletions server/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7448,6 +7448,84 @@ func TestMQTTJetStreamRepublishAndQoS0Subscribers(t *testing.T) {
testMQTTExpectNothing(t, r)
}

// Test for auto-cleanup of consumers.
func TestMQTTDecodeRetainedMessage(t *testing.T) {
tdir := t.TempDir()
tmpl := `
listen: 127.0.0.1:-1
server_name: mqtt
jetstream {
store_dir = %q
}

mqtt {
listen: 127.0.0.1:-1
consumer_inactive_threshold: %q
}

# For access to system account.
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
`
conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, tdir, "0.2s")))
s, o := RunServerWithConfig(conf)

levb marked this conversation as resolved.
Show resolved Hide resolved
// Connect and publish a retained message, this will be in the "newer" form,
// with the metadata in the header.
mc, r := testMQTTConnectRetry(t, &mqttConnInfo{clientID: "test", cleanSess: true}, o.MQTT.Host, o.MQTT.Port, 5)
levb marked this conversation as resolved.
Show resolved Hide resolved
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
testMQTTPublish(t, mc, r, 0, false, true, "foo/1", 0, []byte("msg1"))
mc.Close()

// Store a "legacy", JSON-encoded payload directly into the stream.
nc, js := jsClientConnect(t, s)
defer nc.Close()

rm := mqttRetainedMsg{
Origin: "test",
Subject: "foo.2",
Topic: "foo/2",
Flags: mqttPubFlagRetain,
Msg: []byte("msg2"),
}
jsonData, _ := json.Marshal(rm)
_, err := js.PublishMsg(&nats.Msg{
Subject: mqttRetainedMsgsStreamSubject + rm.Subject,
Data: jsonData,
})
if err != nil {
t.Fatalf("Error publishing retained message to JS directly: %v", err)
}

// Restart the server to see that it picks up both retained messages on restart.
s.Shutdown()
s = RunServer(o)

levb marked this conversation as resolved.
Show resolved Hide resolved
// Connect again, subscribe, and check that we get both messages.
mc, r = testMQTTConnectRetry(t, &mqttConnInfo{clientID: "test", cleanSess: true}, o.MQTT.Host, o.MQTT.Port, 5)
levb marked this conversation as resolved.
Show resolved Hide resolved
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "foo/+", qos: 0}}, []byte{0})
testMQTTCheckPubMsg(t, mc, r, "foo/1", mqttPubFlagRetain, []byte("msg1"))
testMQTTCheckPubMsg(t, mc, r, "foo/2", mqttPubFlagRetain, []byte("msg2"))
testMQTTExpectNothing(t, r)
mc.Close()

// Clear both retained messages.
mc, r = testMQTTConnectRetry(t, &mqttConnInfo{clientID: "test", cleanSess: true}, o.MQTT.Host, o.MQTT.Port, 5)
levb marked this conversation as resolved.
Show resolved Hide resolved
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
testMQTTPublish(t, mc, r, 0, false, true, "foo/1", 0, []byte{})
testMQTTPublish(t, mc, r, 0, false, true, "foo/2", 0, []byte{})
mc.Close()

// Connect again, subscribe, and check that we get nothing.
mc, r = testMQTTConnectRetry(t, &mqttConnInfo{clientID: "test", cleanSess: true}, o.MQTT.Host, o.MQTT.Port, 5)
levb marked this conversation as resolved.
Show resolved Hide resolved
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "foo/+", qos: 0}}, []byte{0})
testMQTTExpectNothing(t, r)
mc.Close()

testMQTTShutdownServer(s)
levb marked this conversation as resolved.
Show resolved Hide resolved
}

//////////////////////////////////////////////////////////////////////////
//
// Benchmarks
Expand Down