goroutine leak when calling rocketmq.NewPushConsumer frequently #1147

Open
hoverseu opened this issue Jun 12, 2024 · 1 comment


@hoverseu

hoverseu commented Jun 12, 2024

BUG REPORT

Once, we had a consumer group that had not been created, so the call to rocketmq.NewPushConsumer in our application code returned an error. Because of our retry mechanism, rocketmq.NewPushConsumer was then called frequently, and we found that memory usage kept growing.

I suspected a goroutine leak, and printing the number of goroutines with runtime.NumGoroutine() confirmed it.

By reading the code and adding debug logs, we found that after pushConsumer.Shutdown() is called, one goroutine started in pushConsumer.Start and several statistics-related goroutines cannot exit promptly because they are blocked in time.Sleep. Some of the blocking points are listed below.

push_consumer.go

  1. The first goroutine sleeps for pc.option.ConsumeTimeout before it starts selecting on pc.done, and pc.option.ConsumeTimeout is usually a long duration.
func (pc *pushConsumer) Start() error {
...
		// first goroutine
		go primitive.WithRecover(func() {
			if pc.consumeOrderly {
				return
			}
			// blocks for ConsumeTimeout; pc.done is not checked during this sleep
			time.Sleep(pc.option.ConsumeTimeout)
			pc.cleanExpiredMsg()

			ticker := time.NewTicker(pc.option.ConsumeTimeout)
			defer ticker.Stop()
			for {
				select {
				case <-ticker.C:
					pc.cleanExpiredMsg()
				case <-pc.done:
					rlog.Info("push consumer close cleanExpiredMsg listener.", map[string]interface{}{
						rlog.LogKeyConsumerGroup: pc.consumerGroup,
					})
					return
				}
			}
		})
...
}

consumer/statistics.go

  1. The first goroutine can block for up to 1 minute before returning, even if sis.closed has already been closed.
  2. The second goroutine can block for up to 1 hour before returning, even if sis.closed has already been closed.
  3. The third goroutine blocks until nextMonthTime() before returning, even if sis.closed has already been closed. Its ticker period is 24 hours, so the intended delay is presumably 1 day.
func (sis *statsItemSet) init() {
	...
	// first goroutine
	go primitive.WithRecover(func() {
		time.Sleep(nextMinutesTime().Sub(time.Now()))
		ticker := time.NewTicker(time.Minute)
		defer ticker.Stop()
		for {
			select {
			case <-sis.closed:
				return
			case <-ticker.C:
				sis.printAtMinutes()
			}
		}
	})

	// second goroutine
	go primitive.WithRecover(func() {
		time.Sleep(nextHourTime().Sub(time.Now()))
		ticker := time.NewTicker(time.Hour)
		defer ticker.Stop()
		for {
			select {
			case <-sis.closed:
				return
			case <-ticker.C:
				sis.printAtHour()
			}
		}
	})

	// third goroutine
	go primitive.WithRecover(func() {
		time.Sleep(nextMonthTime().Sub(time.Now()))
		ticker := time.NewTicker(24 * time.Hour)
		defer ticker.Stop()
		for {
			select {
			case <-sis.closed:
				return
			case <-ticker.C:
				sis.printAtDay()
			}
		}
	})
}

My advice:

A ticker created with time.NewTicker does not fire immediately; its first tick arrives only after one full period, so I think the time.Sleep calls in these goroutines are unnecessary.

@absolute8511
Contributor

absolute8511 commented Jun 24, 2024

Actually, nextMonthTime() should be nextDayTime(). However, I think the sleep is not necessary at all.
