Skip to content

Commit

Permalink
Merge branch 'master' into bewaremypower/concurrent-openhashmap-topic
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower committed Sep 21, 2024
2 parents 1fb7d38 + f5c1ad2 commit 2b2cfc1
Show file tree
Hide file tree
Showing 63 changed files with 3,849 additions and 1,558 deletions.
9 changes: 9 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1563,6 +1563,15 @@ loadBalancerNamespaceBundleSplitConditionHitCountThreshold=3
# (only used in load balancer extension logics)
loadBalancerServiceUnitStateTombstoneDelayTimeInSeconds=3600

# Name of ServiceUnitStateTableView implementation class to use
loadManagerServiceUnitStateTableViewClassName=org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableViewImpl

# Specify ServiceUnitTableViewSyncer to sync service unit(bundle) states between metadata store and
# system topic table views during migration from one to the other. One could enable this
# syncer before migration and disable it after the migration finishes.
# It accepts `MetadataStoreToSystemTopicSyncer` or `SystemTopicToMetadataStoreSyncer` to
# enable it. It accepts `None` to disable it."
loadBalancerServiceUnitTableViewSyncer=None

### --- Replication --- ###

Expand Down
57 changes: 8 additions & 49 deletions pip/pip-378.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Add `ServiceUnitStateTableView` abstraction and make it pluggable, so users can
- Introduce `MetadataStoreTableView` interface to support `ServiceUnitStateMetadataStoreTableViewImpl` implementation.
- `MetadataStoreTableViewImpl` will use shadow hashmap to maintain the metadata tableview. It will initially fill the local tableview by scanning all existing items in the metadata store path. Also, new items will be updated to the tableview via metadata watch notifications.
- Add `BiConsumer<String, Optional<CacheGetResult<T>>> asyncReloadConsumer` in MetadataCacheConfig to listen the automatic cache async reload. This can be useful to re-sync the the shadow hashmap in MetadataStoreTableViewImpl in case it is out-dated in the worst case(e.g. network or metadata issues).
- Introduce `ServiceUnitStateTableViewSyncer` to sync system topic and metadata store table views to migrate to one from the other. This syncer can be enabled by a dynamic config, `loadBalancerServiceUnitTableViewSyncerEnabled`.
- Introduce `ServiceUnitStateTableViewSyncer` to sync system topic and metadata store table views to migrate to one from the other. This syncer can be enabled by a dynamic config, `loadBalancerServiceUnitTableViewSyncer`.

## Detailed Design

