diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md index 6188e90b47..afa453d5e3 100644 --- a/docs/learn/documentation/versioned/jobs/samza-configurations.md +++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md @@ -321,7 +321,8 @@ These properties define Samza's storage mechanism for efficient [stateful stream |stores.**_store-name_**.
rocksdb.keep.log.file.num|2|The number of RocksDB LOG files (including rotated LOG.old.* files) to keep.| |stores.**_store-name_**.
rocksdb.metrics.list|(none)|A list of [RocksDB properties](https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L409) to expose as metrics (gauges).| |stores.**_store-name_**.
rocksdb.delete.obsolete.files.period.micros|21600000000|This property specifies the period in microseconds to delete obsolete files regardless of files removed during compaction. Allowed range is up to 9223372036854775807.| -|stores.**_store-name_**.
rocksdb.max.manifest.file.size|18446744073709551615|This property specifies the maximum size of the MANIFEST data file, after which it is rotated. Default value is also the maximum, making it practically unlimited: only one manifest file is used.| +|stores-default.
rocksdb.max.manifest.file.size.bytes|1073741824| This property specifies the default maximum size (in bytes) of the MANIFEST data file for **ANY** stores, after which it is rotated. The default value is 1GB. The value for a specific store can be configured by `stores.store-name.rocksdb.max.manifest.file.size`.| +|stores.**_store-name_**.
rocksdb.max.manifest.file.size|stores-default.
rocksdb.max.manifest.file.size.bytes| This property specifies the maximum size (in bytes) of the MANIFEST data file for a specific store, after which it is rotated. The default value is defined by `stores-default.rocksdb.max.manifest.file.size.bytes`.| |stores.**_store-name_**.
side.inputs|(none)|Samza applications with stores that are populated by a secondary data sources such as HDFS, but otherwise ready-only, can leverage side inputs. Stores configured with side inputs use the the source streams to bootstrap data in the absence of local copy thereby, reducing additional copy of the data in changelog. It is also recommended to enable host affinity feature when turning on side inputs to prevent bootstrapping of the data during container restarts. The value is a comma-separated list of streams.
Each stream is of the format `system-name.stream-name`. Additionally, applications should add the side inputs to job inputs (`task.inputs`) and configure side input processor (`stores.store-name.side.inputs.processor.factory`). |stores.**_store-name_**.
side.inputs.processor.factory|(none)|The value is a fully-qualified name of a Java class that implements SideInputProcessorFactory. It is a required configuration for stores with side inputs (`stores.store-name.side.inputs`). diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java index 0bb993f197..df96ac6298 100644 --- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java @@ -79,7 +79,9 @@ public class StorageConfig extends MapConfig { public static final String STORE_RESTORE_FACTORIES = STORE_PREFIX + "%s." + RESTORE_FACTORIES_SUFFIX; public static final String JOB_RESTORE_FACTORIES = STORE_PREFIX + RESTORE_FACTORIES_SUFFIX; public static final List DEFAULT_RESTORE_FACTORIES = ImmutableList.of(KAFKA_STATE_BACKEND_FACTORY); + public static final long DEFAULT_ROCKSDB_MAX_MANIFEST_FILE_SIZE_IN_BYTES = 1024 * 1024 * 1024L; + static final String DEFAULT_ROCKSDB_MAX_MANIFEST_FILE_SIZE = "stores-default.rocksdb.max.manifest.file.size.bytes"; static final String CHANGELOG_SYSTEM = "job.changelog.system"; static final String CHANGELOG_DELETE_RETENTION_MS = STORE_PREFIX + "%s.changelog.delete.retention.ms"; static final long DEFAULT_CHANGELOG_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1); @@ -263,6 +265,15 @@ public long getChangelogMinCompactionLagMs(String storeName) { return getLong(minCompactLagConfigName, getDefaultChangelogMinCompactionLagMs()); } + /** + * Helper method to get the default RocksDB max manifest file size in bytes for ANY stores, which is default to 1GB. + * The default value for ANY stores can be configured by "stores-default.rocksdb.max.manifest.file.size.bytes", + * and the value for a specific store can be configured by "stores.store-name.rocksdb.max.manifest.file.size". + */ + public long getDefaultMaxManifestFileSizeBytes() { + return getLong(DEFAULT_ROCKSDB_MAX_MANIFEST_FILE_SIZE, DEFAULT_ROCKSDB_MAX_MANIFEST_FILE_SIZE_IN_BYTES); + } + /** * Helper method to check if there is any stores configured w/ a changelog */ diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java index 953f6efe6b..c9f3f01f6a 100644 --- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java +++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java @@ -590,4 +590,15 @@ public void testGetBackupManagers() { assertEquals(Collections.emptyList(), new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(storeBackupFactory2)); } + + @Test + public void testGetMaxManifestFileSize() { + // empty config, return default size, which is 1GB + assertEquals(DEFAULT_ROCKSDB_MAX_MANIFEST_FILE_SIZE_IN_BYTES, + new StorageConfig(new MapConfig()).getDefaultMaxManifestFileSizeBytes()); + + StorageConfig storageConfig = new StorageConfig( + new MapConfig(ImmutableMap.of(String.format(DEFAULT_ROCKSDB_MAX_MANIFEST_FILE_SIZE), "1024"))); + assertEquals(1024, storageConfig.getDefaultMaxManifestFileSizeBytes()); + } } diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java index 987a60631f..119b59006f 100644 --- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java +++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java @@ -61,7 +61,12 @@ public RocksDbKeyValueReader(String storeName, String dbPath, Config config) { valueSerde = getSerdeFromName(storageConfig.getStorageMsgSerde(storeName).orElse(null), serializerConfig); // get db options - Options options = RocksDbOptionsHelper.options(config, 1, new File(dbPath), StorageEngineFactory.StoreMode.ReadWrite); + Options options = RocksDbOptionsHelper.options(config, + 1, + storageConfig.getDefaultMaxManifestFileSizeBytes(), + new File(dbPath), + StorageEngineFactory.StoreMode.ReadWrite + ); // open the db RocksDB.loadLibrary(); diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java index 869adb8574..cc601840eb 100644 --- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java +++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java @@ -86,8 +86,8 @@ public class RocksDbOptionsHelper { */ private static final int DEFAULT_ROCKSDB_MAX_BACKGROUND_JOBS = 4; - - public static Options options(Config storeConfig, int numTasksForContainer, File storeDir, StorageEngineFactory.StoreMode storeMode) { + public static Options options(Config storeConfig, int numTasksForContainer, long defaultMaxManifestFileSize, + File storeDir, StorageEngineFactory.StoreMode storeMode) { Options options = new Options(); if (storeConfig.getBoolean(ROCKSDB_WAL_ENABLED, false)) { @@ -143,10 +143,8 @@ public static Options options(Config storeConfig, int numTasksForContainer, File options.setDeleteObsoleteFilesPeriodMicros(storeConfig.getLong(ROCKSDB_DELETE_OBSOLETE_FILES_PERIOD_MICROS, 21600000000L)); options.setMaxOpenFiles(storeConfig.getInt(ROCKSDB_MAX_OPEN_FILES, -1)); options.setMaxFileOpeningThreads(storeConfig.getInt(ROCKSDB_MAX_FILE_OPENING_THREADS, 16)); - // The default for rocksdb is 18446744073709551615, which is larger than java Long.MAX_VALUE. Hence setting it only if it's passed. - if (storeConfig.containsKey(ROCKSDB_MAX_MANIFEST_FILE_SIZE)) { - options.setMaxManifestFileSize(storeConfig.getLong(ROCKSDB_MAX_MANIFEST_FILE_SIZE)); - } + // The default for rocksdb is 1GB (1024*1024*1024 bytes) + options.setMaxManifestFileSize(storeConfig.getLong(ROCKSDB_MAX_MANIFEST_FILE_SIZE, defaultMaxManifestFileSize)); // use prepareForBulk load only when i. the store is being requested in BulkLoad mode // and ii. the storeDirectory does not exist (fresh restore), because bulk load does not work seamlessly with // existing stores : https://github.com/facebook/rocksdb/issues/2734 diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala index afc289ad75..dce443b8c6 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala @@ -42,14 +42,22 @@ class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngi registry: MetricsRegistry, jobContext: JobContext, containerContext: ContainerContext, storeMode: StoreMode): KeyValueStore[Array[Byte], Array[Byte]] = { + val storageConfig = new StorageConfig(jobContext.getConfig) val storageConfigSubset = jobContext.getConfig.subset("stores." + storeName + ".", true) - val isLoggedStore = new StorageConfig(jobContext.getConfig).getChangelogStream(storeName).isPresent + val isLoggedStore = storageConfig.getChangelogStream(storeName).isPresent val rocksDbMetrics = new KeyValueStoreMetrics(storeName, registry) val numTasksForContainer = containerContext.getContainerModel.getTasks.keySet().size() + val defaultMaxManifestFileSize = storageConfig.getDefaultMaxManifestFileSizeBytes rocksDbMetrics.newGauge("rocksdb.block-cache-size", () => RocksDbOptionsHelper.getBlockCacheSize(storageConfigSubset, numTasksForContainer)) - val rocksDbOptions = RocksDbOptionsHelper.options(storageConfigSubset, numTasksForContainer, storeDir, storeMode) + val rocksDbOptions = RocksDbOptionsHelper.options( + storageConfigSubset, + numTasksForContainer, + defaultMaxManifestFileSize, + storeDir, + storeMode + ) val rocksDbWriteOptions = new WriteOptions() if (!storageConfigSubset.getBoolean(RocksDbOptionsHelper.ROCKSDB_WAL_ENABLED, false)) {