forked from opensearch-project/OpenSearch
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add batching processor base type AbstractBatchingProcessor (opensearc…
…h-project#14554) Signed-off-by: Liyun Xiu <[email protected]>
- Loading branch information
Showing
3 changed files
with
297 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
136 changes: 136 additions & 0 deletions
136
server/src/main/java/org/opensearch/ingest/AbstractBatchingProcessor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.ingest; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.function.Consumer; | ||
|
||
import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException; | ||
|
||
/** | ||
* Abstract base class for batch processors. | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public abstract class AbstractBatchingProcessor extends AbstractProcessor { | ||
|
||
public static final String BATCH_SIZE_FIELD = "batch_size"; | ||
private static final int DEFAULT_BATCH_SIZE = 1; | ||
protected final int batchSize; | ||
|
||
protected AbstractBatchingProcessor(String tag, String description, int batchSize) { | ||
super(tag, description); | ||
this.batchSize = batchSize; | ||
} | ||
|
||
/** | ||
* Internal logic to process batched documents, must be implemented by concrete batch processors. | ||
* | ||
* @param ingestDocumentWrappers {@link List} of {@link IngestDocumentWrapper} to be processed. | ||
* @param handler {@link Consumer} to be called with the results of the processing. | ||
*/ | ||
protected abstract void subBatchExecute( | ||
List<IngestDocumentWrapper> ingestDocumentWrappers, | ||
Consumer<List<IngestDocumentWrapper>> handler | ||
); | ||
|
||
@Override | ||
public void batchExecute(List<IngestDocumentWrapper> ingestDocumentWrappers, Consumer<List<IngestDocumentWrapper>> handler) { | ||
if (ingestDocumentWrappers.isEmpty()) { | ||
handler.accept(Collections.emptyList()); | ||
return; | ||
} | ||
|
||
// if batch size is larger than document size, send one batch | ||
if (this.batchSize >= ingestDocumentWrappers.size()) { | ||
subBatchExecute(ingestDocumentWrappers, handler); | ||
return; | ||
} | ||
|
||
// split documents into multiple batches and send each batch to batch processors | ||
List<List<IngestDocumentWrapper>> batches = cutBatches(ingestDocumentWrappers); | ||
int size = ingestDocumentWrappers.size(); | ||
AtomicInteger counter = new AtomicInteger(size); | ||
List<IngestDocumentWrapper> allResults = Collections.synchronizedList(new ArrayList<>()); | ||
for (List<IngestDocumentWrapper> batch : batches) { | ||
this.subBatchExecute(batch, batchResults -> { | ||
allResults.addAll(batchResults); | ||
if (counter.addAndGet(-batchResults.size()) == 0) { | ||
handler.accept(allResults); | ||
} | ||
assert counter.get() >= 0 : "counter is negative"; | ||
}); | ||
} | ||
} | ||
|
||
private List<List<IngestDocumentWrapper>> cutBatches(List<IngestDocumentWrapper> ingestDocumentWrappers) { | ||
List<List<IngestDocumentWrapper>> batches = new ArrayList<>(); | ||
for (int i = 0; i < ingestDocumentWrappers.size(); i += this.batchSize) { | ||
batches.add(ingestDocumentWrappers.subList(i, Math.min(i + this.batchSize, ingestDocumentWrappers.size()))); | ||
} | ||
return batches; | ||
} | ||
|
||
/** | ||
* Factory class for creating {@link AbstractBatchingProcessor} instances. | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public abstract static class Factory implements Processor.Factory { | ||
final String processorType; | ||
|
||
protected Factory(String processorType) { | ||
this.processorType = processorType; | ||
} | ||
|
||
/** | ||
* Creates a new processor instance. | ||
* | ||
* @param processorFactories The processor factories. | ||
* @param tag The processor tag. | ||
* @param description The processor description. | ||
* @param config The processor configuration. | ||
* @return The new AbstractBatchProcessor instance. | ||
* @throws Exception If the processor could not be created. | ||
*/ | ||
@Override | ||
public AbstractBatchingProcessor create( | ||
Map<String, Processor.Factory> processorFactories, | ||
String tag, | ||
String description, | ||
Map<String, Object> config | ||
) throws Exception { | ||
int batchSize = ConfigurationUtils.readIntProperty(this.processorType, tag, config, BATCH_SIZE_FIELD, DEFAULT_BATCH_SIZE); | ||
if (batchSize < 1) { | ||
throw newConfigurationException(this.processorType, tag, BATCH_SIZE_FIELD, "batch size must be a positive integer"); | ||
} | ||
return newProcessor(tag, description, batchSize, config); | ||
} | ||
|
||
/** | ||
* Returns a new processor instance. | ||
* | ||
* @param tag tag of the processor | ||
* @param description description of the processor | ||
* @param batchSize batch size of the processor | ||
* @param config configuration of the processor | ||
* @return a new batch processor instance | ||
*/ | ||
protected abstract AbstractBatchingProcessor newProcessor( | ||
String tag, | ||
String description, | ||
int batchSize, | ||
Map<String, Object> config | ||
); | ||
} | ||
} |
160 changes: 160 additions & 0 deletions
160
server/src/test/java/org/opensearch/ingest/AbstractBatchingProcessorTests.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.ingest; | ||
|
||
import org.opensearch.OpenSearchParseException; | ||
import org.opensearch.test.OpenSearchTestCase; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.function.Consumer; | ||
|
||
public class AbstractBatchingProcessorTests extends OpenSearchTestCase { | ||
|
||
public void testBatchExecute_emptyInput() { | ||
DummyProcessor processor = new DummyProcessor(3); | ||
Consumer<List<IngestDocumentWrapper>> handler = (results) -> assertTrue(results.isEmpty()); | ||
processor.batchExecute(Collections.emptyList(), handler); | ||
assertTrue(processor.getSubBatches().isEmpty()); | ||
} | ||
|
||
public void testBatchExecute_singleBatchSize() { | ||
DummyProcessor processor = new DummyProcessor(3); | ||
List<IngestDocumentWrapper> wrapperList = Arrays.asList( | ||
IngestDocumentPreparer.createIngestDocumentWrapper(1), | ||
IngestDocumentPreparer.createIngestDocumentWrapper(2), | ||
IngestDocumentPreparer.createIngestDocumentWrapper(3) | ||
); | ||
List<IngestDocumentWrapper> resultList = new ArrayList<>(); | ||
processor.batchExecute(wrapperList, resultList::addAll); | ||
assertEquals(wrapperList, resultList); | ||
assertEquals(1, processor.getSubBatches().size()); | ||
assertEquals(wrapperList, processor.getSubBatches().get(0)); | ||
} | ||
|
||
public void testBatchExecute_multipleBatches() { | ||
DummyProcessor processor = new DummyProcessor(2); | ||
List<IngestDocumentWrapper> wrapperList = Arrays.asList( | ||
IngestDocumentPreparer.createIngestDocumentWrapper(1), | ||
IngestDocumentPreparer.createIngestDocumentWrapper(2), | ||
IngestDocumentPreparer.createIngestDocumentWrapper(3), | ||
IngestDocumentPreparer.createIngestDocumentWrapper(4), | ||
IngestDocumentPreparer.createIngestDocumentWrapper(5) | ||
); | ||
List<IngestDocumentWrapper> resultList = new ArrayList<>(); | ||
processor.batchExecute(wrapperList, resultList::addAll); | ||
assertEquals(wrapperList, resultList); | ||
assertEquals(3, processor.getSubBatches().size()); | ||
assertEquals(wrapperList.subList(0, 2), processor.getSubBatches().get(0)); | ||
assertEquals(wrapperList.subList(2, 4), processor.getSubBatches().get(1)); | ||
assertEquals(wrapperList.subList(4, 5), processor.getSubBatches().get(2)); | ||
} | ||
|
||
public void testBatchExecute_randomBatches() { | ||
int batchSize = randomIntBetween(2, 32); | ||
int docCount = randomIntBetween(2, 32); | ||
DummyProcessor processor = new DummyProcessor(batchSize); | ||
List<IngestDocumentWrapper> wrapperList = new ArrayList<>(); | ||
for (int i = 0; i < docCount; ++i) { | ||
wrapperList.add(IngestDocumentPreparer.createIngestDocumentWrapper(i)); | ||
} | ||
List<IngestDocumentWrapper> resultList = new ArrayList<>(); | ||
processor.batchExecute(wrapperList, resultList::addAll); | ||
assertEquals(wrapperList, resultList); | ||
assertEquals(docCount / batchSize + (docCount % batchSize == 0 ? 0 : 1), processor.getSubBatches().size()); | ||
} | ||
|
||
public void testBatchExecute_defaultBatchSize() { | ||
DummyProcessor processor = new DummyProcessor(1); | ||
List<IngestDocumentWrapper> wrapperList = Arrays.asList( | ||
IngestDocumentPreparer.createIngestDocumentWrapper(1), | ||
IngestDocumentPreparer.createIngestDocumentWrapper(2), | ||
IngestDocumentPreparer.createIngestDocumentWrapper(3) | ||
); | ||
List<IngestDocumentWrapper> resultList = new ArrayList<>(); | ||
processor.batchExecute(wrapperList, resultList::addAll); | ||
assertEquals(wrapperList, resultList); | ||
assertEquals(3, processor.getSubBatches().size()); | ||
assertEquals(wrapperList.subList(0, 1), processor.getSubBatches().get(0)); | ||
assertEquals(wrapperList.subList(1, 2), processor.getSubBatches().get(1)); | ||
assertEquals(wrapperList.subList(2, 3), processor.getSubBatches().get(2)); | ||
} | ||
|
||
public void testFactory_invalidBatchSize() { | ||
Map<String, Object> config = new HashMap<>(); | ||
config.put("batch_size", 0); | ||
DummyProcessor.DummyProcessorFactory factory = new DummyProcessor.DummyProcessorFactory("DummyProcessor"); | ||
OpenSearchParseException exception = assertThrows(OpenSearchParseException.class, () -> factory.create(config)); | ||
assertEquals("[batch_size] batch size must be a positive integer", exception.getMessage()); | ||
} | ||
|
||
public void testFactory_defaultBatchSize() throws Exception { | ||
Map<String, Object> config = new HashMap<>(); | ||
DummyProcessor.DummyProcessorFactory factory = new DummyProcessor.DummyProcessorFactory("DummyProcessor"); | ||
DummyProcessor processor = (DummyProcessor) factory.create(config); | ||
assertEquals(1, processor.batchSize); | ||
} | ||
|
||
public void testFactory_callNewProcessor() throws Exception { | ||
Map<String, Object> config = new HashMap<>(); | ||
config.put("batch_size", 3); | ||
DummyProcessor.DummyProcessorFactory factory = new DummyProcessor.DummyProcessorFactory("DummyProcessor"); | ||
DummyProcessor processor = (DummyProcessor) factory.create(config); | ||
assertEquals(3, processor.batchSize); | ||
} | ||
|
||
static class DummyProcessor extends AbstractBatchingProcessor { | ||
private List<List<IngestDocumentWrapper>> subBatches = new ArrayList<>(); | ||
|
||
public List<List<IngestDocumentWrapper>> getSubBatches() { | ||
return subBatches; | ||
} | ||
|
||
protected DummyProcessor(int batchSize) { | ||
super("tag", "description", batchSize); | ||
} | ||
|
||
@Override | ||
public void subBatchExecute(List<IngestDocumentWrapper> ingestDocumentWrappers, Consumer<List<IngestDocumentWrapper>> handler) { | ||
subBatches.add(ingestDocumentWrappers); | ||
handler.accept(ingestDocumentWrappers); | ||
} | ||
|
||
@Override | ||
public IngestDocument execute(IngestDocument ingestDocument) throws Exception { | ||
return ingestDocument; | ||
} | ||
|
||
@Override | ||
public String getType() { | ||
return null; | ||
} | ||
|
||
public static class DummyProcessorFactory extends Factory { | ||
|
||
protected DummyProcessorFactory(String processorType) { | ||
super(processorType); | ||
} | ||
|
||
public AbstractBatchingProcessor create(Map<String, Object> config) throws Exception { | ||
final Map<String, org.opensearch.ingest.Processor.Factory> processorFactories = new HashMap<>(); | ||
return super.create(processorFactories, "tag", "description", config); | ||
} | ||
|
||
@Override | ||
protected AbstractBatchingProcessor newProcessor(String tag, String description, int batchSize, Map<String, Object> config) { | ||
return new DummyProcessor(batchSize); | ||
} | ||
} | ||
} | ||
} |