Skip to content

Commit

Permalink
[HUDI-2875] allow memory executor exit gracefully. And fix concurrent…
Browse files Browse the repository at this point in the history
… call of HoodieWriteHandle
  • Loading branch information
guanziyue committed Mar 12, 2022
1 parent e7bb041 commit c74c41d
Show file tree
Hide file tree
Showing 16 changed files with 229 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import javax.annotation.concurrent.NotThreadSafe;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -65,6 +67,7 @@
* Users should ensure there are no duplicates when "insert" operation is used and if the respective config is enabled. So, above scenario should not
* happen and every batch should have new records to be inserted. Above example is for illustration purposes only.
*/
@NotThreadSafe
public class HoodieConcatHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandle<T, I, K, O> {

private static final Logger LOG = LogManager.getLogger(HoodieConcatHandle.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,15 @@
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import javax.annotation.concurrent.NotThreadSafe;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

@NotThreadSafe
public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieWriteHandle<T, I, K, O> {

private static final Logger LOG = LogManager.getLogger(HoodieCreateHandle.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import javax.annotation.concurrent.NotThreadSafe;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -92,6 +94,7 @@
*
* </p>
*/
@NotThreadSafe
public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieWriteHandle<T, I, K, O> {

private static final Logger LOG = LogManager.getLogger(HoodieMergeHandle.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

import org.apache.avro.generic.GenericRecord;

import javax.annotation.concurrent.NotThreadSafe;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
Expand All @@ -45,6 +47,7 @@
* The implementation performs a merge-sort by comparing the key of the record being written to the list of
* keys in newRecordKeys (sorted in-memory).
*/
@NotThreadSafe
public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandle<T, I, K, O> {

private Queue<String> newRecordKeysSorted = new PriorityQueue<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import javax.annotation.concurrent.NotThreadSafe;

/**
* A HoodieCreateHandle which writes all data into a single file.
* <p>
* Please use this with caution. This can end up creating very large files if not used correctly.
*/
@NotThreadSafe
public class HoodieUnboundedCreateHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieCreateHandle<T, I, K, O> {

private static final Logger LOG = LogManager.getLogger(HoodieUnboundedCreateHandle.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,18 @@
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;

import javax.annotation.concurrent.NotThreadSafe;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/**
* HoodieParquetWriter extends the ParquetWriter to help limit the size of underlying file. Provides a way to check if
* the current file can take more records with the <code>canWrite()</code>
*
* ATTENTION: HoodieParquetWriter is not thread safe and developer should take care of the order of write and close
*/
@NotThreadSafe
public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
extends ParquetWriter<IndexedRecord> implements HoodieFileWriter<R> {

Expand Down Expand Up @@ -108,6 +113,11 @@ public void writeAvro(String key, IndexedRecord object) throws IOException {
}
}

@Override
public void close() throws IOException {
super.close();
}

@Override
public long getBytesWritten() {
return fs.getBytesWritten(file);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ protected List<WriteStatus> computeNext() {
} finally {
if (null != bufferedIteratorExecutor) {
bufferedIteratorExecutor.shutdownNow();
bufferedIteratorExecutor.awaitTermination();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,16 @@ public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List
} catch (Exception e) {
throw new HoodieException(e);
} finally {
// HUDI-2875: mergeHandle is not thread safe, we should totally terminate record inputting
// and executor firstly and then close mergeHandle.
if (reader != null) {
reader.close();
}
mergeHandle.close();
if (null != wrapper) {
wrapper.shutdownNow();
wrapper.awaitTermination();
}
mergeHandle.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ protected List<WriteStatus> computeNext() {
} finally {
if (null != bufferedIteratorExecutor) {
bufferedIteratorExecutor.shutdownNow();
bufferedIteratorExecutor.awaitTermination();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,16 @@ public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List
} catch (Exception e) {
throw new HoodieException(e);
} finally {
// HUDI-2875: mergeHandle is not thread safe, we should totally terminate record inputting
// and executor firstly and then close mergeHandle.
if (reader != null) {
reader.close();
}
mergeHandle.close();
if (null != wrapper) {
wrapper.shutdownNow();
wrapper.awaitTermination();
}
mergeHandle.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ protected List<WriteStatus> computeNext() {
} finally {
if (null != bufferedIteratorExecutor) {
bufferedIteratorExecutor.shutdownNow();
bufferedIteratorExecutor.awaitTermination();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ void executeBootstrap(HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle, Path so
} catch (Exception e) {
throw new HoodieException(e);
} finally {
bootstrapHandle.close();
if (null != wrapper) {
wrapper.shutdownNow();
wrapper.awaitTermination();
}
bootstrapHandle.close();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ Schema getAvroSchema(Path sourceFilePath) throws IOException {
void executeBootstrap(HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle,
Path sourceFilePath, KeyGeneratorInterface keyGenerator, String partitionPath, Schema avroSchema) throws Exception {
BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void> wrapper = null;
ParquetReader<IndexedRecord> reader =
AvroParquetReader.<IndexedRecord>builder(sourceFilePath).withConf(table.getHadoopConf()).build();
try {
ParquetReader<IndexedRecord> reader =
AvroParquetReader.<IndexedRecord>builder(sourceFilePath).withConf(table.getHadoopConf()).build();
wrapper = new BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void>(config.getWriteBufferLimitBytes(),
new ParquetReaderIterator(reader), new BootstrapRecordConsumer(bootstrapHandle), inp -> {
String recKey = keyGenerator.getKey(inp).getRecordKey();
Expand All @@ -84,10 +84,12 @@ void executeBootstrap(HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle,
} catch (Exception e) {
throw new HoodieException(e);
} finally {
bootstrapHandle.close();
reader.close();
if (null != wrapper) {
wrapper.shutdownNow();
wrapper.awaitTermination();
}
bootstrapHandle.close();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,16 @@ public void runMerge(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>
} catch (Exception e) {
throw new HoodieException(e);
} finally {
// HUDI-2875: mergeHandle is not thread safe, we should totally terminate record inputting
// and executor firstly and then close mergeHandle.
if (reader != null) {
reader.close();
}
mergeHandle.close();
if (null != wrapper) {
wrapper.shutdownNow();
wrapper.awaitTermination();
}
mergeHandle.close();
}
}
}
Loading

0 comments on commit c74c41d

Please sign in to comment.