Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Inject remote store in IndexShard instead of RemoteStoreRefreshListener #3703

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ public final class IndexModule {

private static final FsDirectoryFactory DEFAULT_DIRECTORY_FACTORY = new FsDirectoryFactory();

private static final RemoteDirectoryFactory REMOTE_DIRECTORY_FACTORY = new RemoteDirectoryFactory();

private static final IndexStorePlugin.RecoveryStateFactory DEFAULT_RECOVERY_STATE_FACTORY = RecoveryState::new;

public static final Setting<String> INDEX_STORE_TYPE_SETTING = new Setting<>(
Expand Down Expand Up @@ -189,9 +187,9 @@ public final class IndexModule {
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
* via {@link org.opensearch.plugins.PluginsService#onIndexModule(IndexModule)}.
*
* @param indexSettings the index settings
* @param analysisRegistry the analysis registry
* @param engineFactory the engine factory
* @param indexSettings the index settings
* @param analysisRegistry the analysis registry
* @param engineFactory the engine factory
* @param directoryFactories the available store types
*/
public IndexModule(
Expand Down Expand Up @@ -476,7 +474,8 @@ public IndexService newIndexService(
IndicesFieldDataCache indicesFieldDataCache,
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry
ValuesSourceRegistry valuesSourceRegistry,
RemoteDirectoryFactory remoteDirectoryFactory
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -519,7 +518,7 @@ public IndexService newIndexService(
client,
queryCache,
directoryFactory,
REMOTE_DIRECTORY_FACTORY,
remoteDirectoryFactory,
eventListener,
readerWrapperFactory,
mapperRegistry,
Expand Down
37 changes: 17 additions & 20 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.RemoteStoreRefreshListener;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardNotFoundException;
Expand All @@ -97,9 +96,6 @@
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -437,8 +433,7 @@ public synchronized IndexShard createShard(
final ShardRouting routing,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final RepositoriesService repositoriesService
final SegmentReplicationCheckpointPublisher checkpointPublisher
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
Expand Down Expand Up @@ -511,22 +506,24 @@ public synchronized IndexShard createShard(
warmer.warm(reader, shard, IndexService.this.indexSettings);
}
};
Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
Directory remoteDirectory = null;
RemoteStoreRefreshListener remoteStoreRefreshListener = null;

Store remoteStore = null;
if (this.indexSettings.isRemoteStoreEnabled()) {
try {
Repository repository = repositoriesService.repository(clusterService.state().metadata().clusterUUID());
remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path, repository);
remoteStoreRefreshListener = new RemoteStoreRefreshListener(directory, remoteDirectory);
} catch (RepositoryMissingException e) {
throw new IllegalArgumentException(
"Repository should be created before creating index with remote_store enabled setting",
e
);
}
Directory remoteDirectory = remoteDirectoryFactory.newDirectory(
clusterService.state().metadata().clusterUUID(),
this.indexSettings,
path
);
remoteStore = new Store(
shardId,
this.indexSettings,
remoteDirectory,
lock,
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId))
);
}

Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
store = new Store(
shardId,
this.indexSettings,
Expand Down Expand Up @@ -557,7 +554,7 @@ public synchronized IndexShard createShard(
retentionLeaseSyncer,
circuitBreakerService,
this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null,
remoteStoreRefreshListener
remoteStore
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
26 changes: 19 additions & 7 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.util.SetOnce;
import org.apache.lucene.util.ThreadInterruptedException;
import org.opensearch.Assertions;
Expand Down Expand Up @@ -305,7 +307,7 @@ Runnable getGlobalCheckpointSyncer() {
private volatile boolean useRetentionLeasesInPeerRecovery;
private final ReferenceManager.RefreshListener checkpointRefreshListener;

private final RemoteStoreRefreshListener remoteStoreRefreshListener;
private final Store remoteStore;

public IndexShard(
final ShardRouting shardRouting,
Expand All @@ -329,7 +331,7 @@ public IndexShard(
final RetentionLeaseSyncer retentionLeaseSyncer,
final CircuitBreakerService circuitBreakerService,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final RemoteStoreRefreshListener remoteStoreRefreshListener
@Nullable final Store remoteStore
Copy link
Collaborator

@Bukhtawar Bukhtawar Jun 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if the Store should be abstracted as a CompositeStore that has data backed in remote. IndexShard shouldn't worry about exposing or maintaining another Store that is remote

class CompositeStore extends Store
{ 
  private Store localStore;
  private Store remoteStore, 
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need some time to think about this abstraction. Tracking here: #3719

) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -417,7 +419,7 @@ public boolean shouldCache(Query query) {
} else {
this.checkpointRefreshListener = null;
}
this.remoteStoreRefreshListener = remoteStoreRefreshListener;
this.remoteStore = remoteStore;
}

public ThreadPool getThreadPool() {
Expand All @@ -428,6 +430,10 @@ public Store store() {
return this.store;
}

public Store remoteStore() {
return this.remoteStore;
}

/**
* Return the sort order of this index, or null if the index has no sort.
*/
Expand Down Expand Up @@ -1638,7 +1644,8 @@ public void close(String reason, boolean flushEngine) throws IOException {
} finally {
// playing safe here and close the engine even if the above succeeds - close can be called multiple times
// Also closing refreshListeners to prevent us from accumulating any more listeners
IOUtils.close(engine, globalCheckpointListeners, refreshListeners, pendingReplicationActions);
// Closing remoteStore as a part of IndexShard close. null check is handled by IOUtils
IOUtils.close(engine, globalCheckpointListeners, refreshListeners, pendingReplicationActions, remoteStore);
indexShardOperationPermits.close();
}
}
Expand Down Expand Up @@ -3192,7 +3199,7 @@ private DocumentMapperForType docMapper() {
return mapperService.documentMapperWithAutoCreate();
}

private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) throws IOException {
final Sort indexSort = indexSortSupplier.get();
final Engine.Warmer warmer = reader -> {
assert Thread.holdsLock(mutex) == false : "warming engine under mutex";
Expand All @@ -3204,8 +3211,9 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {

final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
if (remoteStoreRefreshListener != null && shardRouting.primary()) {
internalRefreshListener.add(remoteStoreRefreshListener);
if (isRemoteStoreEnabled()) {
Directory remoteDirectory = ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate();
internalRefreshListener.add(new RemoteStoreRefreshListener(store.directory(), remoteDirectory));
Comment on lines +3214 to +3216
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I couldn't find where are we handling the reference counting for Store. The call to directory() requires the Store to be Open(ref counter > 0) so this call would fail?

public Directory directory() {
ensureOpen();
return directory;
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When store is created, default refCount is 1.

/**
* A basic RefCounted implementation that is initialized with a
* ref count of 1 and calls {@link #closeInternal()} once it reaches
* a 0 ref count
*/
public abstract class AbstractRefCounted implements RefCounted {

Copy link
Collaborator

@Bukhtawar Bukhtawar Jun 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @sachinpkale so when should Store be closed for remote store as in reference count decremented or explicitly closed? I see IndexShard#closeShard closing the local store, do we need to handle close for remote store as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As of now, we are not closing remoteStore. Let me add remoteStore close as a part of IndexShard.close()

Copy link
Collaborator

@Bukhtawar Bukhtawar Jun 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For local store we have ShardStoreDeleter as a listener that cleans up data, for remote we are deleting it as a part of close. Also when a shard relocates to another node, the shard engine I believe closes, would that mean we are removing data from the remote? I guess that would be no, but please verify if that could happen

Copy link
Collaborator

@Bukhtawar Bukhtawar Jun 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just have a feeling we might be leaking the Store on abrupt Engine failures?

/**
* Closes the engine without acquiring the write lock. This should only be
* called while the write lock is hold or in a disaster condition ie. if the engine
* is failed.
*/
@Override
protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
if (isClosed.compareAndSet(false, true)) {
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread()
: "Either the write lock must be held or the engine must be currently be failing itself";
try {
this.versionMap.clear();
if (internalReaderManager != null) {
internalReaderManager.removeListener(versionMap);
}
try {
IOUtils.close(externalReaderManager, internalReaderManager);
} catch (Exception e) {
logger.warn("Failed to close ReaderManager", e);
}
try {
IOUtils.close(translogManager.getTranslog());
} catch (Exception e) {
logger.warn("Failed to close translog", e);
}
// no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed
logger.trace("rollback indexWriter");
try {
indexWriter.rollback();
} catch (AlreadyClosedException ex) {
failOnTragicEvent(ex);
throw ex;
}
logger.trace("rollback indexWriter done");
} catch (Exception e) {
logger.warn("failed to rollback writer on close", e);
} finally {
try {
store.decRef();
logger.debug("engine closed [{}]", reason);
} finally {
closedLatch.countDown();
}

Can we add assertions in the tests to ensure the state of the remote store?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we should not delete data on close. I think I have removed the blobcontainer.delete() as a part of some other commit which is not included in this PR. Let me add that.

I did not get the store leaking problem though. Are you talking about store or remoteStore?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a case of abrupt Engine failure should we close the remote store similar to how a local store gets closed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But internal engine is not aware of the remoteStore. Its scope is limited to IndexShard.

}
if (this.checkpointRefreshListener != null) {
internalRefreshListener.add(checkpointRefreshListener);
Expand Down Expand Up @@ -3238,6 +3246,10 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
);
}

private boolean isRemoteStoreEnabled() {
return (remoteStore != null && shardRouting.primary());
}

/**
* Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided
* ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,12 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
}

/**
* Closes the directory by deleting all the files in this directory
* Closes the remote directory. Currently, it is a no-op.
* If remote directory maintains a state in future, we need to clean it before closing the directory
*/
@Override
public void close() throws IOException {
blobContainer.delete();
// Do nothing
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.IOException;
import java.util.function.Supplier;

/**
* Factory for a remote store directory
Expand All @@ -26,12 +29,23 @@
*/
public class RemoteDirectoryFactory implements IndexStorePlugin.RemoteDirectoryFactory {

private final Supplier<RepositoriesService> repositoriesService;

public RemoteDirectoryFactory(Supplier<RepositoriesService> repositoriesService) {
this.repositoriesService = repositoriesService;
}

@Override
public Directory newDirectory(IndexSettings indexSettings, ShardPath path, Repository repository) throws IOException {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobPath blobPath = new BlobPath();
blobPath = blobPath.add(indexSettings.getIndex().getName()).add(String.valueOf(path.getShardId().getId()));
BlobContainer blobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(blobPath);
return new RemoteDirectory(blobContainer);
public Directory newDirectory(String repositoryName, IndexSettings indexSettings, ShardPath path) throws IOException {
try (Repository repository = repositoriesService.get().repository(repositoryName)) {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobPath blobPath = new BlobPath();
blobPath = blobPath.add(indexSettings.getIndex().getName()).add(String.valueOf(path.getShardId().getId()));
BlobContainer blobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(blobPath);
return new RemoteDirectory(blobContainer);
} catch (RepositoryMissingException e) {
throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.IndexingStats;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.RemoteDirectoryFactory;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
Expand Down Expand Up @@ -265,6 +266,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final Set<Index> danglingIndicesToWrite = Sets.newConcurrentHashSet();
private final boolean nodeWriteDanglingIndicesInfo;
private final ValuesSourceRegistry valuesSourceRegistry;
private final RemoteDirectoryFactory remoteDirectoryFactory;

@Override
protected void doStart() {
Expand Down Expand Up @@ -292,7 +294,8 @@ public IndicesService(
Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders,
Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
ValuesSourceRegistry valuesSourceRegistry,
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
RemoteDirectoryFactory remoteDirectoryFactory
) {
this.settings = settings;
this.threadPool = threadPool;
Expand Down Expand Up @@ -386,6 +389,7 @@ protected void closeInternal() {

this.allowExpensiveQueries = ALLOW_EXPENSIVE_QUERIES.get(clusterService.getSettings());
clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries);
this.remoteDirectoryFactory = remoteDirectoryFactory;
}

private static final String DANGLING_INDICES_UPDATE_THREAD_NAME = "DanglingIndices#updateTask";
Expand Down Expand Up @@ -745,7 +749,8 @@ private synchronized IndexService createIndexService(
indicesFieldDataCache,
namedWriteableRegistry,
this::isIdFieldDataEnabled,
valuesSourceRegistry
valuesSourceRegistry,
remoteDirectoryFactory
);
}

Expand Down Expand Up @@ -859,13 +864,7 @@ public IndexShard createShard(
IndexService indexService = indexService(shardRouting.index());
assert indexService != null;
RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode);
IndexShard indexShard = indexService.createShard(
shardRouting,
globalCheckpointSyncer,
retentionLeaseSyncer,
checkpointPublisher,
repositoriesService
);
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher);
indexShard.addShardFailureCallback(onShardFailure);
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS
Expand Down
6 changes: 5 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.indices.replication.SegmentReplicationSourceFactory;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.SegmentReplicationSourceService;
import org.opensearch.index.store.RemoteDirectoryFactory;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.Assertions;
import org.opensearch.Build;
Expand Down Expand Up @@ -622,6 +623,8 @@ protected Node(
rerouteServiceReference.set(rerouteService);
clusterService.setRerouteService(rerouteService);

final RemoteDirectoryFactory remoteDirectoryFactory = new RemoteDirectoryFactory(repositoriesServiceReference::get);

final IndicesService indicesService = new IndicesService(
settings,
pluginsService,
Expand All @@ -642,7 +645,8 @@ protected Node(
engineFactoryProviders,
indexStoreFactories,
searchModule.getValuesSourceRegistry(),
recoveryStateFactories
recoveryStateFactories,
remoteDirectoryFactory
);

final AliasValidator aliasValidator = new AliasValidator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.repositories.Repository;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -74,13 +73,13 @@ interface DirectoryFactory {
interface RemoteDirectoryFactory {
/**
* Creates a new remote directory per shard. This method is called once per shard on shard creation.
* @param repositoryName repository name
* @param indexSettings the shards index settings
* @param shardPath the path the shard is using
* @param repository to get the BlobContainer details
* @return a new RemoteDirectory instance
* @throws IOException if an IOException occurs while opening the directory
*/
Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath, Repository repository) throws IOException;
Directory newDirectory(String repositoryName, IndexSettings indexSettings, ShardPath shardPath) throws IOException;
}

/**
Expand Down
Loading