Skip to content

Commit

Permalink
[IMPROVED] MQTT: do not JSON-encode retained message bodies (restore #…
Browse files Browse the repository at this point in the history
…4825) (#5050)

This change is for 2.11 only. As noted in the original PR, downgrading
to 2.10 and below would result in data loss for MQTT retained messages
(and potential error handling issues).

#4825 was reverted from main,
adding it back now, with some changes. Specifically, no longer use JSON
for the header value; instead use 4 separate headers.

The revert was messy, not sure I can reproduce the issue seen in the
original PR.
  • Loading branch information
derekcollison authored Feb 13, 2024
2 parents 5cf4ae5 + 45c8f82 commit 19cba6f
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 21 deletions.
124 changes: 103 additions & 21 deletions server/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,10 +455,14 @@ type mqttPublish struct {
// When we submit a PUBREL for delivery, we add a "Nmqtt-PubRel" header that
// contains the PI.
const (
mqttNatsHeader = "Nmqtt-Pub"
mqttNatsPubRelHeader = "Nmqtt-PubRel"
mqttNatsHeaderSubject = "Nmqtt-Subject"
mqttNatsHeaderMapped = "Nmqtt-Mapped"
mqttNatsHeader = "Nmqtt-Pub"
mqttNatsRetainedMessageTopic = "Nmqtt-RTopic"
mqttNatsRetainedMessageOrigin = "Nmqtt-ROrigin"
mqttNatsRetainedMessageFlags = "Nmqtt-RFlags"
mqttNatsRetainedMessageSource = "Nmqtt-RSource"
mqttNatsPubRelHeader = "Nmqtt-PubRel"
mqttNatsHeaderSubject = "Nmqtt-Subject"
mqttNatsHeaderMapped = "Nmqtt-Mapped"
)

type mqttParsedPublishNATSHeader struct {
Expand Down Expand Up @@ -1943,9 +1947,9 @@ func (as *mqttAccountSessionManager) processJSAPIReplies(_ *subscription, pc *cl
// Run from various go routines (JS consumer, etc..).
// No lock held on entry.
func (as *mqttAccountSessionManager) processRetainedMsg(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
_, msg := c.msgParts(rmsg)
rm := &mqttRetainedMsg{}
if err := json.Unmarshal(msg, rm); err != nil {
h, m := c.msgParts(rmsg)
rm, err := mqttDecodeRetainedMessage(h, m)
if err != nil {
return
}
// If lastSeq is 0 (nothing to recover, or done doing it) and this is
Expand Down Expand Up @@ -2772,20 +2776,94 @@ func (as *mqttAccountSessionManager) loadRetainedMessages(subjects map[string]st
w.Warnf("failed to load retained message for subject %q: %v", ss[i], err)
continue
}
var rm mqttRetainedMsg
if err := json.Unmarshal(result.Message.Data, &rm); err != nil {
rm, err := mqttDecodeRetainedMessage(result.Message.Header, result.Message.Data)
if err != nil {
w.Warnf("failed to decode retained message for subject %q: %v", ss[i], err)
continue
}

// Add the loaded retained message to the cache, and to the results map.
key := ss[i][len(mqttRetainedMsgsStreamSubject):]
as.setCachedRetainedMsg(key, &rm, false, false)
rms[key] = &rm
as.setCachedRetainedMsg(key, rm, false, false)
rms[key] = rm
}
return rms
}

// Composes a NATS message for a storeable mqttRetainedMsg.
func mqttEncodeRetainedMessage(rm *mqttRetainedMsg) (natsMsg []byte, headerLen int) {
// No need to encode the subject, we can restore it from topic.
l := len(mqttNatsRetainedMessageTopic) + 1 + len(rm.Topic) + 2 // 1 byte for ':', 2 bytes for CRLF
if rm.Origin != _EMPTY_ {
l += len(mqttNatsRetainedMessageOrigin) + 1 + len(rm.Origin) + 2 // 1 byte for ':', 2 bytes for CRLF
}
if rm.Source != _EMPTY_ {
l += len(mqttNatsRetainedMessageSource) + 1 + len(rm.Source) + 2 // 1 byte for ':', 2 bytes for CRLF
}
l += len(mqttNatsRetainedMessageFlags) + 1 + 2 + 2 // 1 byte for ':', 2 bytes for the flags, 2 bytes for CRLF
l += 2 // 2 bytes for the extra CRLF after the header
l += len(rm.Msg)

buf := bytes.NewBuffer(make([]byte, l))

buf.WriteString(hdrLine)

buf.WriteString(mqttNatsRetainedMessageTopic)
buf.WriteByte(':')
buf.WriteString(rm.Topic)
buf.WriteString(_CRLF_)

buf.WriteString(mqttNatsRetainedMessageFlags)
buf.WriteByte(':')
buf.WriteString(strconv.FormatUint(uint64(rm.Flags), 16))
buf.WriteString(_CRLF_)

if rm.Origin != _EMPTY_ {
buf.WriteString(mqttNatsRetainedMessageOrigin)
buf.WriteByte(':')
buf.WriteString(rm.Origin)
buf.WriteString(_CRLF_)
}
if rm.Source != _EMPTY_ {
buf.WriteString(mqttNatsRetainedMessageSource)
buf.WriteByte(':')
buf.WriteString(rm.Source)
buf.WriteString(_CRLF_)
}

// End of header, finalize
buf.WriteString(_CRLF_)
headerLen = buf.Len()
buf.Write(rm.Msg)
return buf.Bytes(), headerLen
}

func mqttDecodeRetainedMessage(h, m []byte) (*mqttRetainedMsg, error) {
fHeader := getHeader(mqttNatsRetainedMessageFlags, h)
if len(fHeader) > 0 {
flags, err := strconv.ParseUint(string(fHeader), 16, 8)
if err != nil {
return nil, fmt.Errorf("invalid retained message flags: %v", err)
}
topic := getHeader(mqttNatsRetainedMessageTopic, h)
subj, _ := mqttToNATSSubjectConversion(topic, false)
return &mqttRetainedMsg{
Flags: byte(flags),
Subject: string(subj),
Topic: string(topic),
Origin: string(getHeader(mqttNatsRetainedMessageOrigin, h)),
Source: string(getHeader(mqttNatsRetainedMessageSource, h)),
Msg: m,
}, nil
} else {
var rm mqttRetainedMsg
if err := json.Unmarshal(m, &rm); err != nil {
return nil, err
}
return &rm, nil
}
}

// Creates the session stream (limit msgs of 1) for this client ID if it does
// not already exist. If it exists, recover the single record to rebuild the
// state of the session. If there is a session record but this session is not
Expand Down Expand Up @@ -2932,7 +3010,9 @@ func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log *
return err
}

// Unmarshal the message so that we can obtain the subject name.
// Unmarshal the message so that we can obtain the subject name. Do not
// use mqttDecodeRetainedMessage() here because these messages are from
// older versions, and contain the full JSON encoding in payload.
var rmsg mqttRetainedMsg
if err = json.Unmarshal(smsg.Data, &rmsg); err == nil {
// Store the message again, this time with the new per-key subject.
Expand Down Expand Up @@ -3105,7 +3185,7 @@ func (sess *mqttSession) clear() error {
return fmt.Errorf("unable to delete consumer %q for session %q: %v", dur, sess.id, err)
}
}
if pubRelDur != "" {
if pubRelDur != _EMPTY_ {
_, err := sess.jsa.deleteConsumer(mqttOutStreamName, pubRelDur)
if isErrorOtherThan(err, JSConsumerNotFoundErr) {
return fmt.Errorf("unable to delete consumer %q for session %q: %v", pubRelDur, sess.id, err)
Expand Down Expand Up @@ -4174,25 +4254,27 @@ func (c *client) mqttHandlePubRetain() {
}
} else {
// Spec [MQTT-3.3.1-5]. Store the retained message with its QoS.
//
// When coming from a publish protocol, `pp` is referencing a stack
// variable that itself possibly references the read buffer.
rm := &mqttRetainedMsg{
Origin: asm.jsa.id,
Subject: key,
Topic: string(pp.topic),
Msg: pp.msg,
Msg: pp.msg, // will copy these bytes later as we process rm.
Flags: pp.flags,
Source: c.opts.Username,
}
rmBytes, _ := json.Marshal(rm)
smr, err := asm.jsa.storeMsg(mqttRetainedMsgsStreamSubject+key, -1, rmBytes)
rmBytes, hdr := mqttEncodeRetainedMessage(rm) // will copy the payload bytes
smr, err := asm.jsa.storeMsg(mqttRetainedMsgsStreamSubject+key, hdr, rmBytes)
if err == nil {
// Update the new sequence
// Update the new sequence.
rf := &mqttRetainedMsgRef{
sseq: smr.Sequence,
}
// Add/update the map
asm.handleRetainedMsg(key, rf, rm, true) // will copy the payload bytes if needs to update rmsCache
// Add/update the map. `true` to copy the payload bytes if needs to
// update rmsCache.
asm.handleRetainedMsg(key, rf, rm, true)
} else {
c.mu.Lock()
acc := c.acc
Expand Down Expand Up @@ -4269,8 +4351,8 @@ func (s *Server) mqttCheckPubRetainedPerms() {
if err != nil || jsm == nil {
continue
}
var rm mqttRetainedMsg
if err := json.Unmarshal(jsm.Data, &rm); err != nil {
rm, err := mqttDecodeRetainedMessage(jsm.Header, jsm.Data)
if err != nil {
continue
}
if rm.Source == _EMPTY_ {
Expand Down
81 changes: 81 additions & 0 deletions server/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7448,6 +7448,87 @@ func TestMQTTJetStreamRepublishAndQoS0Subscribers(t *testing.T) {
testMQTTExpectNothing(t, r)
}

// Test for auto-cleanup of consumers.
func TestMQTTDecodeRetainedMessage(t *testing.T) {
tdir := t.TempDir()
tmpl := `
listen: 127.0.0.1:-1
server_name: mqtt
jetstream {
store_dir = %q
}
mqtt {
listen: 127.0.0.1:-1
consumer_inactive_threshold: %q
}
# For access to system account.
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
`
conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, tdir, "0.2s")))
s, o := RunServerWithConfig(conf)
defer testMQTTShutdownServer(s)

// Connect and publish a retained message, this will be in the "newer" form,
// with the metadata in the header.
mc, r := testMQTTConnectRetry(t, &mqttConnInfo{clientID: "test", cleanSess: true}, o.MQTT.Host, o.MQTT.Port, 5)
defer mc.Close()
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
testMQTTPublish(t, mc, r, 0, false, true, "foo/1", 0, []byte("msg1"))
mc.Close()

// Store a "legacy", JSON-encoded payload directly into the stream.
nc, js := jsClientConnect(t, s)
defer nc.Close()

rm := mqttRetainedMsg{
Origin: "test",
Subject: "foo.2",
Topic: "foo/2",
Flags: mqttPubFlagRetain,
Msg: []byte("msg2"),
}
jsonData, _ := json.Marshal(rm)
_, err := js.PublishMsg(&nats.Msg{
Subject: mqttRetainedMsgsStreamSubject + rm.Subject,
Data: jsonData,
})
if err != nil {
t.Fatalf("Error publishing retained message to JS directly: %v", err)
}

// Restart the server to see that it picks up both retained messages on restart.
s.Shutdown()
s = RunServer(o)
defer testMQTTShutdownServer(s)

// Connect again, subscribe, and check that we get both messages.
mc, r = testMQTTConnectRetry(t, &mqttConnInfo{clientID: "test", cleanSess: true}, o.MQTT.Host, o.MQTT.Port, 5)
defer mc.Close()
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "foo/+", qos: 0}}, []byte{0})
testMQTTCheckPubMsg(t, mc, r, "foo/1", mqttPubFlagRetain, []byte("msg1"))
testMQTTCheckPubMsg(t, mc, r, "foo/2", mqttPubFlagRetain, []byte("msg2"))
testMQTTExpectNothing(t, r)
mc.Close()

// Clear both retained messages.
mc, r = testMQTTConnectRetry(t, &mqttConnInfo{clientID: "test", cleanSess: true}, o.MQTT.Host, o.MQTT.Port, 5)
defer mc.Close()
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
testMQTTPublish(t, mc, r, 0, false, true, "foo/1", 0, []byte{})
testMQTTPublish(t, mc, r, 0, false, true, "foo/2", 0, []byte{})
mc.Close()

// Connect again, subscribe, and check that we get nothing.
mc, r = testMQTTConnectRetry(t, &mqttConnInfo{clientID: "test", cleanSess: true}, o.MQTT.Host, o.MQTT.Port, 5)
defer mc.Close()
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "foo/+", qos: 0}}, []byte{0})
testMQTTExpectNothing(t, r)
}

//////////////////////////////////////////////////////////////////////////
//
// Benchmarks
Expand Down

0 comments on commit 19cba6f

Please sign in to comment.