Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Fix wrong offset in send callback #335

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ public void completed(Exception exception, long ledgerId, long entryId) {

topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.MICROSECONDS);

offsetFuture.complete(MessageIdUtils.getCurrentOffset(managedLedger));
// In asynchronously send mode, there's a possibility that some messages' offsets are larger than the actual
// offsets. For example, two messages are sent concurrently, if the 1st message is completed while the 2nd
// message is already persisted, `MessageIdUtils.getCurrentOffset` will return the 2nd message's offset.
final long baseOffset = MessageIdUtils.getCurrentOffset(managedLedger) - (numberOfMessages - 1);
offsetFuture.complete(baseOffset);
}

recycle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,23 @@
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -56,6 +60,7 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
Expand Down Expand Up @@ -445,25 +450,28 @@ public void testProduceCallback() throws Exception {
KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);

final CountDownLatch latch = new CountDownLatch(numMessages);
final List<Long> offsets = new ArrayList<>();
for (int i = 0; i < numMessages; i++) {
final int index = i;
producer.send(new ProducerRecord<>(topic, i, messagePrefix + i), (recordMetadata, e) -> {
if (e != null) {
log.error("Failed to send {}: {}", index, e);
fail("Failed to send " + index + ": " + e.getMessage());
}
if (log.isDebugEnabled()) {
log.info("Send {} to offset {}", index, recordMetadata.offset());
}
assertEquals(recordMetadata.topic(), topic);
assertEquals(recordMetadata.partition(), 0);
assertEquals(recordMetadata.offset(), index);
latch.countDown();
}).get();
// TODO: asynchronous send with batching enabled can't pass the test, see
// https://github.com/streamnative/kop/issues/332
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, i, messagePrefix + i),
(recordMetadata, e) -> {
if (e != null) {
log.error("Failed to send {}: {}", index, e);
offsets.add(-1L);
} else {
offsets.add(recordMetadata.offset());
}
latch.countDown();
});
// The first half messages are sent in batch, the second half messages are sent synchronously.
if (i >= numMessages / 2) {
future.get();
}
}
latch.await();
final List<Long> expectedOffsets = LongStream.range(0, numMessages).boxed().collect(Collectors.toList());
log.info("Actual offsets: {}", offsets);
assertEquals(offsets, expectedOffsets);
}

@Test(timeOut = 10000)
Expand Down