Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Default Configuration for RocksDB MaxManifestFileSize #1706

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,8 @@ These properties define Samza's storage mechanism for efficient [stateful stream
|stores.**_store-name_**.<br>rocksdb.keep.log.file.num|2|The number of RocksDB LOG files (including rotated LOG.old.* files) to keep.|
|stores.**_store-name_**.<br>rocksdb.metrics.list|(none)|A list of [RocksDB properties](https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L409) to expose as metrics (gauges).|
|stores.**_store-name_**.<br>rocksdb.delete.obsolete.files.period.micros|21600000000|This property specifies the period in microseconds to delete obsolete files regardless of files removed during compaction. Allowed range is up to 9223372036854775807.|
|stores.**_store-name_**.<br>rocksdb.max.manifest.file.size|18446744073709551615|This property specifies the maximum size of the MANIFEST data file, after which it is rotated. Default value is also the maximum, making it practically unlimited: only one manifest file is used.|
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The number was outdated, and it should be 1024^3 which is 1073741824

|stores-default.<br>rocksdb.max.manifest.file.size.bytes|1073741824| This property specifies the default maximum size (in bytes) of the MANIFEST data file for **ANY** stores, after which it is rotated. The default value is 1GB. The value for a specific store can be configured by `stores.store-name.rocksdb.max.manifest.file.size`.|
|stores.**_store-name_**.<br>rocksdb.max.manifest.file.size|stores-default.<br>rocksdb.max.manifest.file.size.bytes| This property specifies the maximum size (in bytes) of the MANIFEST data file for a specific store, after which it is rotated. The default value is defined by `stores-default.rocksdb.max.manifest.file.size.bytes`.|
|stores.**_store-name_**.<br>side.inputs|(none)|Samza applications with stores that are populated by a secondary data sources such as HDFS, but otherwise ready-only, can leverage side inputs. Stores configured with side inputs use the the source streams to bootstrap data in the absence of local copy thereby, reducing additional copy of the data in changelog. It is also recommended to enable host affinity feature when turning on side inputs to prevent bootstrapping of the data during container restarts. The value is a comma-separated list of streams.<br> Each stream is of the format `system-name.stream-name`. Additionally, applications should add the side inputs to job inputs (`task.inputs`) and configure side input processor (`stores.store-name.side.inputs.processor.factory`).
|stores.**_store-name_**.<br>side.inputs.processor.factory|(none)|The value is a fully-qualified name of a Java class that implements <a href="../api/javadocs/org/apache/samza/storage/SideInputProcessorFactory.html">SideInputProcessorFactory</a>. It is a required configuration for stores with side inputs (`stores.store-name.side.inputs`).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ public class StorageConfig extends MapConfig {
public static final String STORE_RESTORE_FACTORIES = STORE_PREFIX + "%s." + RESTORE_FACTORIES_SUFFIX;
public static final String JOB_RESTORE_FACTORIES = STORE_PREFIX + RESTORE_FACTORIES_SUFFIX;
public static final List<String> DEFAULT_RESTORE_FACTORIES = ImmutableList.of(KAFKA_STATE_BACKEND_FACTORY);
public static final long DEFAULT_ROCKSDB_MAX_MANIFEST_FILE_SIZE_IN_BYTES = 1024 * 1024 * 1024L;

static final String DEFAULT_ROCKSDB_MAX_MANIFEST_FILE_SIZE = "stores-default.rocksdb.max.manifest.file.size.bytes";
static final String CHANGELOG_SYSTEM = "job.changelog.system";
static final String CHANGELOG_DELETE_RETENTION_MS = STORE_PREFIX + "%s.changelog.delete.retention.ms";
static final long DEFAULT_CHANGELOG_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1);
Expand Down Expand Up @@ -263,6 +265,15 @@ public long getChangelogMinCompactionLagMs(String storeName) {
return getLong(minCompactLagConfigName, getDefaultChangelogMinCompactionLagMs());
}

/**
* Helper method to get the default RocksDB max manifest file size in bytes for ANY stores, which is default to 1GB.
* The default value for ANY stores can be configured by "stores-default.rocksdb.max.manifest.file.size.bytes",
* and the value for a specific store can be configured by "stores.store-name.rocksdb.max.manifest.file.size".
*/
public long getDefaultMaxManifestFileSizeBytes() {
return getLong(DEFAULT_ROCKSDB_MAX_MANIFEST_FILE_SIZE, DEFAULT_ROCKSDB_MAX_MANIFEST_FILE_SIZE_IN_BYTES);
}

