From 1223160e199cc93f3bb1084e5c1712e0845f592c Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 4 Jan 2022 11:05:58 +0000 Subject: [PATCH] Paginate persisted cluster state (#78875) Today we allocate a contiguous chunk of memory for the global metadata each time we write it to disk. The size of this chunk is unbounded and in practice it can be pretty large. This commit splits the metadata document up into pages (1MB by default) that are streamed to disk at write time, bounding the memory usage of cluster state persistence. Since the memory usage is now bounded we can allocate a single buffer up front and re-use it for every write. --- .../ElasticsearchNodeCommand.java | 2 - .../bytes/RecyclingBytesStreamOutput.java | 137 ----- .../common/settings/ClusterSettings.java | 1 + .../gateway/PersistedClusterStateService.java | 552 +++++++++++------- .../java/org/elasticsearch/node/Node.java | 5 +- .../RecyclingBytesStreamOutputTests.java | 69 --- .../env/NodeRepurposeCommandTests.java | 2 - .../env/OverrideNodeVersionCommandTests.java | 3 - .../GatewayMetaStatePersistedStateTests.java | 170 +++--- .../PersistedClusterStateServiceTests.java | 172 +++++- .../RemoveCorruptedShardDataCommandTests.java | 2 - .../AbstractCoordinatorTestCase.java | 17 +- .../gateway/MockGatewayMetaState.java | 6 +- .../test/InternalTestCluster.java | 7 + 14 files changed, 615 insertions(+), 530 deletions(-) delete mode 100644 server/src/main/java/org/elasticsearch/common/bytes/RecyclingBytesStreamOutput.java delete mode 100644 server/src/test/java/org/elasticsearch/common/bytes/RecyclingBytesStreamOutputTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java index c7a93007e979b..49a6b7a510d3a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java @@ -30,7 +30,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.Tuple; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; @@ -112,7 +111,6 @@ public static PersistedClusterStateService createPersistedClusterStateService(Se dataPaths, nodeId, namedXContentRegistry, - BigArrays.NON_RECYCLING_INSTANCE, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L ); diff --git a/server/src/main/java/org/elasticsearch/common/bytes/RecyclingBytesStreamOutput.java b/server/src/main/java/org/elasticsearch/common/bytes/RecyclingBytesStreamOutput.java deleted file mode 100644 index 39ca7ba02fb28..0000000000000 --- a/server/src/main/java/org/elasticsearch/common/bytes/RecyclingBytesStreamOutput.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.common.bytes; - -import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.BytesRefIterator; -import org.elasticsearch.common.io.stream.BytesStream; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.ByteArray; -import org.elasticsearch.core.Nullable; -import org.elasticsearch.core.internal.io.IOUtils; - -import java.io.IOException; -import java.util.Objects; - -/** - * An in-memory {@link StreamOutput} which first fills the given {@code byte[]} and then allocates more space from the given - * {@link BigArrays} if needed. The idea is that you can use this for passing data to an API that requires a single {@code byte[]} (or a - * {@link org.apache.lucene.util.BytesRef}) which you'd prefer to re-use if possible, avoiding excessive allocations, but which may not - * always be large enough. - */ -public class RecyclingBytesStreamOutput extends BytesStream { - - private final byte[] buffer; - private final BigArrays bigArrays; - - private int position; - - @Nullable // if buffer is large enough - private ByteArray overflow; - - public RecyclingBytesStreamOutput(byte[] buffer, BigArrays bigArrays) { - this.buffer = Objects.requireNonNull(buffer); - this.bigArrays = Objects.requireNonNull(bigArrays); - } - - @Override - public void writeByte(byte b) { - if (position < buffer.length) { - buffer[position++] = b; - } else { - ensureCapacity(position + 1); - overflow.set(position++ - buffer.length, b); - } - } - - private void ensureCapacity(int size) { - final int overflowSize = size - buffer.length; - assert overflowSize > 0 : "no need to ensureCapacity(" + size + ") with buffer of size [" + buffer.length + "]"; - assert position >= buffer.length - : "no need to ensureCapacity(" + size + ") with buffer of size [" + buffer.length + "] at position [" + position + "]"; - if (overflow == null) { - overflow = bigArrays.newByteArray(overflowSize, false); - } else if (overflowSize > overflow.size()) { - overflow = bigArrays.resize(overflow, overflowSize); - } - assert overflow.size() >= overflowSize; - } - - @Override - public void writeBytes(byte[] b, int offset, int length) { - if (position < buffer.length) { - final int lengthForBuffer = Math.min(length, buffer.length - position); - System.arraycopy(b, offset, buffer, position, lengthForBuffer); - position += lengthForBuffer; - offset += lengthForBuffer; - length -= lengthForBuffer; - } - - if (length > 0) { - ensureCapacity(position + length); - overflow.set(position - buffer.length, b, offset, length); - position += length; - } - } - - @Override - public void flush() {} - - @Override - public void close() throws IOException { - IOUtils.close(overflow); - } - - @Override - public void reset() throws IOException { - throw new UnsupportedOperationException(); - } - - /** - * Return the written bytes in a {@link BytesRef}, avoiding allocating a new {@code byte[]} if the original buffer was already large - * enough. If we allocate a new (larger) buffer here then callers should typically re-use it for subsequent streams. - */ - public BytesRef toBytesRef() { - if (position <= buffer.length) { - assert overflow == null; - return new BytesRef(buffer, 0, position); - } - - final byte[] newBuffer = new byte[position]; - System.arraycopy(buffer, 0, newBuffer, 0, buffer.length); - int copyPos = buffer.length; - final BytesRefIterator iterator = BytesReference.fromByteArray(overflow, position - buffer.length).iterator(); - BytesRef bytesRef; - try { - while ((bytesRef = iterator.next()) != null) { - assert copyPos + bytesRef.length <= position; - System.arraycopy(bytesRef.bytes, bytesRef.offset, newBuffer, copyPos, bytesRef.length); - copyPos += bytesRef.length; - } - } catch (IOException e) { - throw new AssertionError("impossible", e); - } - - return new BytesRef(newBuffer, 0, position); - } - - @Override - public BytesReference bytes() { - if (position <= buffer.length) { - assert overflow == null; - return new BytesArray(buffer, 0, position); - } else { - return CompositeBytesReference.of( - new BytesArray(buffer, 0, buffer.length), - BytesReference.fromByteArray(overflow, position - buffer.length) - ); - } - } -} diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index e779e279b7b02..e0f0614aa2791 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -239,6 +239,7 @@ public void apply(Settings value, Settings current, Settings previous) { GatewayService.RECOVER_AFTER_DATA_NODES_SETTING, GatewayService.RECOVER_AFTER_TIME_SETTING, PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD, + PersistedClusterStateService.DOCUMENT_PAGE_SIZE, NetworkModule.HTTP_DEFAULT_TYPE_SETTING, NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING, NetworkModule.HTTP_TYPE_SETTING, diff --git a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java index 6423befa58c00..0fd76382b46a7 100644 --- a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java @@ -38,44 +38,43 @@ import org.apache.lucene.store.NIOFSDirectory; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.BytesRefIterator; import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.bytes.RecyclingBytesStreamOutput; -import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.bytes.CompositeBytesReference; +import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.ByteArray; -import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.core.CheckedFunction; import org.elasticsearch.core.Nullable; -import org.elasticsearch.core.Releasable; -import org.elasticsearch.core.Releasables; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeMetadata; -import org.elasticsearch.index.Index; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentParserConfiguration; import org.elasticsearch.xcontent.XContentType; import java.io.Closeable; import java.io.IOError; import java.io.IOException; +import java.io.OutputStream; import java.io.PrintStream; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -96,12 +95,12 @@ * to record the last-accepted cluster state during publication. The metadata is written incrementally where possible, leaving alone any * documents that have not changed. The index has the following fields: * - * +------------------------------+-----------------------------+----------------------------------------------+ - * | "type" (string field) | "index_uuid" (string field) | "data" (stored binary field in SMILE format) | - * +------------------------------+-----------------------------+----------------------------------------------+ - * | GLOBAL_TYPE_NAME == "global" | (omitted) | Global metadata | - * | INDEX_TYPE_NAME == "index" | Index UUID | Index metadata | - * +------------------------------+-----------------------------+----------------------------------------------+ + * +------------------------------+-----------------------------+----------------------------------------------+--------+-------------+ + * | "type" (string field) | "index_uuid" (string field) | "data" (stored binary field in SMILE format) | "page" | "last_page" | + * +------------------------------+-----------------------------+----------------------------------------------+--------+-------------+ + * | GLOBAL_TYPE_NAME == "global" | (omitted) | Global metadata | large docs are | + * | INDEX_TYPE_NAME == "index" | Index UUID | Index metadata | split into pages | + * +------------------------------+-----------------------------+----------------------------------------------+--------+-------------+ * * Additionally each commit has the following user data: * @@ -122,11 +121,15 @@ public class PersistedClusterStateService { private static final String LAST_ACCEPTED_VERSION_KEY = "last_accepted_version"; private static final String NODE_ID_KEY = "node_id"; private static final String NODE_VERSION_KEY = "node_version"; - private static final String TYPE_FIELD_NAME = "type"; + public static final String TYPE_FIELD_NAME = "type"; + public static final String GLOBAL_TYPE_NAME = "global"; + public static final String INDEX_TYPE_NAME = "index"; private static final String DATA_FIELD_NAME = "data"; - private static final String GLOBAL_TYPE_NAME = "global"; - private static final String INDEX_TYPE_NAME = "index"; private static final String INDEX_UUID_FIELD_NAME = "index_uuid"; + public static final String PAGE_FIELD_NAME = "page"; + public static final String LAST_PAGE_FIELD_NAME = "last_page"; + public static final int IS_LAST_PAGE = 1; + public static final int IS_NOT_LAST_PAGE = 0; private static final int COMMIT_DATA_SIZE = 4; private static final MergePolicy NO_MERGE_POLICY = noMergePolicy(); @@ -142,36 +145,35 @@ public class PersistedClusterStateService { Setting.Property.Dynamic ); + public static final Setting DOCUMENT_PAGE_SIZE = Setting.byteSizeSetting( + "cluster_state.document_page_size", + ByteSizeValue.ofMb(1), + ByteSizeValue.ONE, + ByteSizeValue.ofGb(1), + Setting.Property.NodeScope + ); + private final Path[] dataPaths; private final String nodeId; private final XContentParserConfiguration parserConfig; - private final BigArrays bigArrays; private final LongSupplier relativeTimeMillisSupplier; + private final ByteSizeValue documentPageSize; private volatile TimeValue slowWriteLoggingThreshold; public PersistedClusterStateService( NodeEnvironment nodeEnvironment, NamedXContentRegistry namedXContentRegistry, - BigArrays bigArrays, ClusterSettings clusterSettings, LongSupplier relativeTimeMillisSupplier ) { - this( - nodeEnvironment.nodeDataPaths(), - nodeEnvironment.nodeId(), - namedXContentRegistry, - bigArrays, - clusterSettings, - relativeTimeMillisSupplier - ); + this(nodeEnvironment.nodeDataPaths(), nodeEnvironment.nodeId(), namedXContentRegistry, clusterSettings, relativeTimeMillisSupplier); } public PersistedClusterStateService( Path[] dataPaths, String nodeId, NamedXContentRegistry namedXContentRegistry, - BigArrays bigArrays, ClusterSettings clusterSettings, LongSupplier relativeTimeMillisSupplier ) { @@ -179,10 +181,10 @@ public PersistedClusterStateService( this.nodeId = nodeId; this.parserConfig = XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE) .withRegistry(namedXContentRegistry); - this.bigArrays = bigArrays; this.relativeTimeMillisSupplier = relativeTimeMillisSupplier; this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD); clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); + this.documentPageSize = clusterSettings.get(DOCUMENT_PAGE_SIZE); } private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) { @@ -215,7 +217,7 @@ public Writer createWriter() throws IOException { IOUtils.closeWhileHandlingException(closeables); } } - return new Writer(metadataIndexWriters, nodeId, bigArrays, relativeTimeMillisSupplier, () -> slowWriteLoggingThreshold); + return new Writer(metadataIndexWriters, nodeId, documentPageSize, relativeTimeMillisSupplier, () -> slowWriteLoggingThreshold); } private static IndexWriter createIndexWriter(Directory directory, boolean openExisting) throws IOException { @@ -466,9 +468,7 @@ private OnDiskState loadOnDiskState(Path dataPath, DirectoryReader reader) throw final SetOnce builderReference = new SetOnce<>(); consumeFromType(searcher, GLOBAL_TYPE_NAME, bytes -> { - final Metadata metadata = Metadata.Builder.fromXContent( - XContentType.SMILE.xContent().createParser(parserConfig, bytes.bytes, bytes.offset, bytes.length) - ); + final Metadata metadata = readXContent(bytes, Metadata.Builder::fromXContent); logger.trace("found global metadata with last-accepted term [{}]", metadata.coordinationMetadata().term()); if (builderReference.get() != null) { throw new CorruptStateException("duplicate global metadata found in [" + dataPath + "]"); @@ -485,9 +485,7 @@ private OnDiskState loadOnDiskState(Path dataPath, DirectoryReader reader) throw final Set indexUUIDs = new HashSet<>(); consumeFromType(searcher, INDEX_TYPE_NAME, bytes -> { - final IndexMetadata indexMetadata = IndexMetadata.fromXContent( - XContentType.SMILE.xContent().createParser(parserConfig, bytes.bytes, bytes.offset, bytes.length) - ); + final IndexMetadata indexMetadata = readXContent(bytes, IndexMetadata::fromXContent); logger.trace("found index metadata for {}", indexMetadata.getIndex()); if (indexUUIDs.add(indexMetadata.getIndexUUID()) == false) { throw new CorruptStateException("duplicate metadata found for " + indexMetadata.getIndex() + " in [" + dataPath + "]"); @@ -511,13 +509,27 @@ private OnDiskState loadOnDiskState(Path dataPath, DirectoryReader reader) throw ); } - private static void consumeFromType(IndexSearcher indexSearcher, String type, CheckedConsumer bytesRefConsumer) - throws IOException { + private T readXContent(BytesReference bytes, CheckedFunction reader) throws IOException { + final XContentParser parser = XContentFactory.xContent(XContentType.SMILE).createParser(parserConfig, bytes.streamInput()); + try { + return reader.apply(parser); + } catch (Exception e) { + throw new CorruptStateException(e); + } + } + + private static void consumeFromType( + IndexSearcher indexSearcher, + String type, + CheckedConsumer bytesReferenceConsumer + ) throws IOException { final Query query = new TermQuery(new Term(TYPE_FIELD_NAME, type)); final Weight weight = indexSearcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 0.0f); logger.trace("running query [{}]", query); + final Map documentReaders = new HashMap<>(); + for (LeafReaderContext leafReaderContext : indexSearcher.getIndexReader().leaves()) { logger.trace("new leafReaderContext: {}", leafReaderContext); final Scorer scorer = weight.scorer(leafReaderContext); @@ -528,13 +540,55 @@ private static void consumeFromType(IndexSearcher indexSearcher, String type, Ch while (docIdSetIterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { if (isLiveDoc.test(docIdSetIterator.docID())) { logger.trace("processing doc {}", docIdSetIterator.docID()); - bytesRefConsumer.accept( - leafReaderContext.reader().document(docIdSetIterator.docID()).getBinaryValue(DATA_FIELD_NAME) - ); + final Document document = leafReaderContext.reader().document(docIdSetIterator.docID()); + final BytesArray documentData = new BytesArray(document.getBinaryValue(DATA_FIELD_NAME)); + + if (document.getField(PAGE_FIELD_NAME) == null) { + // legacy format: not paginated or compressed + assert Version.CURRENT.minimumIndexCompatibilityVersion().before(Version.V_7_16_0); + bytesReferenceConsumer.accept(documentData); + continue; + } + + final int pageIndex = document.getField(PAGE_FIELD_NAME).numericValue().intValue(); + final boolean isLastPage = document.getField(LAST_PAGE_FIELD_NAME).numericValue().intValue() == IS_LAST_PAGE; + + if (pageIndex == 0 && isLastPage) { + // common case: metadata fits in a single page + bytesReferenceConsumer.accept(CompressorFactory.COMPRESSOR.uncompress(documentData)); + continue; + } + + // The metadata doesn't fit into a single page, so we accumulate pages until we have a complete set. Typically we + // will see pages in order since they were written in order, so the map will often have at most one entry. Also 1MB + // should be ample space for compressed index metadata so this is almost always used just for the global metadata. + // Even in pathological cases we shouldn't run out of memory here because we're doing this very early on in node + // startup, on the main thread and before most other services have started, and we will need space to serialize the + // whole cluster state in memory later on. + + final String key; + if (type.equals(GLOBAL_TYPE_NAME)) { + key = GLOBAL_TYPE_NAME; + } else { + key = document.getField(INDEX_UUID_FIELD_NAME).stringValue(); + } + + final PaginatedDocumentReader reader = documentReaders.computeIfAbsent(key, k -> new PaginatedDocumentReader()); + final BytesReference bytesReference = reader.addPage(key, documentData, pageIndex, isLastPage); + if (bytesReference != null) { + documentReaders.remove(key); + bytesReferenceConsumer.accept(CompressorFactory.COMPRESSOR.uncompress(bytesReference)); + } } } } } + + if (documentReaders.isEmpty() == false) { + throw new CorruptStateException( + "incomplete paginated documents " + documentReaders.keySet() + " when reading cluster state index [type=" + type + "]" + ); + } } private static final ToXContent.Params FORMAT_PARAMS; @@ -586,17 +640,12 @@ private static class MetadataIndexWriter implements Closeable { void deleteAll() throws IOException { this.logger.trace("clearing existing metadata"); - this.indexWriter.deleteAll(); - } - - void updateIndexMetadataDocument(Document indexMetadataDocument, Index index) throws IOException { - this.logger.trace("updating metadata for [{}]", index); - indexWriter.updateDocument(new Term(INDEX_UUID_FIELD_NAME, index.getUUID()), indexMetadataDocument); + indexWriter.deleteAll(); } - void updateGlobalMetadata(Document globalMetadataDocument) throws IOException { - this.logger.trace("updating global metadata doc"); - indexWriter.updateDocument(new Term(TYPE_FIELD_NAME, GLOBAL_TYPE_NAME), globalMetadataDocument); + public void deleteGlobalMetadata() throws IOException { + this.logger.trace("deleting global metadata docs"); + indexWriter.deleteDocuments(new Term(TYPE_FIELD_NAME, GLOBAL_TYPE_NAME)); } void deleteIndexMetadata(String indexUUID) throws IOException { @@ -641,29 +690,25 @@ public static class Writer implements Closeable { private final List metadataIndexWriters; private final String nodeId; - private final BigArrays bigArrays; private final LongSupplier relativeTimeMillisSupplier; private final Supplier slowWriteLoggingThresholdSupplier; boolean fullStateWritten = false; private final AtomicBoolean closed = new AtomicBoolean(); - - // The size of the document buffer that was used for the last write operation, used as a hint for allocating the buffer for the - // next one. - private int documentBufferUsed; + private final byte[] documentBuffer; private Writer( List metadataIndexWriters, String nodeId, - BigArrays bigArrays, + ByteSizeValue documentPageSize, LongSupplier relativeTimeMillisSupplier, Supplier slowWriteLoggingThresholdSupplier ) { this.metadataIndexWriters = metadataIndexWriters; this.nodeId = nodeId; - this.bigArrays = bigArrays; this.relativeTimeMillisSupplier = relativeTimeMillisSupplier; this.slowWriteLoggingThresholdSupplier = slowWriteLoggingThresholdSupplier; + this.documentBuffer = new byte[ByteSizeUnit.BYTES.toIntBytes(documentPageSize.getBytes())]; } private void ensureOpen() { @@ -786,63 +831,105 @@ private WriterStats updateMetadata(Metadata previouslyWrittenMetadata, Metadata assert previouslyWrittenMetadata.coordinationMetadata().term() == metadata.coordinationMetadata().term(); logger.trace("currentTerm [{}] matches previous currentTerm, writing changes only", metadata.coordinationMetadata().term()); - try (DocumentBuffer documentBuffer = allocateBuffer()) { - - final boolean updateGlobalMeta = Metadata.isGlobalStateEquals(previouslyWrittenMetadata, metadata) == false; - if (updateGlobalMeta) { - final Document globalMetadataDocument = makeGlobalMetadataDocument(metadata, documentBuffer); - for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { - metadataIndexWriter.updateGlobalMetadata(globalMetadataDocument); - } + final boolean updateGlobalMeta = Metadata.isGlobalStateEquals(previouslyWrittenMetadata, metadata) == false; + if (updateGlobalMeta) { + for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { + metadataIndexWriter.deleteGlobalMetadata(); } - final Map indexMetadataVersionByUUID = new HashMap<>(previouslyWrittenMetadata.indices().size()); - for (IndexMetadata indexMetadata : previouslyWrittenMetadata.indices().values()) { - final Long previousValue = indexMetadataVersionByUUID.putIfAbsent( - indexMetadata.getIndexUUID(), + addGlobalMetadataDocuments(metadata); + } + + final Map indexMetadataVersionByUUID = new HashMap<>(previouslyWrittenMetadata.indices().size()); + for (IndexMetadata indexMetadata : previouslyWrittenMetadata.indices().values()) { + final Long previousValue = indexMetadataVersionByUUID.putIfAbsent(indexMetadata.getIndexUUID(), indexMetadata.getVersion()); + assert previousValue == null : indexMetadata.getIndexUUID() + " already mapped to " + previousValue; + } + + int numIndicesUpdated = 0; + int numIndicesUnchanged = 0; + for (IndexMetadata indexMetadata : metadata.indices().values()) { + final Long previousVersion = indexMetadataVersionByUUID.get(indexMetadata.getIndexUUID()); + if (previousVersion == null || indexMetadata.getVersion() != previousVersion) { + logger.trace( + "updating metadata for [{}], changing version from [{}] to [{}]", + indexMetadata.getIndex(), + previousVersion, indexMetadata.getVersion() ); - assert previousValue == null : indexMetadata.getIndexUUID() + " already mapped to " + previousValue; - } + numIndicesUpdated++; - int numIndicesUpdated = 0; - int numIndicesUnchanged = 0; - for (IndexMetadata indexMetadata : metadata.indices().values()) { - final Long previousVersion = indexMetadataVersionByUUID.get(indexMetadata.getIndexUUID()); - if (previousVersion == null || indexMetadata.getVersion() != previousVersion) { - logger.trace( - "updating metadata for [{}], changing version from [{}] to [{}]", - indexMetadata.getIndex(), - previousVersion, - indexMetadata.getVersion() - ); - numIndicesUpdated++; - final Document indexMetadataDocument = makeIndexMetadataDocument(indexMetadata, documentBuffer); - for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { - metadataIndexWriter.updateIndexMetadataDocument(indexMetadataDocument, indexMetadata.getIndex()); - } - } else { - numIndicesUnchanged++; - logger.trace("no action required for [{}]", indexMetadata.getIndex()); + for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { + metadataIndexWriter.deleteIndexMetadata(indexMetadata.getIndexUUID()); } - indexMetadataVersionByUUID.remove(indexMetadata.getIndexUUID()); + + addIndexMetadataDocuments(indexMetadata); + } else { + numIndicesUnchanged++; + logger.trace("no action required for [{}]", indexMetadata.getIndex()); } + indexMetadataVersionByUUID.remove(indexMetadata.getIndexUUID()); + } - documentBufferUsed = documentBuffer.getMaxUsed(); + for (String removedIndexUUID : indexMetadataVersionByUUID.keySet()) { + for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { + metadataIndexWriter.deleteIndexMetadata(removedIndexUUID); + } + } - for (String removedIndexUUID : indexMetadataVersionByUUID.keySet()) { - for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { - metadataIndexWriter.deleteIndexMetadata(removedIndexUUID); - } + // Flush, to try and expose a failure (e.g. out of disk space) before committing, because we can handle a failure here more + // gracefully than one that occurs during the commit process. + for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { + metadataIndexWriter.flush(); + } + + return new WriterStats(updateGlobalMeta, numIndicesUpdated, numIndicesUnchanged); + } + + private static int lastPageValue(boolean isLastPage) { + return isLastPage ? IS_LAST_PAGE : IS_NOT_LAST_PAGE; + } + + private void addIndexMetadataDocuments(IndexMetadata indexMetadata) throws IOException { + final String indexUUID = indexMetadata.getIndexUUID(); + assert indexUUID.equals(IndexMetadata.INDEX_UUID_NA_VALUE) == false; + logger.trace("updating metadata for [{}]", indexMetadata.getIndex()); + writePages(indexMetadata, ((bytesRef, pageIndex, isLastPage) -> { + final Document document = new Document(); + document.add(new StringField(TYPE_FIELD_NAME, INDEX_TYPE_NAME, Field.Store.NO)); + document.add(new StringField(INDEX_UUID_FIELD_NAME, indexUUID, Field.Store.YES)); + document.add(new StoredField(PAGE_FIELD_NAME, pageIndex)); + document.add(new StoredField(LAST_PAGE_FIELD_NAME, lastPageValue(isLastPage))); + document.add(new StoredField(DATA_FIELD_NAME, bytesRef)); + for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { + metadataIndexWriter.indexWriter.addDocument(document); } + })); + } - // Flush, to try and expose a failure (e.g. out of disk space) before committing, because we can handle a failure here more - // gracefully than one that occurs during the commit process. + private void addGlobalMetadataDocuments(Metadata metadata) throws IOException { + logger.trace("updating global metadata doc"); + writePages(metadata, (bytesRef, pageIndex, isLastPage) -> { + final Document document = new Document(); + document.add(new StringField(TYPE_FIELD_NAME, GLOBAL_TYPE_NAME, Field.Store.NO)); + document.add(new StoredField(PAGE_FIELD_NAME, pageIndex)); + document.add(new StoredField(LAST_PAGE_FIELD_NAME, lastPageValue(isLastPage))); + document.add(new StoredField(DATA_FIELD_NAME, bytesRef)); for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { - metadataIndexWriter.flush(); + metadataIndexWriter.indexWriter.addDocument(document); } + }); + } - return new WriterStats(updateGlobalMeta, numIndicesUpdated, numIndicesUnchanged); + private void writePages(ToXContent metadata, PageWriter pageWriter) throws IOException { + try ( + PageWriterOutputStream paginatedStream = new PageWriterOutputStream(documentBuffer, pageWriter); + OutputStream compressedStream = CompressorFactory.COMPRESSOR.threadLocalOutputStream(paginatedStream); + XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.SMILE, compressedStream) + ) { + xContentBuilder.startObject(); + metadata.toXContent(xContentBuilder, FORMAT_PARAMS); + xContentBuilder.endObject(); } } @@ -860,38 +947,19 @@ private WriterStats overwriteMetadata(Metadata metadata) throws IOException { * Add documents for the metadata of the given cluster state, assuming that there are currently no documents. */ private WriterStats addMetadata(Metadata metadata) throws IOException { - try (DocumentBuffer documentBuffer = allocateBuffer()) { - - final Document globalMetadataDocument = makeGlobalMetadataDocument(metadata, documentBuffer); - for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { - metadataIndexWriter.updateGlobalMetadata(globalMetadataDocument); - } - - for (IndexMetadata indexMetadata : metadata.indices().values()) { - final Document indexMetadataDocument = makeIndexMetadataDocument(indexMetadata, documentBuffer); - for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { - metadataIndexWriter.updateIndexMetadataDocument(indexMetadataDocument, indexMetadata.getIndex()); - } - } + addGlobalMetadataDocuments(metadata); - documentBufferUsed = documentBuffer.getMaxUsed(); - - // Flush, to try and expose a failure (e.g. out of disk space) before committing, because we can handle a failure here more - // gracefully than one that occurs during the commit process. - for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { - metadataIndexWriter.flush(); - } + for (IndexMetadata indexMetadata : metadata.indices().values()) { + addIndexMetadataDocuments(indexMetadata); + } - return new WriterStats(true, metadata.indices().size(), 0); + // Flush, to try and expose a failure (e.g. out of disk space) before committing, because we can handle a failure here more + // gracefully than one that occurs during the commit process. + for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { + metadataIndexWriter.flush(); } - } - private DocumentBuffer allocateBuffer() { - // heuristics for picking the initial buffer size based on the buffer we needed last time: try and fit within a single page, - // but if we needed more than a single page last time then allow a bit more space to try and avoid needing to grow the buffer - // later on. - final int extraSpace = documentBufferUsed <= PageCacheRecycler.PAGE_SIZE_IN_BYTES ? 0 : PageCacheRecycler.PAGE_SIZE_IN_BYTES; - return new DocumentBuffer(documentBufferUsed + extraSpace, bigArrays); + return new WriterStats(true, metadata.indices().size(), 0); } public void writeIncrementalTermUpdateAndCommit(long currentTerm, long lastAcceptedVersion) throws IOException { @@ -902,10 +970,17 @@ public void writeIncrementalTermUpdateAndCommit(long currentTerm, long lastAccep void commit(long currentTerm, long lastAcceptedVersion) throws IOException { ensureOpen(); + prepareCommit(currentTerm, lastAcceptedVersion); + completeCommit(); + } + + private void prepareCommit(long currentTerm, long lastAcceptedVersion) throws IOException { + boolean prepareCommitSuccess = false; try { for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { metadataIndexWriter.prepareCommit(nodeId, currentTerm, lastAcceptedVersion); } + prepareCommitSuccess = true; } catch (Exception e) { try { close(); @@ -916,11 +991,19 @@ void commit(long currentTerm, long lastAcceptedVersion) throws IOException { throw e; } finally { closeIfAnyIndexWriterHasTragedyOrIsClosed(); + if (prepareCommitSuccess == false) { + closeAndSuppressExceptions(); // let the error propagate even if closing fails here + } } + } + + private void completeCommit() { + boolean commitSuccess = false; try { for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { metadataIndexWriter.commit(); } + commitSuccess = true; } catch (IOException e) { // The commit() call has similar semantics to a fsync(): although it's atomic, if it fails then we've no idea whether the // data on disk is now the old version or the new version, and this is a disaster. It's safest to fail the whole node and @@ -933,6 +1016,16 @@ void commit(long currentTerm, long lastAcceptedVersion) throws IOException { throw new IOError(e); } finally { closeIfAnyIndexWriterHasTragedyOrIsClosed(); + if (commitSuccess == false) { + closeAndSuppressExceptions(); // let the error propagate even if closing fails here + } + } + } + + private void closeAndSuppressExceptions() { + if (closed.compareAndSet(false, true)) { + logger.trace("closing PersistedClusterStateService.Writer suppressing any exceptions"); + IOUtils.closeWhileHandlingException(metadataIndexWriters); } } @@ -955,101 +1048,144 @@ static class WriterStats { this.numIndicesUnchanged = numIndicesUnchanged; } } + } - private Document makeIndexMetadataDocument(IndexMetadata indexMetadata, DocumentBuffer documentBuffer) throws IOException { - final Document indexMetadataDocument = makeDocument(INDEX_TYPE_NAME, indexMetadata, documentBuffer); - final String indexUUID = indexMetadata.getIndexUUID(); - assert indexUUID.equals(IndexMetadata.INDEX_UUID_NA_VALUE) == false; - indexMetadataDocument.add(new StringField(INDEX_UUID_FIELD_NAME, indexUUID, Field.Store.NO)); - return indexMetadataDocument; - } + private interface PageWriter { + void consumePage(BytesRef bytesRef, int pageIndex, boolean isLastPage) throws IOException; + } - private Document makeGlobalMetadataDocument(Metadata metadata, DocumentBuffer documentBuffer) throws IOException { - return makeDocument(GLOBAL_TYPE_NAME, metadata, documentBuffer); - } + private static class PageWriterOutputStream extends OutputStream { - private Document makeDocument(String typeName, ToXContent metadata, DocumentBuffer documentBuffer) throws IOException { - final Document document = new Document(); - document.add(new StringField(TYPE_FIELD_NAME, typeName, Field.Store.NO)); - - try (RecyclingBytesStreamOutput streamOutput = documentBuffer.streamOutput()) { - try ( - XContentBuilder xContentBuilder = XContentFactory.contentBuilder( - XContentType.SMILE, - Streams.flushOnCloseStream(streamOutput) - ) - ) { - xContentBuilder.startObject(); - metadata.toXContent(xContentBuilder, FORMAT_PARAMS); - xContentBuilder.endObject(); - } - document.add(new StoredField(DATA_FIELD_NAME, streamOutput.toBytesRef())); - } + private final byte[] buffer; + private final PageWriter pageWriter; + private int bufferPosition; + private int pageIndex; + private int bytesFlushed; + private boolean closed; - return document; + PageWriterOutputStream(byte[] buffer, PageWriter pageWriter) { + assert buffer.length > 0; + this.buffer = buffer; + this.pageWriter = pageWriter; } - } - /** - * Holds the current buffer, keeping track of new allocations as it grows. - */ - private static class DocumentBuffer implements Releasable { - private final BigArrays bigArrays; - - @Nullable // if the initial page doesn't need releasing - private final Releasable releasable; - private byte[] buffer; - private int maxUsed; - - DocumentBuffer(int size, BigArrays bigArrays) { - if (size <= PageCacheRecycler.PAGE_SIZE_IN_BYTES) { - final ByteArray byteArray = bigArrays.newByteArray(PageCacheRecycler.PAGE_SIZE_IN_BYTES); - final BytesRefIterator iterator = BytesReference.fromByteArray(byteArray, Math.toIntExact(byteArray.size())).iterator(); - final BytesRef firstPage; - try { - firstPage = iterator.next(); - assert iterator.next() == null : "should be one page"; - } catch (IOException e) { - throw new AssertionError("impossible", e); + @Override + public void write(@SuppressWarnings("NullableProblems") byte[] b, int off, int len) throws IOException { + assert closed == false : "cannot write after close"; + while (len > 0) { + if (bufferPosition == buffer.length) { + flushPage(false); } + assert bufferPosition < buffer.length; - // we require that we have the whole page to ourselves - assert firstPage.offset == 0 : firstPage.offset; - assert firstPage.bytes.length == PageCacheRecycler.PAGE_SIZE_IN_BYTES : firstPage.bytes.length; - buffer = firstPage.bytes; - releasable = byteArray; - } else { - buffer = new byte[size]; - releasable = null; + final int lenToBuffer = Math.min(len, buffer.length - bufferPosition); + System.arraycopy(b, off, buffer, bufferPosition, lenToBuffer); + bufferPosition += lenToBuffer; + off += lenToBuffer; + len -= lenToBuffer; } - this.bigArrays = bigArrays; - maxUsed = 0; } - RecyclingBytesStreamOutput streamOutput() { - return new RecyclingBytesStreamOutput(buffer, bigArrays) { - @Override - public BytesRef toBytesRef() { - final BytesRef bytesRef = super.toBytesRef(); - maxUsed = Math.max(maxUsed, bytesRef.length); - if (buffer != bytesRef.bytes) { - assert bytesRef.length > buffer.length; - logger.trace("growing document buffer from [{}] to [{}]", buffer.length, maxUsed); - buffer = bytesRef.bytes; - } - assert maxUsed <= buffer.length; - return bytesRef; - } - }; + @Override + public void write(int b) throws IOException { + assert closed == false : "cannot write after close"; + if (bufferPosition == buffer.length) { + flushPage(false); + } + assert bufferPosition < buffer.length; + buffer[bufferPosition++] = (byte) b; } - int getMaxUsed() { - return maxUsed; + @Override + public void flush() throws IOException { + assert closed == false : "must not flush after close"; + // keep buffering, don't actually flush anything } @Override - public void close() { - Releasables.close(releasable); + public void close() throws IOException { + if (closed == false) { + closed = true; + flushPage(true); + } + } + + private void flushPage(boolean isLastPage) throws IOException { + assert bufferPosition > 0 : "cannot flush empty page"; + assert bufferPosition == buffer.length || isLastPage : "only the last page may be incomplete"; + if (bytesFlushed > Integer.MAX_VALUE - bufferPosition) { + // At startup the state doc is loaded into a single BytesReference which means it must be no longer than Integer.MAX_VALUE, + // so we would not be able to read it if we carried on. Better to fail early during writing instead. + throw new IllegalArgumentException("cannot persist cluster state document larger than 2GB"); + } + bytesFlushed += bufferPosition; + pageWriter.consumePage(new BytesRef(buffer, 0, bufferPosition), pageIndex, isLastPage); + pageIndex += 1; + bufferPosition = 0; + } + } + + private static class PaginatedDocumentReader { + + private final ArrayList pages = new ArrayList<>(); + private int emptyPages; + private int pageCount = -1; + + /** + * @return a {@link BytesReference} if all pages received, otherwise {@code null}. + */ + @Nullable + BytesReference addPage(String key, BytesReference bytesReference, int pageIndex, boolean isLastPage) throws CorruptStateException { + while (pages.size() < pageIndex) { + if (pageCount != -1) { + throw new CorruptStateException( + "found page [" + + pageIndex + + "] but last page was [" + + pageCount + + "] when reading key [" + + key + + "] from cluster state index" + ); + } + emptyPages += 1; + pages.add(null); + } + if (pages.size() == pageIndex) { + pages.add(bytesReference); + } else { + if (pages.get(pageIndex) != null) { + throw new CorruptStateException( + "found duplicate page [" + pageIndex + "] when reading key [" + key + "] from cluster state index" + ); + } + emptyPages -= 1; + pages.set(pageIndex, bytesReference); + } + if (isLastPage) { + if (pageCount != -1) { + throw new CorruptStateException( + "already read page count " + + pageCount + + " but page " + + pageIndex + + " is also marked as the last page when reading key [" + + key + + "] from cluster state index" + ); + } + pageCount = pageIndex + 1; + if (pages.size() != pageCount) { + throw new CorruptStateException( + "already read " + pages.size() + " pages but page " + pageIndex + " is marked as the last page" + ); + } + } + if (pageCount != -1 && emptyPages == 0) { + return CompositeBytesReference.of(pages.toArray(new BytesReference[0])); + } else { + return null; + } } } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index f62c2f1d0a600..02812c4a7f2e7 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -554,10 +554,9 @@ protected Node( BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService); modules.add(settingsModule); final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry); - final PersistedClusterStateService lucenePersistedStateFactory = new PersistedClusterStateService( + final PersistedClusterStateService persistedClusterStateService = new PersistedClusterStateService( nodeEnvironment, xContentRegistry, - bigArrays, clusterService.getClusterSettings(), threadPool::relativeTimeInMillis ); @@ -916,7 +915,7 @@ protected Node( b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry); b.bind(MetadataUpgrader.class).toInstance(metadataUpgrader); b.bind(MetaStateService.class).toInstance(metaStateService); - b.bind(PersistedClusterStateService.class).toInstance(lucenePersistedStateFactory); + b.bind(PersistedClusterStateService.class).toInstance(persistedClusterStateService); b.bind(IndicesService.class).toInstance(indicesService); b.bind(AliasValidator.class).toInstance(aliasValidator); b.bind(MetadataCreateIndexService.class).toInstance(metadataCreateIndexService); diff --git a/server/src/test/java/org/elasticsearch/common/bytes/RecyclingBytesStreamOutputTests.java b/server/src/test/java/org/elasticsearch/common/bytes/RecyclingBytesStreamOutputTests.java deleted file mode 100644 index 219112176e563..0000000000000 --- a/server/src/test/java/org/elasticsearch/common/bytes/RecyclingBytesStreamOutputTests.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.common.bytes; - -import org.apache.lucene.util.BytesRef; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.MockBigArrays; -import org.elasticsearch.common.util.MockPageCacheRecycler; -import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; -import org.elasticsearch.test.ESTestCase; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.sameInstance; - -public class RecyclingBytesStreamOutputTests extends ESTestCase { - - public void testReturnsWrittenBytesAndRecyclesBufferIfPossible() throws IOException { - - final byte[] source = randomUnicodeOfLength(scaledRandomIntBetween(0, 20000)).getBytes(StandardCharsets.UTF_8); - final byte[] buffer = new byte[scaledRandomIntBetween(0, 20000)]; - - final MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); - try (RecyclingBytesStreamOutput output = new RecyclingBytesStreamOutput(buffer, bigArrays)) { - int position = 0; - while (position < source.length) { - if (randomBoolean()) { - output.writeByte(source[position++]); - } else { - final int length = randomIntBetween(1, source.length - position); - final int sliceStart = randomIntBetween(0, position); - final int sliceEnd = randomIntBetween(position + length, source.length); - final byte[] slice = new byte[sliceEnd - sliceStart]; - System.arraycopy(source, sliceStart, slice, 0, slice.length); - output.writeBytes(slice, position - sliceStart, length); - position += length; - } - } - - final BytesRef bytesRef; - - if (randomBoolean()) { - bytesRef = output.toBytesRef(); - assertThat(bytesRef.offset, equalTo(0)); - - if (source.length <= buffer.length) { - assertThat("should have re-used the same buffer", bytesRef.bytes, sameInstance(buffer)); - } else { - assertThat("new buffer should be the right size", bytesRef.bytes.length, equalTo(source.length)); - } - } else { - bytesRef = output.bytes().toBytesRef(); - } - - assertThat(bytesRef.length, equalTo(source.length)); - final byte[] trimmed = new byte[source.length]; - System.arraycopy(bytesRef.bytes, bytesRef.offset, trimmed, 0, bytesRef.length); - assertArrayEquals(source, trimmed); - } - } -} diff --git a/server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandTests.java b/server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandTests.java index 367ed08bffba5..353b8be9ffc45 100644 --- a/server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandTests.java +++ b/server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandTests.java @@ -21,7 +21,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.CheckedRunnable; import org.elasticsearch.gateway.PersistedClusterStateService; @@ -70,7 +69,6 @@ public void createNodePaths() throws IOException { nodePaths, nodeId, xContentRegistry(), - BigArrays.NON_RECYCLING_INSTANCE, new ClusterSettings(dataMasterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L ).createWriter() diff --git a/server/src/test/java/org/elasticsearch/env/OverrideNodeVersionCommandTests.java b/server/src/test/java/org/elasticsearch/env/OverrideNodeVersionCommandTests.java index 13d4b943db91c..d465efec3f964 100644 --- a/server/src/test/java/org/elasticsearch/env/OverrideNodeVersionCommandTests.java +++ b/server/src/test/java/org/elasticsearch/env/OverrideNodeVersionCommandTests.java @@ -18,7 +18,6 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.gateway.PersistedClusterStateService; import org.elasticsearch.test.ESTestCase; import org.junit.After; @@ -51,7 +50,6 @@ public void createNodePaths() throws IOException { nodePaths, nodeId, xContentRegistry(), - BigArrays.NON_RECYCLING_INSTANCE, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L ).createWriter() @@ -78,7 +76,6 @@ public void checkClusterStateIntact() throws IOException { nodePaths, nodeId, xContentRegistry(), - BigArrays.NON_RECYCLING_INSTANCE, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L ).loadBestOnDiskState().metadata.persistentSettings() diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java index 331fd0ae00ac9..7059e3f47586d 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.gateway; +import org.apache.lucene.mockfile.FilterFileSystemProvider; import org.apache.lucene.store.Directory; import org.apache.lucene.store.MockDirectoryWrapper; import org.elasticsearch.ExceptionsHelper; @@ -26,15 +27,14 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.MockBigArrays; -import org.elasticsearch.common.util.MockPageCacheRecycler; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.core.PathUtils; +import org.elasticsearch.core.PathUtilsForTesting; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.TestEnvironment; -import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.node.Node; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; @@ -44,6 +44,8 @@ import java.io.Closeable; import java.io.IOError; import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.OpenOption; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; @@ -56,7 +58,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -65,11 +66,9 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { private ClusterName clusterName; private Settings settings; private DiscoveryNode localNode; - private BigArrays bigArrays; @Override public void setUp() throws Exception { - bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); nodeEnvironment = newNodeEnvironment(); localNode = new DiscoveryNode( "node1", @@ -79,7 +78,14 @@ public void setUp() throws Exception { Version.CURRENT ); clusterName = new ClusterName(randomAlphaOfLength(10)); - settings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName.value()).build(); + final Settings.Builder settingsBuilder = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName.value()); + if (randomBoolean()) { + settingsBuilder.put( + PersistedClusterStateService.DOCUMENT_PAGE_SIZE.getKey(), + ByteSizeValue.ofBytes(randomLongBetween(1, 1024)) + ); + } + settings = settingsBuilder.build(); super.setUp(); } @@ -90,7 +96,7 @@ public void tearDown() throws Exception { } private CoordinationState.PersistedState newGatewayPersistedState() { - final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode, bigArrays); + final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode); gateway.start(settings, nodeEnvironment, xContentRegistry()); final CoordinationState.PersistedState persistedState = gateway.getPersistedState(); assertThat(persistedState, instanceOf(GatewayMetaState.LucenePersistedState.class)); @@ -322,7 +328,6 @@ public void testStatePersistedOnLoad() throws IOException { final PersistedClusterStateService persistedClusterStateService = new PersistedClusterStateService( nodeEnvironment, xContentRegistry(), - getBigArrays(), new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L ); @@ -354,7 +359,6 @@ public void testStatePersistedOnLoad() throws IOException { final PersistedClusterStateService newPersistedClusterStateService = new PersistedClusterStateService( nodeEnvironment, xContentRegistry(), - getBigArrays(), new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L ); @@ -388,7 +392,7 @@ public void testDataOnlyNodePersistence() throws Exception { .put(nonMasterNode()) .put(Node.NODE_NAME_SETTING.getKey(), "test") .build(); - final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode, bigArrays); + final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode); cleanup.add(gateway); final TransportService transportService = mock(TransportService.class); TestThreadPool threadPool = new TestThreadPool("testMarkAcceptedConfigAsCommittedOnDataOnlyNode"); @@ -401,7 +405,6 @@ public void testDataOnlyNodePersistence() throws Exception { final PersistedClusterStateService persistedClusterStateService = new PersistedClusterStateService( nodeEnvironment, xContentRegistry(), - getBigArrays(), new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L ); @@ -521,7 +524,6 @@ public void testStatePersistenceWithIOIssues() throws IOException { final PersistedClusterStateService persistedClusterStateService = new PersistedClusterStateService( nodeEnvironment, xContentRegistry(), - getBigArrays(), new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L ) { @@ -609,7 +611,6 @@ Directory createDirectory(Path path) { final PersistedClusterStateService newPersistedClusterStateService = new PersistedClusterStateService( nodeEnvironment, xContentRegistry(), - getBigArrays(), new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L ); @@ -628,68 +629,79 @@ Directory createDirectory(Path path) { } public void testStatePersistenceWithFatalError() throws IOException { - final AtomicBoolean throwError = new AtomicBoolean(); - final BigArrays realBigArrays = getBigArrays(); - final BigArrays mockBigArrays = mock(BigArrays.class); - when(mockBigArrays.newByteArray(anyLong())).thenAnswer(invocationOnMock -> { - if (throwError.get() && randomBoolean()) { - throw new TestError(); - } - return realBigArrays.newByteArray((Long) invocationOnMock.getArguments()[0]); - }); - - final PersistedClusterStateService persistedClusterStateService = new PersistedClusterStateService( - nodeEnvironment, - xContentRegistry(), - mockBigArrays, - new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - () -> 0L - ); - ClusterState state = createClusterState(randomNonNegativeLong(), Metadata.builder().clusterUUID(randomAlphaOfLength(10)).build()); long currentTerm = 42L; - try ( - GatewayMetaState.LucenePersistedState persistedState = new GatewayMetaState.LucenePersistedState( - persistedClusterStateService, - currentTerm, - state - ) - ) { - - throwError.set(true); + ClusterState state = createClusterState(randomNonNegativeLong(), Metadata.builder().clusterUUID(randomAlphaOfLength(10)).build()); - for (int i = between(1, 5); 0 <= i; i--) { - if (randomBoolean()) { - final ClusterState newState = createClusterState( - randomNonNegativeLong(), - Metadata.builder() - .clusterUUID(randomAlphaOfLength(10)) - .coordinationMetadata(CoordinationMetadata.builder().term(currentTerm).build()) - .build() - ); - try { - persistedState.setLastAcceptedState(newState); - state = newState; - } catch (TestError e) { - // ok - } - } else { - final long newTerm = currentTerm + 1; - try { - persistedState.setCurrentTerm(newTerm); - currentTerm = newTerm; - } catch (TestError e) { - // ok + final AtomicBoolean throwError = new AtomicBoolean(); + final AtomicBoolean openedFile = new AtomicBoolean(); + PathUtilsForTesting.installMock(new FilterFileSystemProvider("throwerror://", PathUtils.getDefaultFileSystem()) { + @Override + public OutputStream newOutputStream(Path path, OpenOption... options) throws IOException { + if (throwError.get() && path.getFileName().toString().startsWith("pending_segments_")) { + openedFile.set(true); + if (randomBoolean()) { + throw randomBoolean() ? new TestTragicError() : new TestComedicError(); } } + return super.newOutputStream(path, options); } + }.getFileSystem(null)); + + final Path[] dataPaths; + try (NodeEnvironment nodeEnvironment = newNodeEnvironment()) { // a new node environment is needed to pick up the fake filesystem + dataPaths = nodeEnvironment.nodeDataPaths(); + try ( + GatewayMetaState.LucenePersistedState persistedState = new GatewayMetaState.LucenePersistedState( + new PersistedClusterStateService( + nodeEnvironment, + xContentRegistry(), + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + () -> 0L + ), + currentTerm, + state + ) + ) { + throwError.set(true); + for (int i = between(1, 5); 0 <= i; i--) { + if (randomBoolean()) { + final ClusterState newState = createClusterState( + randomNonNegativeLong(), + Metadata.builder() + .clusterUUID(randomAlphaOfLength(10)) + .coordinationMetadata(CoordinationMetadata.builder().term(currentTerm).build()) + .build() + ); + try { + persistedState.setLastAcceptedState(newState); + state = newState; + } catch (TestTragicError | TestComedicError e) { + logger.info("--> test error", e); + // ok + } + } else { + final long newTerm = currentTerm + 1; + try { + persistedState.setCurrentTerm(newTerm); + currentTerm = newTerm; + } catch (TestTragicError | TestComedicError e) { + logger.info("--> test error", e); + // ok + } + } + } - assertEquals(state, persistedState.getLastAcceptedState()); - assertEquals(currentTerm, persistedState.getCurrentTerm()); + assertEquals(state, persistedState.getLastAcceptedState()); + assertEquals(currentTerm, persistedState.getCurrentTerm()); + } + } finally { + PathUtilsForTesting.teardown(); } + assertTrue(openedFile.get()); // make sure we had opportunity to throw an error nodeEnvironment.close(); - for (Path path : nodeEnvironment.nodeDataPaths()) { + for (Path path : dataPaths) { Settings settings = Settings.builder() .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath()) .put(Environment.PATH_DATA_SETTING.getKey(), path.toString()) @@ -698,12 +710,11 @@ public void testStatePersistenceWithFatalError() throws IOException { final PersistedClusterStateService newPersistedClusterStateService = new PersistedClusterStateService( nodeEnvironment, xContentRegistry(), - getBigArrays(), new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L ); final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService.loadBestOnDiskState(); - assertFalse(onDiskState.empty()); + assertFalse("loaded state from " + path, onDiskState.empty()); assertThat(onDiskState.currentTerm, equalTo(currentTerm)); assertClusterStateEqual( state, @@ -716,16 +727,21 @@ public void testStatePersistenceWithFatalError() throws IOException { } } - private static BigArrays getBigArrays() { - return usually() - ? BigArrays.NON_RECYCLING_INSTANCE - : new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + /** + * An {@link Error} which is tragic. + */ + private static final class TestTragicError extends VirtualMachineError { + TestTragicError() { + super("test tragic error"); + } } - private static final class TestError extends Error { - TestError() { - super("test error"); + /** + * An {@link Error} which is not tragic. + */ + private static final class TestComedicError extends Error { + TestComedicError() { + super("test comedic error"); } } - } diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java index 64d429fe5a2ef..634962bc14684 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java @@ -10,16 +10,31 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.analysis.core.KeywordAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StoredField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.Term; import org.apache.lucene.mockfile.ExtrasFS; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.Weight; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.NIOFSDirectory; +import org.apache.lucene.util.Bits; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -28,20 +43,19 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.Randomness; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.MockBigArrays; -import org.elasticsearch.common.util.MockPageCacheRecycler; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeMetadata; import org.elasticsearch.gateway.PersistedClusterStateService.Writer; import org.elasticsearch.index.Index; -import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.CorruptionUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLogAppender; @@ -58,14 +72,23 @@ import java.util.Comparator; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.IntPredicate; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; import static org.apache.lucene.index.IndexWriter.WRITE_LOCK_NAME; +import static org.elasticsearch.gateway.PersistedClusterStateService.GLOBAL_TYPE_NAME; +import static org.elasticsearch.gateway.PersistedClusterStateService.INDEX_TYPE_NAME; +import static org.elasticsearch.gateway.PersistedClusterStateService.IS_LAST_PAGE; +import static org.elasticsearch.gateway.PersistedClusterStateService.IS_NOT_LAST_PAGE; +import static org.elasticsearch.gateway.PersistedClusterStateService.LAST_PAGE_FIELD_NAME; import static org.elasticsearch.gateway.PersistedClusterStateService.METADATA_DIRECTORY_NAME; +import static org.elasticsearch.gateway.PersistedClusterStateService.PAGE_FIELD_NAME; +import static org.elasticsearch.gateway.PersistedClusterStateService.TYPE_FIELD_NAME; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.endsWith; @@ -77,11 +100,16 @@ public class PersistedClusterStateServiceTests extends ESTestCase { private PersistedClusterStateService newPersistedClusterStateService(NodeEnvironment nodeEnvironment) { + + final Settings.Builder settings = Settings.builder(); + if (randomBoolean()) { + settings.put(PersistedClusterStateService.DOCUMENT_PAGE_SIZE.getKey(), ByteSizeValue.ofBytes(randomLongBetween(1, 1024))); + } + return new PersistedClusterStateService( nodeEnvironment, xContentRegistry(), - getBigArrays(), - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + new ClusterSettings(settings.build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L ); } @@ -268,7 +296,6 @@ public void testFailsOnMismatchedNodeIds() throws IOException { combinedPaths, nodeIds[0], xContentRegistry(), - BigArrays.NON_RECYCLING_INSTANCE, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L ).loadBestOnDiskState() @@ -441,7 +468,6 @@ public void testFailsGracefullyOnExceptionDuringFlush() throws IOException { final PersistedClusterStateService persistedClusterStateService = new PersistedClusterStateService( nodeEnvironment, xContentRegistry(), - getBigArrays(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L ) { @@ -473,7 +499,8 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti .build(); throwException.set(true); assertThat( - expectThrows(IOException.class, () -> writeState(writer, newTerm, newState, clusterState)).getMessage(), + expectThrows(IllegalStateException.class, IOException.class, () -> writeState(writer, newTerm, newState, clusterState)) + .getMessage(), containsString("simulated") ); } @@ -487,7 +514,6 @@ public void testClosesWriterOnFatalError() throws IOException { final PersistedClusterStateService persistedClusterStateService = new PersistedClusterStateService( nodeEnvironment, xContentRegistry(), - getBigArrays(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L ) { @@ -537,7 +563,6 @@ public void testCrashesWithIOErrorOnCommitFailure() throws IOException { final PersistedClusterStateService persistedClusterStateService = new PersistedClusterStateService( nodeEnvironment, xContentRegistry(), - getBigArrays(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L ) { @@ -1006,6 +1031,124 @@ public void testReloadsMetadataAcrossMultipleSegments() throws IOException { } } + public void testHandlesShuffledDocuments() throws IOException { + final Path dataPath = createTempDir(); + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(new Path[] { dataPath })) { + final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(nodeEnvironment); + + final Metadata.Builder metadata = Metadata.builder(); + for (int i = between(5, 20); i >= 0; i--) { + metadata.put( + IndexMetadata.builder("test-" + i) + .settings( + Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())) + ) + ); + } + + final Settings.Builder persistentSettings = Settings.builder(); + persistentSettings.put( + PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), + TimeValue.timeValueMillis(randomLongBetween(0, 10000)) + ); + metadata.persistentSettings(persistentSettings.build()); + + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).build(); + try (Writer writer = persistedClusterStateService.createWriter()) { + writer.writeFullStateAndCommit(0L, clusterState); + } + + final List documents = new ArrayList<>(); + final Map commitUserData; + + try ( + Directory directory = new NIOFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME)); + DirectoryReader reader = DirectoryReader.open(directory) + ) { + commitUserData = reader.getIndexCommit().getUserData(); + final IndexSearcher indexSearcher = new IndexSearcher(reader); + indexSearcher.setQueryCache(null); + for (String typeName : new String[] { GLOBAL_TYPE_NAME, INDEX_TYPE_NAME }) { + final Query query = new TermQuery(new Term(TYPE_FIELD_NAME, typeName)); + final Weight weight = indexSearcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 0.0f); + for (LeafReaderContext leafReaderContext : indexSearcher.getIndexReader().leaves()) { + final Scorer scorer = weight.scorer(leafReaderContext); + final Bits liveDocs = leafReaderContext.reader().getLiveDocs(); + final IntPredicate isLiveDoc = liveDocs == null ? i -> true : liveDocs::get; + final DocIdSetIterator docIdSetIterator = scorer.iterator(); + while (docIdSetIterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + if (isLiveDoc.test(docIdSetIterator.docID())) { + final Document document = leafReaderContext.reader().document(docIdSetIterator.docID()); + document.add(new StringField(TYPE_FIELD_NAME, typeName, Field.Store.NO)); + documents.add(document); + } + } + } + } + } + + Randomness.shuffle(documents); + + try (Directory directory = new NIOFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME))) { + final IndexWriterConfig indexWriterConfig = new IndexWriterConfig(new KeywordAnalyzer()); + indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE); + try (IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig)) { + for (Document document : documents) { + indexWriter.addDocument(document); + } + indexWriter.setLiveCommitData(commitUserData.entrySet()); + indexWriter.commit(); + } + } + + final ClusterState loadedState = loadPersistedClusterState(persistedClusterStateService); + assertEquals(clusterState.metadata().indices(), loadedState.metadata().indices()); + assertEquals(clusterState.metadata().persistentSettings(), loadedState.metadata().persistentSettings()); + + // Now corrupt one of the docs, breaking pagination invariants, and ensure it yields a CorruptStateException + + final int corruptIndex = between(0, documents.size() - 1); + final Document corruptDocument = documents.get(corruptIndex); + final int corruptDocPage = corruptDocument.getField(PAGE_FIELD_NAME).numericValue().intValue(); + final boolean corruptDocIsLastPage = corruptDocument.getField(LAST_PAGE_FIELD_NAME).numericValue().intValue() == IS_LAST_PAGE; + final boolean isOnlyPageForIndex = corruptDocument.getField(TYPE_FIELD_NAME).stringValue().equals(INDEX_TYPE_NAME) + && corruptDocPage == 0 + && corruptDocIsLastPage; + if (isOnlyPageForIndex == false // don't remove the only doc for an index, this just loses the index and doesn't corrupt + && rarely()) { + documents.remove(between(0, documents.size() - 1)); + } else { + if (randomBoolean()) { + corruptDocument.removeFields(PAGE_FIELD_NAME); + corruptDocument.add( + new StoredField(PAGE_FIELD_NAME, randomValueOtherThan(corruptDocPage, () -> between(0, corruptDocPage + 10))) + ); + } else { + corruptDocument.removeFields(LAST_PAGE_FIELD_NAME); + corruptDocument.add(new StoredField(LAST_PAGE_FIELD_NAME, corruptDocIsLastPage ? IS_NOT_LAST_PAGE : IS_LAST_PAGE)); + } + } + + try (Directory directory = new NIOFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME))) { + final IndexWriterConfig indexWriterConfig = new IndexWriterConfig(new KeywordAnalyzer()); + indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE); + try (IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig)) { + for (Document document : documents) { + indexWriter.addDocument(document); + } + indexWriter.setLiveCommitData(commitUserData.entrySet()); + indexWriter.commit(); + } + } + + expectThrows(CorruptStateException.class, () -> loadPersistedClusterState(persistedClusterStateService)); + } + } + @TestLogging(value = "org.elasticsearch.gateway:WARN", reason = "to ensure that we log gateway events on WARN level") public void testSlowLogging() throws IOException, IllegalAccessException { final long slowWriteLoggingThresholdMillis; @@ -1034,7 +1177,6 @@ public void testSlowLogging() throws IOException, IllegalAccessException { PersistedClusterStateService persistedClusterStateService = new PersistedClusterStateService( nodeEnvironment, xContentRegistry(), - getBigArrays(), clusterSettings, () -> currentTime.getAndAdd(writeDurationMillis.get()) ); @@ -1389,10 +1531,4 @@ private static ClusterState clusterStateFromMetadata(long version, Metadata meta return ClusterState.builder(ClusterName.DEFAULT).version(version).metadata(metadata).build(); } - private static BigArrays getBigArrays() { - return usually() - ? BigArrays.NON_RECYCLING_INSTANCE - : new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); - } - } diff --git a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java index ed202e47bcfb9..067323ebc4020 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java @@ -35,7 +35,6 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.CheckedFunction; import org.elasticsearch.core.TimeValue; import org.elasticsearch.env.Environment; @@ -154,7 +153,6 @@ public void setup() throws IOException { paths, nodeId, xContentRegistry(), - BigArrays.NON_RECYCLING_INSTANCE, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L ).createWriter() diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index db0c012980ddf..6b4bcfaefc9d0 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -49,6 +49,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; @@ -868,7 +869,7 @@ class MockPersistedState implements CoordinationState.PersistedState { if (rarely()) { nodeEnvironment = newNodeEnvironment(); nodeEnvironments.add(nodeEnvironment); - final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(localNode, bigArrays); + final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(localNode); gatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry()); delegate = gatewayMetaState.getPersistedState(); } else { @@ -896,13 +897,21 @@ class MockPersistedState implements CoordinationState.PersistedState { nodeEnvironment = oldState.nodeEnvironment; final Metadata updatedMetadata = adaptGlobalMetadata.apply(oldState.getLastAcceptedState().metadata()); final long updatedTerm = adaptCurrentTerm.apply(oldState.getCurrentTerm()); + + final Settings.Builder writerSettings = Settings.builder(); + if (randomBoolean()) { + writerSettings.put( + PersistedClusterStateService.DOCUMENT_PAGE_SIZE.getKey(), + ByteSizeValue.ofBytes(randomLongBetween(1, 1024)) + ); + } + if (updatedMetadata != oldState.getLastAcceptedState().metadata() || updatedTerm != oldState.getCurrentTerm()) { try ( PersistedClusterStateService.Writer writer = new PersistedClusterStateService( nodeEnvironment, xContentRegistry(), - bigArrays, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + new ClusterSettings(writerSettings.build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), deterministicTaskQueue::getCurrentTimeMillis ).createWriter() ) { @@ -912,7 +921,7 @@ class MockPersistedState implements CoordinationState.PersistedState { ); } } - final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(newLocalNode, bigArrays); + final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(newLocalNode); gatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry()); delegate = gatewayMetaState.getPersistedState(); } else { diff --git a/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java b/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java index 815fe77888e59..f5923e1451a8e 100644 --- a/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java +++ b/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java @@ -16,7 +16,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.Tuple; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.plugins.MetadataUpgrader; @@ -37,11 +36,9 @@ */ public class MockGatewayMetaState extends GatewayMetaState { private final DiscoveryNode localNode; - private final BigArrays bigArrays; - public MockGatewayMetaState(DiscoveryNode localNode, BigArrays bigArrays) { + public MockGatewayMetaState(DiscoveryNode localNode) { this.localNode = localNode; - this.bigArrays = bigArrays; } @Override @@ -79,7 +76,6 @@ public void start(Settings settings, NodeEnvironment nodeEnvironment, NamedXCont new PersistedClusterStateService( nodeEnvironment, xContentRegistry, - bigArrays, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L ) diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index ca3e1847c065f..bd3fc29ef2356 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -70,6 +70,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLockObtainFailedException; +import org.elasticsearch.gateway.PersistedClusterStateService; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; @@ -580,6 +581,12 @@ private static Settings getRandomNodeSettings(long seed) { int retryTimeoutSeconds = RandomNumbers.randomIntBetween(random, 0, 60); builder.put(TransportReplicationAction.REPLICATION_RETRY_TIMEOUT.getKey(), timeValueSeconds(retryTimeoutSeconds)); } + if (random.nextInt(10) == 0) { + builder.put( + PersistedClusterStateService.DOCUMENT_PAGE_SIZE.getKey(), + new ByteSizeValue(RandomNumbers.randomIntBetween(random, rarely() ? 10 : 100, randomFrom(1000, 10000, 100000, 1000000))) + ); + } return builder.build(); }