Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flaky test: TestSaramaConsumerWrapper_start_Messages #1314

Closed
yurishkuro opened this issue Feb 4, 2019 · 9 comments · Fixed by #2587
Closed

Flaky test: TestSaramaConsumerWrapper_start_Messages #1314

yurishkuro opened this issue Feb 4, 2019 · 9 comments · Fixed by #2587
Assignees
Labels
bug good first issue Good for beginners help wanted Features that maintainers are willing to accept but do not have cycles to implement

Comments

@yurishkuro
Copy link
Member

https://travis-ci.org/jaegertracing/jaeger/jobs/487803410

=== RUN   TestSaramaConsumerWrapper_start_Messages
2019-02-02T09:46:32.154Z	DEBUG	consumer/deadlock_detector.go:151	Starting global deadlock detector
2019-02-02T09:46:32.155Z	INFO	consumer/consumer.go:77	Starting main loop
2019-02-02T09:46:32.155Z	INFO	consumer/consumer.go:162	Starting error handler	{"partition": 316}
2019-02-02T09:46:32.155Z	INFO	consumer/consumer.go:111	Starting message handler	{"partition": 316}
2019-02-02T09:46:32.156Z	DEBUG	consumer/consumer.go:134	Got msg	{"msg": {"Key":null,"Value":null,"Topic":"morekuzambu","Partition":316,"Offset":1,"Timestamp":"0001-01-01T00:00:00Z","BlockTimestamp":"0001-01-01T00:00:00Z","Headers":null}}
2019-02-02T09:46:32.156Z	INFO	consumer/processor_factory.go:62	Creating new processors	{"partition": 316}
2019-02-02T09:46:32.157Z	DEBUG	processor/parallel_processor.go:50	Spawning goroutines to process messages	{"num_routines": 1}
2019-02-02T09:46:32.158Z	INFO	consumer/consumer.go:155	Closing partition consumer	{"partition": 316}
2019-02-02T09:46:32.158Z	INFO	consumer/consumer.go:131	Message channel closed. 	{"partition": 316}
2019-02-02T09:46:32.158Z	DEBUG	processor/parallel_processor.go:75	Initiated shutdown of processor goroutines
2019-02-02T09:46:32.158Z	INFO	processor/parallel_processor.go:78	Completed shutdown of processor goroutines
2019-02-02T09:46:32.158Z	INFO	consumer/consumer.go:173	Finished handling errors	{"partition": 316}
2019-02-02T09:46:32.158Z	INFO	consumer/consumer.go:158	Closed partition consumer	{"partition": 316}
2019-02-02T09:46:32.158Z	DEBUG	consumer/deadlock_detector.go:195	Closing deadlock detector	{"partition": 316}
2019-02-02T09:46:32.158Z	INFO	consumer/consumer.go:155	Closing partition consumer	{"partition": 316}
2019-02-02T09:46:32.158Z	INFO	consumer/deadlock_detector.go:115	Closing ticker routine	{"partition": 316}
2019-02-02T09:46:32.159Z	INFO	consumer/consumer.go:158	Closed partition consumer	{"partition": 316}
2019-02-02T09:46:32.159Z	DEBUG	consumer/deadlock_detector.go:179	Closing all partitions deadlock detector
2019-02-02T09:46:32.159Z	INFO	consumer/consumer.go:106	Closing parent consumer
--- FAIL: TestSaramaConsumerWrapper_start_Messages (0.01s)
    <autogenerated>:1: PASS:	Process(*consumer.saramaMessageWrapper)
	Error Trace:	metricstest.go:49
			metricstest.go:37
			consumer_test.go:167
	Error:      	Not equal: 
	            	expected: int(0)
	            	actual  : int64(1)
	Test:       	TestSaramaConsumerWrapper_start_Messages
	Messages:   	expected metric name: sarama-consumer.partitions-held, tags: map[]
2019-02-02T09:46:32.161Z	DEBUG	consumer/deadlock_detector.go:159	Closing global ticker routine
@yurishkuro
Copy link
Member Author

Getting different stack trace now:

