Skip to content

Commit

Permalink
k8s runtime: force deletion to avoid hung function worker during conn…
Browse files Browse the repository at this point in the history
…ector restart (apache#12504)

(cherry picked from commit a3f6aba)
  • Loading branch information
dlg99 authored and nicoloboschi committed Dec 1, 2021
1 parent 0f43101 commit 04ef5ac
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 4 deletions.
4 changes: 4 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ functionRuntimeFactoryConfigs:
# extraFunctionDependenciesDir:
# # Additional memory padding added on top of the memory requested by the function per on a per instance basis
# percentMemoryPadding: 10
# # The duration in seconds before the StatefulSet deleted on function stop/restart.
# # Value must be non-negative integer. The value zero indicates delete immediately.
# # Default is 5 seconds.
# gracePeriodSeconds: 5
# # The ratio cpu request and cpu limit to be set for a function/source/sink.
# # The formula for cpu request is cpuRequest = userRequestCpu / cpuOverCommitRatio
# cpuOverCommitRatio: 1.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public class KubernetesRuntime implements Runtime {
private int percentMemoryPadding;
private double cpuOverCommitRatio;
private double memoryOverCommitRatio;
private int gracePeriodSeconds;
private final Optional<KubernetesFunctionAuthProvider> functionAuthDataCacheProvider;
private final AuthenticationConfig authConfig;
private Integer grpcPort;
Expand Down Expand Up @@ -185,6 +186,7 @@ public class KubernetesRuntime implements Runtime {
int percentMemoryPadding,
double cpuOverCommitRatio,
double memoryOverCommitRatio,
int gracePeriodSeconds,
Optional<KubernetesFunctionAuthProvider> functionAuthDataCacheProvider,
boolean authenticationEnabled,
Integer grpcPort,
Expand Down Expand Up @@ -212,6 +214,7 @@ public class KubernetesRuntime implements Runtime {
this.percentMemoryPadding = percentMemoryPadding;
this.cpuOverCommitRatio = cpuOverCommitRatio;
this.memoryOverCommitRatio = memoryOverCommitRatio;
this.gracePeriodSeconds = gracePeriodSeconds;
this.authenticationEnabled = authenticationEnabled;
this.manifestCustomizer = manifestCustomizer;
this.functionInstanceClassPath = functinoInstanceClassPath;
Expand Down Expand Up @@ -567,7 +570,7 @@ private void submitStatefulSet() throws Exception {
public void deleteStatefulSet() throws InterruptedException {
String statefulSetName = createJobName(instanceConfig.getFunctionDetails(), this.jobName);
final V1DeleteOptions options = new V1DeleteOptions();
options.setGracePeriodSeconds(5L);
options.setGracePeriodSeconds((long)gracePeriodSeconds);
options.setPropagationPolicy("Foreground");

String fqfn = FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails());
Expand All @@ -583,8 +586,8 @@ public void deleteStatefulSet() throws InterruptedException {
response = appsClient.deleteNamespacedStatefulSetCall(
statefulSetName,
jobNamespace, null, null,
5, null, "Foreground",
null, null)
gracePeriodSeconds, null, "Foreground",
options, null)
.execute();
} catch (ApiException e) {
// if already deleted
Expand Down Expand Up @@ -735,7 +738,7 @@ public void deleteService() throws InterruptedException {
serviceName,
jobNamespace, null, null,
0, null,
"Foreground", null, null).execute();
"Foreground", options, null).execute();
} catch (ApiException e) {
// if already deleted
if (e.getCode() == HTTP_NOT_FOUND) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
private String narExtractionDirectory;
private String functionInstanceClassPath;
private String downloadDirectory;
private int gracePeriodSeconds;

@ToString.Exclude
@EqualsAndHashCode.Exclude
Expand Down Expand Up @@ -195,6 +196,7 @@ public void initialize(WorkerConfig workerConfig, AuthenticationConfig authentic
this.percentMemoryPadding = factoryConfig.getPercentMemoryPadding();
this.cpuOverCommitRatio = factoryConfig.getCpuOverCommitRatio();
this.memoryOverCommitRatio = factoryConfig.getMemoryOverCommitRatio();
this.gracePeriodSeconds = factoryConfig.getGracePeriodSeconds();
this.pulsarServiceUrl = StringUtils.isEmpty(factoryConfig.getPulsarServiceUrl())
? workerConfig.getPulsarServiceUrl() : factoryConfig.getPulsarServiceUrl();
this.pulsarAdminUrl = StringUtils.isEmpty(factoryConfig.getPulsarAdminUrl())
Expand Down Expand Up @@ -310,6 +312,7 @@ public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String c
percentMemoryPadding,
cpuOverCommitRatio,
memoryOverCommitRatio,
gracePeriodSeconds,
authProvider,
authenticationEnabled,
grpcPort,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,5 +161,11 @@ public class KubernetesRuntimeFactoryConfig {
doc = "The classpath where function instance files stored"
)
private String functionInstanceClassPath = "";
@FieldContext(
doc = "The duration in seconds before the StatefulSet deleted on function stop/restart. " +
"Value must be non-negative integer. The value zero indicates delete immediately. " +
"Default is 5 seconds."
)
protected int gracePeriodSeconds = 5;

}
4 changes: 4 additions & 0 deletions site2/docs/functions-runtime.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ kubernetesContainerFactory:
extraFunctionDependenciesDir:
# Additional memory padding added on top of the memory requested by the function per on a per instance basis
percentMemoryPadding: 10
# The duration (in seconds) before the StatefulSet is deleted after a function stops or restarts.
# Value must be a non-negative integer. 0 indicates the StatefulSet is deleted immediately.
# Default is 5 seconds.
gracePeriodSeconds: 5
```
If you run functions worker embedded in a broker on Kubernetes, you can use the default settings.
Expand Down

0 comments on commit 04ef5ac

Please sign in to comment.