diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java index d496d4edd340..33c7e1e8497a 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java @@ -178,6 +178,11 @@ public interface Configs { */ String getTemporalHost(); + /** + * Define the number of retention days for the temporal history + */ + int getTemporalRetentionInDays(); + /** * Define the url where the Airbyte Server is hosted at. Airbyte services use this information. * Manipulates the `INTERNAL_API_HOST` variable. diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java index 353b7c300177..fe80f65856ad 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -73,6 +73,7 @@ public class EnvConfigs implements Configs { public static final String MAX_SYNC_WORKERS = "MAX_SYNC_WORKERS"; private static final String TEMPORAL_HOST = "TEMPORAL_HOST"; private static final String TEMPORAL_WORKER_PORTS = "TEMPORAL_WORKER_PORTS"; + private static final String TEMPORAL_HISTORY_RETENTION_IN_DAYS = "TEMPORAL_HISTORY_RETENTION_IN_DAYS"; public static final String JOB_KUBE_NAMESPACE = "JOB_KUBE_NAMESPACE"; private static final String SUBMITTER_NUM_THREADS = "SUBMITTER_NUM_THREADS"; public static final String JOB_MAIN_CONTAINER_CPU_REQUEST = "JOB_MAIN_CONTAINER_CPU_REQUEST"; @@ -166,6 +167,8 @@ public class EnvConfigs implements Configs { public static final String DEFAULT_NETWORK = "host"; + public static final int DEFAULT_TEMPORAL_HISTORY_RETENTION_IN_DAYS = 30; + private final Function getEnv; private final Supplier> getAllEnvKeys; private final LogConfigs logConfigs; @@ -385,6 +388,11 @@ public String getTemporalHost() { return getEnvOrDefault(TEMPORAL_HOST, "airbyte-temporal:7233"); } + @Override + public int getTemporalRetentionInDays() { + return getEnvOrDefault(TEMPORAL_HISTORY_RETENTION_IN_DAYS, DEFAULT_TEMPORAL_HISTORY_RETENTION_IN_DAYS); + } + @Override public String getAirbyteApiHost() { return getEnsureEnv(INTERNAL_API_HOST).split(":")[0]; @@ -815,6 +823,10 @@ public long getEnvOrDefault(final String key, final long defaultValue) { return getEnvOrDefault(key, defaultValue, Long::parseLong, false); } + public int getEnvOrDefault(final String key, final int defaultValue) { + return getEnvOrDefault(key, defaultValue, Integer::parseInt, false); + } + public boolean getEnvOrDefault(final String key, final boolean defaultValue) { return getEnvOrDefault(key, defaultValue, Boolean::parseBoolean); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java index ad8e8021ae6d..a23405cad924 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java @@ -71,7 +71,7 @@ public static WorkflowServiceStubs createTemporalService(final String temporalHo public static final String DEFAULT_NAMESPACE = "default"; - private static final Duration WORKFLOW_EXECUTION_TTL = Duration.ofDays(7); + private static final Duration WORKFLOW_EXECUTION_TTL = Duration.ofDays(configs.getTemporalRetentionInDays()); private static final String HUMAN_READABLE_WORKFLOW_EXECUTION_TTL = DurationFormatUtils.formatDurationWords(WORKFLOW_EXECUTION_TTL.toMillis(), true, true);