=== RUN   TestSaramaConsumerWrapper_start_Messages
2020-05-11T18:49:25.813Z	DEBUG	consumer/deadlock_detector.go:151	Starting global deadlock detector
2020-05-11T18:49:25.814Z	INFO	consumer/consumer.go:78	Starting main loop
2020-05-11T18:49:25.814Z	INFO	consumer/consumer.go:112	Starting message handler	{"partition": 316}
2020-05-11T18:49:25.815Z	DEBUG	consumer/consumer.go:141	Got msg	{"msg": {"Headers":null,"Timestamp":"0001-01-01T00:00:00Z","BlockTimestamp":"0001-01-01T00:00:00Z","Key":null,"Value":null,"Topic":"morekuzambu","Partition":316,"Offset":1}}
2020-05-11T18:49:25.815Z	INFO	consumer/processor_factory.go:65	Creating new processors	{"partition": 316}
2020-05-11T18:49:25.816Z	DEBUG	processor/parallel_processor.go:50	Spawning goroutines to process messages	{"num_routines": 1}
2020-05-11T18:49:25.817Z	INFO	consumer/consumer.go:169	Starting error handler	{"partition": 316}
    TestSaramaConsumerWrapper_start_Messages: consumer_test.go:161: PASS:	Process(consumer.saramaMessageWrapper)