/**
* Helper method to check if there is any stores configured w/ a changelog
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,4 +590,15 @@ public void testGetBackupManagers() {
assertEquals(Collections.emptyList(),
new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(storeBackupFactory2));
}

@Test
public void testGetMaxManifestFileSize() {
// empty config, return default size, which is 1GB
assertEquals(DEFAULT_ROCKSDB_MAX_MANIFEST_FILE_SIZE_IN_BYTES,
new StorageConfig(new MapConfig()).getDefaultMaxManifestFileSizeBytes());

StorageConfig storageConfig = new StorageConfig(
new MapConfig(ImmutableMap.of(String.format(DEFAULT_ROCKSDB_MAX_MANIFEST_FILE_SIZE), "1024")));
assertEquals(1024, storageConfig.getDefaultMaxManifestFileSizeBytes());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ public RocksDbKeyValueReader(String storeName, String dbPath, Config config) {
valueSerde = getSerdeFromName(storageConfig.getStorageMsgSerde(storeName).orElse(null), serializerConfig);

// get db options
Options options = RocksDbOptionsHelper.options(config, 1, new File(dbPath), StorageEngineFactory.StoreMode.ReadWrite);
Options options = RocksDbOptionsHelper.options(config,
1,
storageConfig.getDefaultMaxManifestFileSizeBytes(),
new File(dbPath),
StorageEngineFactory.StoreMode.ReadWrite
);

// open the db
RocksDB.loadLibrary();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ public class RocksDbOptionsHelper {
*/
private static final int DEFAULT_ROCKSDB_MAX_BACKGROUND_JOBS = 4;


public static Options options(Config storeConfig, int numTasksForContainer, File storeDir, StorageEngineFactory.StoreMode storeMode) {
public static Options options(Config storeConfig, int numTasksForContainer, long defaultMaxManifestFileSize,
File storeDir, StorageEngineFactory.StoreMode storeMode) {
Options options = new Options();

if (storeConfig.getBoolean(ROCKSDB_WAL_ENABLED, false)) {
Expand Down Expand Up @@ -143,10 +143,8 @@ public static Options options(Config storeConfig, int numTasksForContainer, File
options.setDeleteObsoleteFilesPeriodMicros(storeConfig.getLong(ROCKSDB_DELETE_OBSOLETE_FILES_PERIOD_MICROS, 21600000000L));
options.setMaxOpenFiles(storeConfig.getInt(ROCKSDB_MAX_OPEN_FILES, -1));
options.setMaxFileOpeningThreads(storeConfig.getInt(ROCKSDB_MAX_FILE_OPENING_THREADS, 16));
// The default for rocksdb is 18446744073709551615, which is larger than java Long.MAX_VALUE. Hence setting it only if it's passed.
if (storeConfig.containsKey(ROCKSDB_MAX_MANIFEST_FILE_SIZE)) {
options.setMaxManifestFileSize(storeConfig.getLong(ROCKSDB_MAX_MANIFEST_FILE_SIZE));
}
// The default for rocksdb is 1GB (1024*1024*1024 bytes)
options.setMaxManifestFileSize(storeConfig.getLong(ROCKSDB_MAX_MANIFEST_FILE_SIZE, defaultMaxManifestFileSize));
// use prepareForBulk load only when i. the store is being requested in BulkLoad mode
// and ii. the storeDirectory does not exist (fresh restore), because bulk load does not work seamlessly with
// existing stores : https://github.com/facebook/rocksdb/issues/2734
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,22 @@ class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngi
registry: MetricsRegistry,
jobContext: JobContext,
containerContext: ContainerContext, storeMode: StoreMode): KeyValueStore[Array[Byte], Array[Byte]] = {
val storageConfig = new StorageConfig(jobContext.getConfig)
val storageConfigSubset = jobContext.getConfig.subset("stores." + storeName + ".", true)
val isLoggedStore = new StorageConfig(jobContext.getConfig).getChangelogStream(storeName).isPresent
val isLoggedStore = storageConfig.getChangelogStream(storeName).isPresent
val rocksDbMetrics = new KeyValueStoreMetrics(storeName, registry)
val numTasksForContainer = containerContext.getContainerModel.getTasks.keySet().size()
val defaultMaxManifestFileSize = storageConfig.getDefaultMaxManifestFileSizeBytes
rocksDbMetrics.newGauge("rocksdb.block-cache-size",
() => RocksDbOptionsHelper.getBlockCacheSize(storageConfigSubset, numTasksForContainer))

val rocksDbOptions = RocksDbOptionsHelper.options(storageConfigSubset, numTasksForContainer, storeDir, storeMode)
val rocksDbOptions = RocksDbOptionsHelper.options(
storageConfigSubset,
numTasksForContainer,
defaultMaxManifestFileSize,
storeDir,
storeMode
)
val rocksDbWriteOptions = new WriteOptions()

if (!storageConfigSubset.getBoolean(RocksDbOptionsHelper.ROCKSDB_WAL_ENABLED, false)) {
Expand Down
Loading