Expand Down Expand Up @@ -243,56 +243,15 @@ public class MetadataCacheConfig<T> {
*/
@Slf4j
public class ServiceUnitStateTableViewSyncer implements Cloneable {
private static final int SYNC_TIMEOUT_IN_SECS = 30;
private volatile ServiceUnitStateTableView systemTopicTableView;
private volatile ServiceUnitStateTableView metadataStoreTableView;

...

public void start(PulsarService pulsar) throws IOException {
if (!pulsar.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()) {
return;
}
try {
if (systemTopicTableView == null) {
systemTopicTableView = new ServiceUnitStateTableViewImpl();
systemTopicTableView.start(
pulsar,
this::syncToMetadataStore,
this::syncToMetadataStore);
log.info("Successfully started ServiceUnitStateTableViewSyncer::systemTopicTableView");
}

if (metadataStoreTableView == null) {
metadataStoreTableView = new ServiceUnitStateMetadataStoreTableViewImpl();
metadataStoreTableView.start(
pulsar,
this::syncToSystemTopic,
this::syncToSystemTopic);
log.info("Successfully started ServiceUnitStateTableViewSyncer::metadataStoreTableView");
}

} catch (Throwable e) {
log.error("Failed to start ServiceUnitStateTableViewSyncer", e);
throw e;
}
... // sync SystemTopicTableView and MetadataStoreTableView
}

private void syncToSystemTopic(String key, ServiceUnitStateData data) {
try {
systemTopicTableView.put(key, data).get(SYNC_TIMEOUT_IN_SECS, TimeUnit.SECONDS);
} catch (Throwable e) {
log.error("SystemTopicTableView failed to sync key:{}, data:{}", key, data, e);
throw new IllegalStateException(e);
}
}

private void syncToMetadataStore(String key, ServiceUnitStateData data) {
try {
metadataStoreTableView.put(key, data).get(SYNC_TIMEOUT_IN_SECS, TimeUnit.SECONDS);
} catch (Throwable e) {
log.error("metadataStoreTableView failed to sync key:{}, data:{}", key, data, e);
throw new IllegalStateException(e);
}
public void close() throws IOException {
... // stop syncer
}
...
}
Expand All @@ -302,14 +261,14 @@ public class ServiceUnitStateTableViewSyncer implements Cloneable {

### Configuration

- Add a `loadManagerServiceUnitStateTableViewClassName` configuration to specify `ServiceUnitStateTableView` implementation class name.
- Add a `loadBalancerServiceUnitTableViewSyncerEnabled` configuration to to enable ServiceUnitTableViewSyncer to sync metadata store and system topic ServiceUnitStateTableView during migration.
- Add a `loadManagerServiceUnitStateTableViewClassName` static configuration to specify `ServiceUnitStateTableView` implementation class name.
- Add a `loadBalancerServiceUnitTableViewSyncer` dynamic configuration to enable ServiceUnitTableViewSyncer to sync metadata store and system topic ServiceUnitStateTableView during migration.

## Backward & Forward Compatibility

It will ba Backward & Forward compatible as `loadManagerServiceUnitStateTableViewClassName` will be `ServiceUnitStateTableViewImpl`(system topic implementation) by default.

We will introduce `ServiceUnitStateTableViewSyncer` to sync system topic and metadata store table views when migrating to ServiceUnitStateMetadataStoreTableViewImpl from ServiceUnitStateTableViewImpl and vice versa. This syncer can be enabled/disabled by a dynamic config, `loadBalancerServiceUnitTableViewSyncerEnabled`. The admin could enable this syncer before migration and disable it after it is finished.
We will introduce `ServiceUnitStateTableViewSyncer` dynamic config to sync system topic and metadata store table views when migrating to ServiceUnitStateMetadataStoreTableViewImpl from ServiceUnitStateTableViewImpl and vice versa. The admin could enable this syncer before migration and disable it after it is finished.

## Alternatives

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2912,6 +2912,25 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
)
private boolean loadBalancerMultiPhaseBundleUnload = true;

@FieldContext(
dynamic = false,
category = CATEGORY_LOAD_BALANCER,
doc = "Name of ServiceUnitStateTableView implementation class to use"
)
private String loadManagerServiceUnitStateTableViewClassName =
"org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableViewImpl";

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "Specify ServiceUnitTableViewSyncer to sync service unit(bundle) states between metadata store and "
+ "system topic table views during migration from one to the other. One could enable this"
+ " syncer before migration and disable it after the migration finishes. "
+ "It accepts `MetadataStoreToSystemTopicSyncer` or `SystemTopicToMetadataStoreSyncer` to "
+ "enable it. It accepts `None` to disable it."
)
private ServiceUnitTableViewSyncerType loadBalancerServiceUnitTableViewSyncer = ServiceUnitTableViewSyncerType.None;

/**** --- Replication. --- ****/
@FieldContext(
category = CATEGORY_REPLICATION,
Expand Down Expand Up @@ -3810,4 +3829,14 @@ public Map<String, String> lookupProperties() {
});
return map;
}

public boolean isLoadBalancerServiceUnitTableViewSyncerEnabled() {
return loadBalancerServiceUnitTableViewSyncer != ServiceUnitTableViewSyncerType.None;
}

public enum ServiceUnitTableViewSyncerType {
None,
MetadataStoreToSystemTopicSyncer,
SystemTopicToMetadataStoreSyncer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.apache.pulsar.common.policies.data.stats.NonPersistentPartitionedTopicStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -478,18 +477,17 @@ public void getListFromBundle(
} else {
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, true, true)
.thenAccept(nsBundle -> {
ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> bundleTopics =
pulsar().getBrokerService()
.getMultiLayerTopicsMap().get(namespaceName.toString());
final var bundleTopics = pulsar().getBrokerService().getMultiLayerTopicsMap()
.get(namespaceName.toString());
if (bundleTopics == null || bundleTopics.isEmpty()) {
asyncResponse.resume(Collections.emptyList());
return;
}
final List<String> topicList = new ArrayList<>();
String bundleKey = namespaceName.toString() + "/" + nsBundle.getBundleRange();
ConcurrentOpenHashMap<String, Topic> topicMap = bundleTopics.get(bundleKey);
final var topicMap = bundleTopics.get(bundleKey);
if (topicMap != null) {
topicList.addAll(topicMap.keys().stream()
topicList.addAll(topicMap.keySet().stream()
.filter(name -> !TopicName.get(name).isPersistent())
.collect(Collectors.toList()));
}
Expand Down
Loading

0 comments on commit 2b2cfc1

Please sign in to comment.