2020-05-11T18:49:25.818Z	INFO	consumer/consumer.go:162	Closing partition consumer	{"partition": 316}
2020-05-11T18:49:25.818Z	INFO	consumer/consumer.go:138	Message channel closed. 	{"partition": 316}
2020-05-11T18:49:25.818Z	DEBUG	processor/parallel_processor.go:75	Initiated shutdown of processor goroutines
2020-05-11T18:49:25.818Z	INFO	processor/parallel_processor.go:78	Completed shutdown of processor goroutines
2020-05-11T18:49:25.818Z	DEBUG	consumer/deadlock_detector.go:195	Closing deadlock detector	{"partition": 316}
2020-05-11T18:49:25.818Z	INFO	consumer/consumer.go:162	Closing partition consumer	{"partition": 316}
2020-05-11T18:49:25.819Z	INFO	consumer/deadlock_detector.go:115	Closing ticker routine	{"partition": 316}
2020-05-11T18:49:25.819Z	INFO	consumer/consumer.go:165	Closed partition consumer	{"partition": 316}
2020-05-11T18:49:25.819Z	INFO	consumer/consumer.go:165	Closed partition consumer	{"partition": 316}
2020-05-11T18:49:28.815Z	PANIC	consumer/deadlock_detector.go:71	No messages processed in the last check interval	{"partition": -1, "stack": "goroutine 25 [running]:\ngithub.com/jaegertracing/jaeger/cmd/ingester/app/consumer.newDeadlockDetector.func1(0xc0ffffffff)\n\t/home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/deadlock_detector.go:73 +0x2aa\ngithub.com/jaegertracing/jaeger/cmd/ingester/app/consumer.(*deadlockDetector).start.func1(0xc0000c6180, 0xc0000b0420)\n\t/home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/deadlock_detector.go:163 +0x2bf\ncreated by github.com/jaegertracing/jaeger/cmd/ingester/app/consumer.(*deadlockDetector).start\n\t/home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/deadlock_detector.go:152 +0x346\n\ngoroutine 1 [chan receive]:\ntesting.(*T).Run(0xc00012fe60, 0x104f8f5, 0x28, 0x1067820, 0x1)\n\t/home/travis/.gimme/versions/go1.14.2.linux.amd64/src/testing/testing.go:1043 +0x699\ntesting.runTests.func1(0xc00012fe60)\n\t/home/travis/.gimme/versions/go1.14.2.linux.amd64/src/testing/testing.go:1284 +0xa7\ntesting.tRunner(0xc00012fe60, 0xc00017bce8)\n\t/home/travis/.gimme/versions/go1.14.2.linux.amd64/src/testing/testing.go:991 +0x1ec\ntesting.runTests(0xc00000f040, 0x18c4c60, 0x15, 0x15, 0x0)\n\t/home/travis/.gimme/versions/go1.14.2.linux.amd64/src/testing/testing.go:1282 +0x528\ntesting.(*M).Run(0xc000168100, 0x0)\n\t/home/travis/.gimme/versions/go1.14.2.linux.amd64/src/testing/testing.go:1199 +0x300\nmain.main()\n\t_testmain.go:142 +0x338\n\ngoroutine 24 [semacquire]:\nsync.runtime_Semacquire(0xc0000b0588)\n\t/home/travis/.gimme/versions/go1.14.2.linux.amd64/src/runtime/sema.go:56 +0x42\nsync.(*WaitGroup).Wait(0xc0000b0580)\n\t/home/travis/.gimme/versions/go1.14.2.linux.amd64/src/sync/waitgroup.go:130 +0xd4\ngithub.com/jaegertracing/jaeger/cmd/ingester/app/consumer.(*Consumer).Close(0xc0000c60f0, 0xc00009c480, 0xf2dae0)\n\t/home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/consumer.go:103 +0x175\ngithub.com/jaegertracing/jaeger/cmd/ingester/app/consumer.TestSaramaConsumerWrapper_start_Messages(0xc00009c480)\n\t/home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/consumer_test.go:165 +0xad4\ntesting.tRunner(0xc00009c480, 0x1067820)\n\t/home/travis/.gimme/versions/go1.14.2.linux.amd64/src/testing/testing.go:991 +0x1ec\ncreated by testing.(*T).Run\n\t/home/travis/.gimme/versions/go1.14.2.linux.amd64/src/testing/testing.go:1042 +0x661\n\ngoroutine 26 [chan receive]:\ngithub.com/jaegertracing/jaeger/cmd/ingester/app/consumer.(*Consumer).Start.func1(0xc0000c60f0)\n\t/home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/consumer.go:79 +0x289\ncreated by github.com/jaegertracing/jaeger/cmd/ingester/app/consumer.(*Consumer).Start\n\t/home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/consumer.go:77 +0x7c\n\ngoroutine 27 [semacquire]:\nsync.runtime_SemacquireMutex(0xc0000c61bc, 0x1100000000, 0x1)\n\t/home/travis/.gimme/versions/go1.14.2.linux.amd64/src/runtime/sema.go:71 +0x47\nsync.(*Mutex).lockSlow(0xc0000c61b8)\n\t/home/travis/.gimme/versions/go1.14.2.linux.amd64/src/sync/mutex.go:138 +0x1c1\nsync.(*Mutex).Lock(0xc0000c61b8)\n\t/home/travis/.gimme/versions/go1.14.2.linux.amd64/src/sync/mutex.go:81 +0x7d\ngithub.com/jaegertracing/jaeger/cmd/ingester/app/consumer.(*Consumer).handleMessages.func1(0xc0000c60f0, 0x117cb00, 0xc0000a6690, 0xc0000b0580)\n\t/home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/consumer.go:121 +0x9d\ngithub.com/jaegertracing/jaeger/cmd/ingester/app/consumer.(*Consumer).handleMessages(0xc0000c60f0, 0x117cb00, 0xc0000a6690)\n\t/home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/consumer.go:139 +0xbc5\ncreated by github.com/jaegertracing/jaeger/cmd/ingester/app/consumer.(*Consumer).Start.func1\n\t/home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/consumer.go:92 +0x204\n\ngoroutine 28 [semacquire]:\nsync.runtime_SemacquireMutex(0xc0000c61bc, 0x900000000, 0x1)\n\t/home/travis/.gimme/versions/go1.14.2.linux.amd64/src/runtime/sema.go:71 +0x47\nsync.(*Mutex).lockSlow(0xc0000c61b8)\n\t/home/travis/.gimme/versions/go1.14.2.linux.amd64/src/sync/mutex.go:138 +0x1c1\nsync.(*Mutex).Lock(0xc0000c61b8)\n\t/home/travis/.gimme/versions/go1.14.2.linux.amd64/src/sync/mutex.go:81 +0x7d\ngithub.com/jaegertracing/jaeger/cmd/ingester/app/consumer.(*Consumer).handleErrors(0xc0000c60f0, 0x13c, 0xc00008a7e0)\n\t/home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/consumer.go:170 +0x1d3\ncreated by github.com/jaegertracing/jaeger/cmd/ingester/app/consumer.(*Consumer).Start.func1\n\t/home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/consumer.go:93 +0x26b\n"}
github.com/jaegertracing/jaeger/cmd/ingester/app/consumer.newDeadlockDetector.func1
	/home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/deadlock_detector.go:71
