Skip to content

Commit

Permalink
Merge pull request #63 from matrober-uk/memory-leaks
Browse files Browse the repository at this point in the history
Fix memory leaks due to MessageHandles
  • Loading branch information
matrober-uk authored Jan 22, 2023
2 parents af0d578 + bd39c38 commit b93d8e8
Show file tree
Hide file tree
Showing 6 changed files with 335 additions and 5 deletions.
167 changes: 167 additions & 0 deletions memoryleaks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright (c) IBM Corporation 2023
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* SPDX-License-Identifier: EPL-2.0
*/
package main

import (
"fmt"
"runtime"
"testing"
"time"

"github.com/ibm-messaging/mq-golang-jms20/jms20subset"
"github.com/ibm-messaging/mq-golang-jms20/mqjms"
"github.com/stretchr/testify/assert"
)

/*
* Test for memory leak when there is no message to be received.
*
* This test is not included in the normal bucket as it sends an enormous number of
* messages, and requires human observation of the total process size to establish whether
* it passes or not, so can only be run under human supervision
*/
func DONT_RUNTestLeakOnEmptyGet(t *testing.T) {

// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
//cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
//assert.Nil(t, cfErr)

// Initialise the attributes of the CF in whatever way you like
cf := mqjms.ConnectionFactoryImpl{
QMName: "QM1",
Hostname: "localhost",
PortNumber: 1414,
ChannelName: "DEV.APP.SVRCONN",
UserName: "app",
Password: "passw0rd",
}

// Creates a connection to the queue manager, using defer to close it automatically
// at the end of the function (if it was created successfully)
context, ctxErr := cf.CreateContext()
assert.Nil(t, ctxErr)
if context != nil {
defer context.Close()
}

// Now send the message and get it back again, to check that it roundtripped.
queue := context.CreateQueue("DEV.QUEUE.1")

consumer, errCons := context.CreateConsumer(queue)
if consumer != nil {
defer consumer.Close()
}
assert.Nil(t, errCons)

for i := 1; i < 35000; i++ {

rcvMsg, errRvc := consumer.ReceiveNoWait()
assert.Nil(t, errRvc)
assert.Nil(t, rcvMsg)

if i%1000 == 0 {
fmt.Println("Messages:", i)
}

}

fmt.Println("Finished receive calls - waiting for cooldown.")
runtime.GC()

time.Sleep(30 * time.Second)

}

/*
* Test for memory leak when sending and receiving messages
*
* This test is not included in the normal bucket as it sends an enormous number of
* messages, and requires human observation of the total process size to establish whether
* it passes or not, so can only be run under human supervision
*/
func DONTRUN_TestLeakOnPutGet(t *testing.T) {

// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
//cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
//assert.Nil(t, cfErr)

// Initialise the attributes of the CF in whatever way you like
cf := mqjms.ConnectionFactoryImpl{
QMName: "QM1",
Hostname: "localhost",
PortNumber: 1414,
ChannelName: "DEV.APP.SVRCONN",
UserName: "app",
Password: "passw0rd",
}

// Creates a connection to the queue manager, using defer to close it automatically
// at the end of the function (if it was created successfully)
context, ctxErr := cf.CreateContext()
assert.Nil(t, ctxErr)
if context != nil {
defer context.Close()
}

// Now send the message and get it back again, to check that it roundtripped.
queue := context.CreateQueue("DEV.QUEUE.1")

consumer, errCons := context.CreateConsumer(queue)
if consumer != nil {
defer consumer.Close()
}
assert.Nil(t, errCons)

ttlMillis := 20000
producer := context.CreateProducer().SetTimeToLive(ttlMillis)

for i := 1; i < 25000; i++ {

// Create a TextMessage and check that we can populate it
msgBody := "Message " + fmt.Sprint(i)
txtMsg := context.CreateTextMessage()
txtMsg.SetText(msgBody)
txtMsg.SetIntProperty("MessageNumber", i)

errSend := producer.Send(queue, txtMsg)
assert.Nil(t, errSend)

rcvMsg, errRvc := consumer.ReceiveNoWait()
assert.Nil(t, errRvc)
assert.NotNil(t, rcvMsg)

// Check message body.
switch msg := rcvMsg.(type) {
case jms20subset.TextMessage:
assert.Equal(t, msgBody, *msg.GetText())
default:
assert.Fail(t, "Got something other than a text message")
}

// Check messageID
assert.Equal(t, txtMsg.GetJMSMessageID(), rcvMsg.GetJMSMessageID())

// Check int property
rcvMsgNum, propErr := rcvMsg.GetIntProperty("MessageNumber")
assert.Nil(t, propErr)
assert.Equal(t, i, rcvMsgNum)

if i%1000 == 0 {
fmt.Println("Messages:", i)
}

}

fmt.Println("Finished receive calls - waiting for cooldown.")
runtime.GC()

time.Sleep(30 * time.Second)

}
2 changes: 2 additions & 0 deletions mqjms/ConnectionFactoryImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package mqjms

