From 9cbf0f2f310da3fe23bec3ea17e9b1e861627bda Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Mon, 5 Dec 2022 14:30:31 +0100 Subject: [PATCH] Load catalogs concurrently When plugin concurrent loading was added (c5ac103ede428edbbe02f6394dc6d38ce4422ee2), the catalogs could not be loaded concurrently easily, because `ConnectorManager.createCatalog` was synchronized. The code has evolved since then, reducing contention significantly and thus allowing for parallel startup. --- .../CoordinatorDynamicCatalogManager.java | 25 ++++++++++++----- .../trino/connector/StaticCatalogManager.java | 27 +++++++++++++------ .../io/trino/testing/LocalQueryRunner.java | 3 ++- 3 files changed, 39 insertions(+), 16 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/connector/CoordinatorDynamicCatalogManager.java b/core/trino-main/src/main/java/io/trino/connector/CoordinatorDynamicCatalogManager.java index 0f34007d58d0a..420b2bb5f8e5a 100644 --- a/core/trino-main/src/main/java/io/trino/connector/CoordinatorDynamicCatalogManager.java +++ b/core/trino-main/src/main/java/io/trino/connector/CoordinatorDynamicCatalogManager.java @@ -20,6 +20,7 @@ import io.trino.connector.system.GlobalSystemConnector; import io.trino.metadata.Catalog; import io.trino.metadata.CatalogManager; +import io.trino.server.ForStartup; import io.trino.spi.TrinoException; import javax.annotation.PreDestroy; @@ -31,8 +32,10 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -42,6 +45,7 @@ import static io.trino.connector.CatalogHandle.createRootCatalogHandle; import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS; import static io.trino.spi.StandardErrorCode.CATALOG_NOT_AVAILABLE; +import static io.trino.util.Executors.executeUntilFailure; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -55,6 +59,7 @@ private enum State { CREATED, INITIALIZED, STOPPED } private final CatalogStore catalogStore; private final CatalogFactory catalogFactory; + private final Executor executor; private final Lock catalogsUpdateLock = new ReentrantLock(); private final ConcurrentMap catalogs = new ConcurrentHashMap<>(); @@ -63,10 +68,11 @@ private enum State { CREATED, INITIALIZED, STOPPED } private State state = State.CREATED; @Inject - public CoordinatorDynamicCatalogManager(CatalogStore catalogStore, CatalogFactory catalogFactory) + public CoordinatorDynamicCatalogManager(CatalogStore catalogStore, CatalogFactory catalogFactory, @ForStartup Executor executor) { this.catalogStore = requireNonNull(catalogStore, "catalogStore is null"); this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null"); + this.executor = requireNonNull(executor, "executor is null"); } @PreDestroy @@ -104,12 +110,17 @@ public void loadInitialCatalogs() checkState(state != State.STOPPED, "ConnectorManager is stopped"); state = State.INITIALIZED; - for (CatalogProperties catalog : catalogStore.getCatalogs()) { - log.info("-- Loading catalog %s --", catalog.getCatalogHandle().getCatalogName()); - CatalogConnector newCatalog = catalogFactory.createCatalog(catalog); - catalogs.put(catalog.getCatalogHandle().getCatalogName(), newCatalog); - log.info("-- Added catalog %s using connector %s --", catalog.getCatalogHandle().getCatalogName(), catalog.getConnectorName()); - } + executeUntilFailure( + executor, + catalogStore.getCatalogs().stream() + .map(catalog -> (Callable) () -> { + log.info("-- Loading catalog %s --", catalog.getCatalogHandle().getCatalogName()); + CatalogConnector newCatalog = catalogFactory.createCatalog(catalog); + catalogs.put(catalog.getCatalogHandle().getCatalogName(), newCatalog); + log.info("-- Added catalog %s using connector %s --", catalog.getCatalogHandle().getCatalogName(), catalog.getConnectorName()); + return null; + }) + .collect(toImmutableList())); } finally { catalogsUpdateLock.unlock(); diff --git a/core/trino-main/src/main/java/io/trino/connector/StaticCatalogManager.java b/core/trino-main/src/main/java/io/trino/connector/StaticCatalogManager.java index c522a0b73c73e..ae814523d8f47 100644 --- a/core/trino-main/src/main/java/io/trino/connector/StaticCatalogManager.java +++ b/core/trino-main/src/main/java/io/trino/connector/StaticCatalogManager.java @@ -22,6 +22,7 @@ import io.trino.connector.system.GlobalSystemConnector; import io.trino.metadata.Catalog; import io.trino.metadata.CatalogManager; +import io.trino.server.ForStartup; import io.trino.spi.TrinoException; import javax.annotation.PreDestroy; @@ -37,8 +38,10 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; import static com.google.common.base.MoreObjects.firstNonNull; @@ -49,6 +52,7 @@ import static io.trino.connector.CatalogHandle.createRootCatalogHandle; import static io.trino.spi.StandardErrorCode.CATALOG_NOT_AVAILABLE; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static io.trino.util.Executors.executeUntilFailure; import static java.util.Objects.requireNonNull; @ThreadSafe @@ -61,13 +65,14 @@ private enum State { CREATED, INITIALIZED, STOPPED } private final CatalogFactory catalogFactory; private final List catalogProperties; + private final Executor executor; private final ConcurrentMap catalogs = new ConcurrentHashMap<>(); private final AtomicReference state = new AtomicReference<>(State.CREATED); @Inject - public StaticCatalogManager(CatalogFactory catalogFactory, StaticCatalogManagerConfig config) + public StaticCatalogManager(CatalogFactory catalogFactory, StaticCatalogManagerConfig config, @ForStartup Executor executor) { this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null"); List disabledCatalogs = firstNonNull(config.getDisabledCatalogs(), ImmutableList.of()); @@ -95,6 +100,7 @@ public StaticCatalogManager(CatalogFactory catalogFactory, StaticCatalogManagerC catalogProperties.add(new CatalogProperties(createRootCatalogHandle(catalogName), connectorName, ImmutableMap.copyOf(properties))); } this.catalogProperties = catalogProperties.build(); + this.executor = requireNonNull(executor, "executor is null"); } private static List listCatalogFiles(File catalogsDirectory) @@ -133,13 +139,18 @@ public void loadInitialCatalogs() return; } - for (CatalogProperties catalog : catalogProperties) { - String catalogName = catalog.getCatalogHandle().getCatalogName(); - log.info("-- Loading catalog %s --", catalogName); - CatalogConnector newCatalog = catalogFactory.createCatalog(catalog); - catalogs.put(catalogName, newCatalog); - log.info("-- Added catalog %s using connector %s --", catalogName, catalog.getConnectorName()); - } + executeUntilFailure( + executor, + catalogProperties.stream() + .map(catalog -> (Callable) () -> { + String catalogName = catalog.getCatalogHandle().getCatalogName(); + log.info("-- Loading catalog %s --", catalogName); + CatalogConnector newCatalog = catalogFactory.createCatalog(catalog); + catalogs.put(catalogName, newCatalog); + log.info("-- Added catalog %s using connector %s --", catalogName, catalog.getConnectorName()); + return null; + }) + .collect(toImmutableList())); } @Override diff --git a/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java b/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java index 35a542c1f3584..4a38d748263b6 100644 --- a/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java +++ b/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java @@ -217,6 +217,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.trino.connector.CatalogServiceProviderModule.createAccessControlProvider; @@ -357,7 +358,7 @@ private LocalQueryRunner( this.optimizerConfig = new OptimizerConfig(); LazyCatalogFactory catalogFactory = new LazyCatalogFactory(); this.catalogFactory = catalogFactory; - this.catalogManager = new CoordinatorDynamicCatalogManager(NO_STORED_CATALOGS, catalogFactory); + this.catalogManager = new CoordinatorDynamicCatalogManager(NO_STORED_CATALOGS, catalogFactory, directExecutor()); this.transactionManager = InMemoryTransactionManager.create( new TransactionManagerConfig().setIdleTimeout(new Duration(1, TimeUnit.DAYS)), yieldExecutor,