github.com/jaegertracing/jaeger/cmd/ingester/app/consumer.(*deadlockDetector).start.func1
	/home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/deadlock_detector.go:163
panic: No messages processed in the last check interval
goroutine 25 [running]:
go.uber.org/zap/zapcore.(*CheckedEntry).Write(0xc00011bd90, 0xc000168200, 0x2, 0x2)
	/home/travis/gopath/pkg/mod/go.uber.org/[email protected]/zapcore/entry.go:230 +0x711
go.uber.org/zap.(*Logger).Panic(0xc0000862a0, 0x105563f, 0x30, 0xc000168200, 0x2, 0x2)
	/home/travis/gopath/pkg/mod/go.uber.org/[email protected]/logger.go:225 +0x96
github.com/jaegertracing/jaeger/cmd/ingester/app/consumer.newDeadlockDetector.func1(0xc0ffffffff)
	/home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/deadlock_detector.go:71 +0x4a5
github.com/jaegertracing/jaeger/cmd/ingester/app/consumer.(*deadlockDetector).start.func1(0xc0000c6180, 0xc0000b0420)
	/home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/deadlock_detector.go:163 +0x2bf
created by github.com/jaegertracing/jaeger/cmd/ingester/app/consumer.(*deadlockDetector).start
	/home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/deadlock_detector.go:152 +0x346
FAIL	github.com/jaegertracing/jaeger/cmd/ingester/app/consumer	3.026s
FAIL
make: *** [cover] Error 1

@yurishkuro yurishkuro added good first issue Good for beginners help wanted Features that maintainers are willing to accept but do not have cycles to implement labels Oct 9, 2020
@yurishkuro
Copy link
Member Author

@albertteoh @joe-elliott @annanay25 is anyone interested in trying to fix this test? It's been failing close to 50% of the time. I almost rage-t.Skipped it today, but it's kind of a critical test for the module.

@albertteoh
Copy link
Contributor

I can give this a go if there are no takers.

@albertteoh
Copy link
Contributor

albertteoh commented Oct 17, 2020

Key Observations

Success Runs
Analysis of a successful Sarama test run shows the pattern:

  1. Error handler started
  2. Error handler finished
  3. Partition consumer closed
  4. All partitions deadlock detector closed
  5. Parent consumer closed

Example:

2020-10-17T22:46:06.001+1100	INFO	consumer/consumer.go:181	Starting error handler	{"partition": 316}
[...]
2020-10-17T22:46:06.002+1100	INFO	consumer/consumer.go:194	Finished handling errors	{"partition": 316}
[...]
2020-10-17T22:46:06.002+1100	INFO	consumer/consumer.go:177	Closed partition consumer	{"partition": 316}
[...]
2020-10-17T22:46:06.002+1100	DEBUG	consumer/deadlock_detector.go:179	Closing all partitions deadlock detector
2020-10-17T22:46:06.002+1100	INFO	consumer/consumer.go:119	Closing parent consumer

Failed Runs
Analysis of two failed Sarama test runs (here and here) show a common pattern:

  1. Error handler is started
  2. Partition consumer closed
  3. Panic thrown after ~3 seconds of (log) inactivity
2020-05-11T18:49:25.817Z	INFO	consumer/consumer.go:169	Starting error handler	{"partition": 316}
[...]
2020-05-11T18:49:25.819Z	INFO	consumer/consumer.go:165	Closed partition consumer	{"partition": 316}
2020-05-11T18:49:28.815Z	PANIC	consumer/deadlock_detector.go:71	No messages processed in the last check interval...

