A Kafka pipeline framework built on top of kafka-go, aimed at helping you build Kafka batch processors. It supports at-most once, at-least once, ~exactly once, graceful shutdown, a readiness probe, logging, Jaeger tracing, Prometheus metrics, concurrency, partition concurrency, key concurrency, and freezing a partition on failure.
go get github.com/sellsuki/kafka-go-worker
- Start the required software (Kafka, Jaeger):
docker-compose -f ./example/docker-compose.yml up -d
- Kafka: localhost:9090
- Jaeger web UI: http://localhost:16686
- Seed the Kafka topic with messages:
go test -v -run Test_Seed_Kafka ./example
- Stop the current example
- Delete the consumer group (default: worker_name), or re-seed data into the topic
Example 1: Receive messages in a batch and process them one by one until all messages are processed, then commit ALL messages in the batch. Even if some messages failed, they still get COMMITTED. Use case: a generic Kafka pipeline that needs to handle failed messages later without blocking the stream.
- Order: yes
- Worker Process Failed: handle it manually later (log the error, then reload all messages since the error, or fix the data directly)
- Speed: Poor
- Idempotent Worker: Required
- UseCase: Generic Kafka worker, possibly on a single core
./example/example_1_simple_worker_test.go
go test -v -run Test_Example_1 ./example
Example 2: Similar to Example 1, but includes graceful shutdown and a readiness probe.
./example/example_2_simple_worker_with_manager_test.go
go test -v -run Test_Example_2 ./example
Example 3: Receive messages in a batch and fork each partition into a separate thread. Each partition processes its messages serially (in order) and commits once per partition. The use case is similar to Example 1, but processing is faster thanks to concurrency.
- Order: yes (Partition)
- Worker Process Failed: handle it manually later (log the error, then reload all messages since the error, or fix the data directly)
- Speed: Good
- Idempotent Worker: Required
- UseCase: Generic Kafka worker; concurrency is limited by the number of partitions
./example/example_3_partition_concurrent_test.go
go test -v -run Test_Example_3 ./example
Example 4: Receive messages in a batch and process all messages at the same time; once every message has been processed, commit all of them.
- Order: NO
- Worker Process Failed: handle it manually later (log the error, then reload all messages since the error, or fix the data directly)
- Speed: Best
- Idempotent Worker: Required
- UseCase: Processes that must complete as fast as possible and don't need to be ORDERED, e.g. logging, broadcasting
./example/example_4_full_parallel_test.go
go test -v -run Test_Example_4 ./example
Example 5: Receive messages in a batch and fork each KEY into a separate thread. Each KEY processes its messages serially (in order), and offsets are committed once per partition. The use case is similar to Example 1, but processing is faster thanks to concurrency.
- Order: yes (Message.Key)
- Worker Process Failed: handle it manually later (log the error, then reload all messages since the error, or fix the data directly)
- Speed: Good+
- Idempotent Worker: Required
- UseCase: Generic Kafka worker; concurrency is limited by the number of distinct keys in a batch
./example/example_5_key_concurrent_test.go
go test -v -run Test_Example_5 ./example
Example 6: Worker-level logging, tracing, and metrics
- logging (https://github.com/uber-go/zap)
- tracing (https://github.com/open-telemetry/opentelemetry-go)
- metrics (https://github.com/prometheus/client_golang)
./example/example_6_worker_logger_tracer_metric_test.go
go test -v -run Test_Example_6 ./example
Example 7: Batch-level plus worker-level logging, tracing, and metrics
- logging (https://github.com/uber-go/zap)
- tracing (https://github.com/open-telemetry/opentelemetry-go)
- metrics (https://github.com/prometheus/client_golang)
./example/example_7_batch_logger_tracer_metric_test.go
go test -v -run Test_Example_7 ./example
Example 8: Fork every message into its own thread, but limit concurrency to a maximum of 2 workers.
- Order: NO
- Worker Process Failed: handle it manually later (log the error, then reload all messages since the error, or fix the data directly)
- Speed: Good++
- Idempotent Worker: Required
- UseCase: Parallel worker with resource constraints
./example/example_8_concurrent_limiter_test.go
go test -v -run Test_Example_8 ./example
Example 9: Allow only 1 partition to be processed at a time, and limit workers to 3 (by key).
- Order: yes (Message.Key)
- Worker Process Failed: handle it manually later (log the error, then reload all messages since the error, or fix the data directly)
- Speed: Good+
- Idempotent Worker: Required
- UseCase: Parallel key worker with resource constraints
./example/example_9_concurrent_limiter_partition_test.go
go test -v -run Test_Example_9 ./example
Example 10: Allow only 1 partition to be processed at a time, and commit 1 message at a time.
- Order: yes (Message.Key)
- Worker Process Failed: handle it manually later (log the error, then reload all messages since the error, or fix the data directly)
- Speed: POOR-
- Idempotent Worker: Not needed (the worker uses a database transaction)
- UseCase: Jobs that must be processed once and cannot be reversed or compensated
./example/example_10_exactly_once_worker_test.go
go test -v -run Test_Example_10 ./example
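A sketch of the per-message transaction pattern behind Example 10, assuming a hypothetical `Tx` interface and an in-memory fake in place of a real database and Kafka client. Each message gets its own transaction, and its offset is committed only after that transaction finishes. A crash between the two commits would still cause a redelivery, which is presumably why the feature list says ~exactly once.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a hypothetical stand-in for a Kafka message.
type Message struct {
	Partition int
	Offset    int64
	Key       string
	Value     string
}

// Tx is a hypothetical database transaction.
type Tx interface {
	Apply(Message) error
	Commit() error
	Rollback() error
}

// processOneByOne gives every message its own transaction and commits that
// message's offset right after. A message that fails is rolled back and
// collected for manual handling; its offset is still committed so the
// partition keeps moving. Infrastructure errors stop the loop.
func processOneByOne(batch []Message, begin func() (Tx, error), commitOffset func(Message) error) ([]Message, error) {
	var failed []Message
	for _, m := range batch {
		tx, err := begin()
		if err != nil {
			return failed, err
		}
		if err := tx.Apply(m); err != nil {
			if rbErr := tx.Rollback(); rbErr != nil {
				return failed, rbErr
			}
			failed = append(failed, m)
		} else if err := tx.Commit(); err != nil {
			return failed, err
		}
		// After a successful transaction, a crash here would redeliver m.
		if err := commitOffset(m); err != nil {
			return failed, fmt.Errorf("commit offset %d: %w", m.Offset, err)
		}
	}
	return failed, nil
}

// memTx is an in-memory fake standing in for a real database transaction.
type memTx struct {
	db      *[]string
	pending []string
}

func (t *memTx) Apply(m Message) error {
	if m.Value == "" {
		return errors.New("empty value")
	}
	t.pending = append(t.pending, m.Value)
	return nil
}

func (t *memTx) Commit() error   { *t.db = append(*t.db, t.pending...); return nil }
func (t *memTx) Rollback() error { t.pending = nil; return nil }

func main() {
	var db []string
	failed, err := processOneByOne(
		[]Message{{Offset: 1, Value: "a"}, {Offset: 2}, {Offset: 3, Value: "c"}},
		func() (Tx, error) { return &memTx{db: &db}, nil },
		func(m Message) error { fmt.Println("committed offset", m.Offset); return nil },
	)
	fmt.Println(db, len(failed), err) // [a c] 1 <nil>
}
```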
Example 11: Process messages per partition. If any message in a partition fails to process, the worker freezes that partition and stops processing it.
- Order: yes (Message.Key)
- Worker Process Failed: The partition that caused the error is no longer processed; resume by updating the worker code to handle that error case, or by skipping the failed message
- Speed: Good+ | the partition halts completely if any error occurs in it
- Idempotent Worker: Required
- UseCase: Jobs where order is required and that cannot proceed if any message is missing (stateful), e.g. a bank transfer transaction summary
./example/example_11_stall_parition_on_failed_test.go
go test -v -run Test_Example_11 ./example