import (
"strconv"
"sync"

"github.com/ibm-messaging/mq-golang-jms20/jms20subset"
ibmmq "github.com/ibm-messaging/mq-golang/v5/ibmmq"
Expand Down Expand Up @@ -155,6 +156,7 @@ func (cf ConnectionFactoryImpl) CreateContextWithSessionMode(sessionMode int, mq
// a new ContextImpl and return it to the caller.
ctx = ContextImpl{
qMgr: qMgr,
ctxLock: &sync.Mutex{},
sessionMode: sessionMode,
receiveBufferSize: cf.ReceiveBufferSize,
sendCheckCount: cf.SendCheckCount,
Expand Down
56 changes: 56 additions & 0 deletions mqjms/ConsumerImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ package mqjms

import (
"errors"
"fmt"
"runtime"
"strconv"
"strings"
"sync"

"github.com/ibm-messaging/mq-golang-jms20/jms20subset"
ibmmq "github.com/ibm-messaging/mq-golang/v5/ibmmq"
Expand Down Expand Up @@ -57,6 +60,11 @@ func (consumer ConsumerImpl) Receive(waitMillis int32) (jms20subset.Message, jms
// of receive.
func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Message, jms20subset.JMSException) {

// Lock the context while we are making calls to the queue manager so that it
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
consumer.ctx.ctxLock.Lock()
defer consumer.ctx.ctxLock.Unlock()

// Prepare objects to be used in receiving the message.
var msg jms20subset.Message
var jmsErr jms20subset.JMSException
Expand Down Expand Up @@ -99,6 +107,11 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess

if err == nil {

// Set a finalizer on the message handle to allow it to be deleted
// when it is no longer referenced by an active object, to reduce/prevent
// memory leaks.
setMessageHandlerFinalizer(thisMsgHandle, consumer.ctx.ctxLock)

// Message received successfully (without error).
// Determine on the basis of the format field what sort of message to create.

Expand All @@ -116,6 +129,7 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
MessageImpl: MessageImpl{
mqmd: getmqmd,
msgHandle: &thisMsgHandle,
ctxLock: consumer.ctx.ctxLock,
},
}

Expand All @@ -133,6 +147,7 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
MessageImpl: MessageImpl{
mqmd: getmqmd,
msgHandle: &thisMsgHandle,
ctxLock: consumer.ctx.ctxLock,
},
}
}
Expand All @@ -142,6 +157,11 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
// Error code was returned from MQ call.
mqret := err.(*ibmmq.MQReturn)

// Delete the message handle object in-line here now that it is no longer required,
// to avoid memory leak
dmho := ibmmq.NewMQDMHO()
gmo.MsgHandle.DltMH(dmho)

if mqret.MQRC == ibmmq.MQRC_NO_MSG_AVAILABLE {

// This isn't a real error - it's the way that MQ indicates that there
Expand All @@ -164,6 +184,36 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
return msg, jmsErr
}

/*
* Set a finalizer on the message handle to allow it to be deleted
* when it is no longer referenced by an active object, to reduce/prevent
* memory leaks.
*/
func setMessageHandlerFinalizer(thisMsgHandle ibmmq.MQMessageHandle, ctxLock *sync.Mutex) {

runtime.SetFinalizer(&thisMsgHandle, func(msgHandle *ibmmq.MQMessageHandle) {
ctxLock.Lock()
defer ctxLock.Unlock()

dmho := ibmmq.NewMQDMHO()
err := msgHandle.DltMH(dmho)
if err != nil {

mqret := err.(*ibmmq.MQReturn)

if mqret.MQRC == ibmmq.MQRC_HCONN_ERROR {
// Expected if the connection is closed before the finalizer executes
// (at which point it should get tidied up automatically by the connection)
} else {
fmt.Println("DltMH finalizer", err)
}

}

})

}

// ReceiveStringBodyNoWait implements the IBM MQ logic necessary to receive a
// message from a Destination and return its body as a string.
//
Expand Down Expand Up @@ -356,6 +406,12 @@ func applySelector(selector string, getmqmd *ibmmq.MQMD, gmo *ibmmq.MQGMO) error
func (consumer ConsumerImpl) Close() {

if (ibmmq.MQObject{}) != consumer.qObject {

// Lock the context while we are making calls to the queue manager so that it
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
consumer.ctx.ctxLock.Lock()
defer consumer.ctx.ctxLock.Unlock()

consumer.qObject.Close(0)
}

Expand Down
Loading

0 comments on commit b93d8e8

Please sign in to comment.