Skip to content

Commit

Permalink
[fix] Remove blocking calls from BookieRackAffinityMapping (#22846)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Jun 5, 2024
1 parent 342d88d commit aece67e
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
private BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration();
private Map<String, BookieInfo> bookieInfoMap = new HashMap<>();

public static MetadataStore createMetadataStore(Configuration conf) throws MetadataException {
static MetadataStore getMetadataStore(Configuration conf) throws MetadataException {
MetadataStore store;
Object storeProperty = conf.getProperty(METADATA_STORE_INSTANCE);
if (storeProperty != null) {
Expand Down Expand Up @@ -116,12 +116,20 @@ public synchronized void setConf(Configuration conf) {
super.setConf(conf);
MetadataStore store;
try {
store = createMetadataStore(conf);
bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
store.registerListener(this::handleUpdates);
racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
.orElseGet(BookiesRackConfiguration::new);
for (Map<String, BookieInfo> bookieMapping : racksWithHost.values()) {
store = getMetadataStore(conf);
} catch (MetadataException e) {
throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list");
}

bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
store.registerListener(this::handleUpdates);

try {
var racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
.thenApply(optRes -> optRes.orElseGet(BookiesRackConfiguration::new))
.get();

for (var bookieMapping : racksWithHost.values()) {
for (String address : bookieMapping.keySet()) {
bookieAddressListLastTime.add(BookieId.parse(address));
}
Expand All @@ -131,10 +139,12 @@ public synchronized void setConf(Configuration conf) {
}
}
updateRacksWithHost(racksWithHost);
watchAvailableBookies();
} catch (InterruptedException | ExecutionException | MetadataException e) {
throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list");
} catch (ExecutionException | InterruptedException e) {
LOG.error("Failed to update rack info. ", e);
throw new RuntimeException(e);
}

watchAvailableBookies();
}

private void watchAvailableBookies() {
Expand All @@ -145,13 +155,13 @@ private void watchAvailableBookies() {
field.setAccessible(true);
RegistrationClient registrationClient = (RegistrationClient) field.get(bookieAddressResolver);
registrationClient.watchWritableBookies(versioned -> {
try {
racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
.orElseGet(BookiesRackConfiguration::new);
updateRacksWithHost(racksWithHost);
} catch (InterruptedException | ExecutionException e) {
LOG.error("Failed to update rack info. ", e);
}
bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
.thenApply(optRes -> optRes.orElseGet(BookiesRackConfiguration::new))
.thenAccept(this::updateRacksWithHost)
.exceptionally(ex -> {
LOG.error("Failed to update rack info. ", ex);
return null;
});
});
} catch (NoSuchFieldException | IllegalAccessException e) {
LOG.error("Failed watch available bookies.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) {
MetadataStore store;
try {
store = BookieRackAffinityMapping.createMetadataStore(conf);
store = BookieRackAffinityMapping.getMetadataStore(conf);
} catch (MetadataException e) {
throw new RuntimeException(METADATA_STORE_INSTANCE + " failed initialized");
}
Expand Down

0 comments on commit aece67e

Please sign in to comment.