Questions
For the erroneous case:

  1. Why is there a ~3 second period of log inactivity?
  2. Why was there no Finished handling errors log entry, as well as the other remaining logs of Closing all partitions deadlock detector and Closing parent consumer?

@albertteoh
Copy link
Contributor

albertteoh commented Oct 17, 2020

Root Cause Hypothesis

The following describes the failure case:

  1. Test starts and consumer.Start() is called.

  2. Each partition consumer is initialised with a WaitGroup of 2, presumably one for the message handler and another for the error handler.

  3. A message handler goroutine and an error handler goroutine are both started.

  4. The message handler listens on the message channel. It successfully receives and processes a mock message sent by the test and (defer) calls Done() on the WaitGroup.

  5. Test code initiates a consumer.Close().

  6. partitionMapLock is acquired in consumer.Close(), and commences closing each partition, which Wait()s (blocks) on the WaitGroup after closing the partition. In this case, the WaitGroup counter is 1 since the message handler has called Done() (2 - 1), but the error handler has not called Done() yet.

  7. Meanwhile, consumer.handleErrors() finally attempts to acquire the same partitionMapLock, but fails since it's acquired in step 5. This also means it's unable to proceed to the defer call to wg.Done(), which means the consumer.Close() goroutine can't proceed.

  8. Deadlock.

  9. The deadlock detector ticker ticks when no messages have been consumed and panics, causing the error seen in the tests.

Summary

Step Number Description partitionMapLock State WaitGroup Count
0 Start Unlocked N/A
1 Init WaitGroup Unlocked 2
2 Start message and error handler goroutines Unlocked 2
3 Message handler Done() Unlocked (goes through a number of lock and unlock phases, but nothing prevents it from releasing the lock) 1
4 Test calls consumer.Close() Unlocked 1
5 Test acquires lock and blocks until WaitGroup == 0 Locked 1
6 Error handler tries to acquire lock, fails to call Done() Locked 1
7 Deadlock Locked 1
8 Panic: no messages Locked 1

@albertteoh
Copy link
Contributor

albertteoh commented Oct 17, 2020

Reproducing the error

Place a sleep between the message and error handler go routines to ensure that the test calls consumer.Close() before the error handler starts. That is:

go c.handleMessages(pc)
time.Sleep(100*time.Millisecond)
go c.handleErrors(pc.Partition(), pc.Errors())

@albertteoh
Copy link
Contributor

Proposed Solution

  • Introduce a binary semaphore to the Consumer: errorHandlerStarted *atomic.Bool, which is initialised to false at construction.
  • Set this semaphore to true after we know the error handler has acquired then released the lock and before it starts listening on the error channel.
  • consumer.Close() must wait for errorHandlerStarted to be true before acquiring the partitionMapLock and proceeding with closing partition consumers.

Alternatives Considered

Start Error Handler first and sleep between

That is:

go c.handleErrors(pc.Partition(), pc.Errors())
time.Sleep(100*time.Millisecond)
go c.handleMessages(pc)

Why not:

  • Can't reliably know if the sleep period is sufficiently long to guarantee the error handler has acquired and released the lock.
  • Increases startup time, every time.

Sleep before consumer.Close() in test

That is, place a sleep before the undertest.Close():

time.Sleep(100*time.Millisecond)
undertest.Close()

Why not:

  • As above, can't reliably know if the sleep period is sufficiently long to guarantee the error handler has acquired and released the lock.
  • Other developers will need to know to add this into any other similar tests.
  • Increases test execution time, every time.

@yurishkuro
Copy link
Member Author

Agree that more sleeps won't solve the issue - VMs in Travis are unpredictable and may hang for longer than a reasonable sleep interval. Too long sleeps slow down tests.

It's great that you found a way to reproduce. It actually points to another possible solution - use a single goroutine for listening to all 3 channels (msgs, errors, and deadlock detector). Based on your analysis it sounds like it may remove the race condition.

@albertteoh
Copy link
Contributor

Thanks, @yurishkuro.I think that's a much more elegant solution!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug good first issue Good for beginners help wanted Features that maintainers are willing to accept but do not have cycles to implement
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants