Skip to content

Commit

Permalink
[fix][broker] Added the skipped message handler for ServiceUnitStateC…
Browse files Browse the repository at this point in the history
…hannel (apache#20677)

### Motivation

When topic lookups race(and the first lookup returns too fast before the second request gets deduped), the later lookup requests can timeout because the skipped messages are ignored at the table view and do not notify the service unit state channel.

ex:

m1: assign to b1 -> m2: owned by b1 -> m3: assign to b2 // m3 is skipped at the tableview

When m3 is skipped, we better get the channel notified to return the lookup request with the current owner(b1) instead of letting the request time out.

### Modifications

- Added the `handleSkippedMessage` and `setSkippedMsgHandler` in `TopicCompactionStrategy`. 
- `ServiceUnitStateChannel` registers `ServiceUnitStateChannel.handleSkippedEvent` to `ServiceUnitStateCompactionStrategy` by `setSkippedMsgHandler`.
- TableView calls `TopicCompactionStrategy.handleSkippedMessage` when messages are skipped.
  • Loading branch information
heesung-sn committed Jun 30, 2023
1 parent d718687 commit 8d6b931
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import static org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionReestablished;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -94,6 +95,7 @@
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataStoreException;
Expand Down Expand Up @@ -321,6 +323,13 @@ public synchronized void start() throws PulsarServerException {
ServiceUnitStateCompactionStrategy.class.getName()))
.create();
tableview.listen((key, value) -> handle(key, value));
var strategy = (ServiceUnitStateCompactionStrategy) TopicCompactionStrategy.getInstance(TABLE_VIEW_TAG);
if (strategy == null) {
String err = TABLE_VIEW_TAG + "tag TopicCompactionStrategy is null.";
log.error(err);
throw new IllegalStateException(err);
}
strategy.setSkippedMsgHandler((key, value) -> handleSkippedEvent(key));
if (debug) {
log.info("Successfully started the channel tableview.");
}
Expand Down Expand Up @@ -695,6 +704,18 @@ lookupServiceAddress, getLogEventTag(data), serviceUnit,
}
}

private void handleSkippedEvent(String serviceUnit) {
var getOwnerRequest = getOwnerRequests.get(serviceUnit);
if (getOwnerRequest != null) {
var data = tableview.get(serviceUnit);
if (data.state() == Owned) {
getOwnerRequest.complete(data.dstBroker());
getOwnerRequests.remove(serviceUnit);
stateChangeListeners.notify(serviceUnit, data, null);
}
}
}

private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) {
var getOwnerRequest = getOwnerRequests.remove(serviceUnit);
if (getOwnerRequest != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,33 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
import com.google.common.annotations.VisibleForTesting;
import java.util.function.BiConsumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;

public class ServiceUnitStateCompactionStrategy implements TopicCompactionStrategy<ServiceUnitStateData> {

private final Schema<ServiceUnitStateData> schema;
private BiConsumer<String, ServiceUnitStateData> skippedMsgHandler;

private boolean checkBrokers = true;

public ServiceUnitStateCompactionStrategy() {
schema = Schema.JSON(ServiceUnitStateData.class);
}

public void setSkippedMsgHandler(BiConsumer<String, ServiceUnitStateData> skippedMsgHandler) {
this.skippedMsgHandler = skippedMsgHandler;
}

@Override
public void handleSkippedMessage(String key, ServiceUnitStateData cur) {
if (skippedMsgHandler != null) {
skippedMsgHandler.accept(key, cur);
}
}

@Override
public Schema<ServiceUnitStateData> getSchema() {
return schema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertNull;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -199,7 +200,7 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test(priority = 0)
@Test(priority = -1)
public void channelOwnerTest() throws Exception {
var channelOwner1 = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
var channelOwner2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
Expand Down Expand Up @@ -947,8 +948,7 @@ public void conflictAndCompactionTest() throws ExecutionException, InterruptedEx
} catch (CompletionException e) {
ex = e;
}
assertNotNull(ex);
assertEquals(TimeoutException.class, ex.getCause().getClass());
assertNull(ex);
assertEquals(Optional.of(lookupServiceAddress1), channel2.getOwnerAsync(bundle).get());
assertEquals(Optional.of(lookupServiceAddress1), channel1.getOwnerAsync(bundle).get());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ public boolean shouldKeepLeft(byte[] prev, byte[] cur) {

@Test(expectedExceptions = IllegalArgumentException.class)
public void testLoadInvalidTopicCompactionStrategy() {
TopicCompactionStrategy.load("uknown");
TopicCompactionStrategy.load("uknown", "uknown");
}

@Test
public void testNumericOrderCompactionStrategy() {
TopicCompactionStrategy<Integer> strategy =
TopicCompactionStrategy.load(NumericOrderCompactionStrategy.class.getCanonicalName());
TopicCompactionStrategy.load("numeric", NumericOrderCompactionStrategy.class.getCanonicalName());
Assert.assertFalse(strategy.shouldKeepLeft(1, 2));
Assert.assertTrue(strategy.shouldKeepLeft(2, 1));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;

import static org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -65,7 +66,8 @@ public class TableViewImpl<T> implements TableView<T> {
this.immutableData = Collections.unmodifiableMap(data);
this.listeners = new ArrayList<>();
this.listenersMutex = new ReentrantLock();
this.compactionStrategy = TopicCompactionStrategy.load(conf.getTopicCompactionStrategyClassName());
this.compactionStrategy =
TopicCompactionStrategy.load(TABLE_VIEW_TAG, conf.getTopicCompactionStrategyClassName());
ReaderBuilder<T> readerBuilder = client.newReader(schema)
.topic(conf.getTopicName())
.startMessageId(MessageId.earliest)
Expand Down Expand Up @@ -198,6 +200,7 @@ private void handleMessage(Message<T> msg) {
key,
cur,
prev);
compactionStrategy.handleSkippedMessage(key, cur);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.common.topics;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.client.api.Schema;

/**
Expand Down Expand Up @@ -45,6 +47,9 @@
*/
public interface TopicCompactionStrategy<T> {

String TABLE_VIEW_TAG = "table-view";
Map<String, TopicCompactionStrategy> INSTANCES = new ConcurrentHashMap<>();

/**
* Returns the schema object for this strategy.
* @return
Expand All @@ -60,17 +65,27 @@ public interface TopicCompactionStrategy<T> {
*/
boolean shouldKeepLeft(T prev, T cur);

static TopicCompactionStrategy load(String topicCompactionStrategyClassName) {
default void handleSkippedMessage(String key, T cur) {
}


static TopicCompactionStrategy load(String tag, String topicCompactionStrategyClassName) {
if (topicCompactionStrategyClassName == null) {
return null;
}

try {
Class<?> clazz = Class.forName(topicCompactionStrategyClassName);
Object instance = clazz.getDeclaredConstructor().newInstance();
return (TopicCompactionStrategy) instance;
TopicCompactionStrategy instance = (TopicCompactionStrategy) clazz.getDeclaredConstructor().newInstance();
INSTANCES.put(tag, instance);
return instance;
} catch (Exception e) {
throw new IllegalArgumentException(
"Error when loading topic compaction strategy: " + topicCompactionStrategyClassName, e);
}
}

static TopicCompactionStrategy getInstance(String tag) {
return INSTANCES.get(tag);
}
}

0 comments on commit 8d6b931

Please sign in to comment.