Skip to content

Commit

Permalink
[fix][fn] Enable optimized Netty direct byte buffer support for Pulsa…
Browse files Browse the repository at this point in the history
…r Function runtimes (#22910)

(cherry picked from commit f3d4d5a)
  • Loading branch information
lhotari committed Jun 19, 2024
1 parent b451530 commit 20c3413
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -361,12 +361,26 @@ public static List<String> getCmd(InstanceConfig instanceConfig,
instanceConfig.getFunctionDetails().getName(),
shardId));

// Needed for optimized Netty direct byte buffer support
args.add("-Dio.netty.tryReflectionSetAccessible=true");
// Handle possible shaded Netty versions
args.add("-Dorg.apache.pulsar.shade.io.netty.tryReflectionSetAccessible=true");
args.add("-Dio.grpc.netty.shaded.io.netty.tryReflectionSetAccessible=true");

if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_11)) {
// Needed for optimized Netty direct byte buffer support
args.add("--add-opens");
args.add("java.base/java.nio=ALL-UNNAMED");
args.add("--add-opens");
args.add("java.base/jdk.internal.misc=ALL-UNNAMED");
}

// Needed for netty.DnsResolverUtil on JDK9+
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
// Needed for optimized checksum calculation when com.scurrilous.circe.checksum.Java9IntHash
// is used. That gets used when the native library libcirce-checksum is not available or cannot
// be loaded.
args.add("--add-opens");
args.add("java.base/sun.net=ALL-UNNAMED");
args.add("java.base/java.util.zip=ALL-UNNAMED");
}

if (instanceConfig.getAdditionalJavaRuntimeArguments() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,14 +441,14 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s
if (null != depsDir) {
extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir;
classpath = classpath + ":" + depsDir + "/*";
totalArgs = 46;
portArg = 33;
metricsPortArg = 35;
totalArgs = 52;
portArg = 39;
metricsPortArg = 41;
} else {
extraDepsEnv = "";
portArg = 32;
metricsPortArg = 34;
totalArgs = 45;
portArg = 38;
metricsPortArg = 40;
totalArgs = 51;
}
if (secretsAttached) {
totalArgs += 4;
Expand Down Expand Up @@ -479,7 +479,11 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s
+ "-Dpulsar.function.log.dir=" + logDirectory + "/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+ " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-$SHARD_ID"
+ " -Dio.netty.tryReflectionSetAccessible=true"
+ " --add-opens java.base/sun.net=ALL-UNNAMED"
+ " -Dorg.apache.pulsar.shade.io.netty.tryReflectionSetAccessible=true"
+ " -Dio.grpc.netty.shaded.io.netty.tryReflectionSetAccessible=true"
+ " --add-opens java.base/java.nio=ALL-UNNAMED"
+ " --add-opens java.base/jdk.internal.misc=ALL-UNNAMED"
+ " --add-opens java.base/java.util.zip=ALL-UNNAMED"
+ " -Xmx" + RESOURCES.getRam()
+ " org.apache.pulsar.functions.instance.JavaInstanceMain"
+ " --jar " + jarLocation
Expand Down Expand Up @@ -1306,7 +1310,7 @@ private void assertMetricsPortConfigured(Map<String, Object> functionRuntimeFact
.contains("--metrics_port 0"));
}
}

@Test
public void testDeleteStatefulSetWithTranslatedKubernetesLabelChars() throws Exception {
InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
Expand All @@ -1315,22 +1319,22 @@ public void testDeleteStatefulSetWithTranslatedKubernetesLabelChars() throws Exc

CoreV1Api coreApi = mock(CoreV1Api.class);
AppsV1Api appsApi = mock(AppsV1Api.class);

Call successfulCall = mock(Call.class);
Response okResponse = mock(Response.class);
when(okResponse.code()).thenReturn(HttpURLConnection.HTTP_OK);
when(okResponse.isSuccessful()).thenReturn(true);
when(okResponse.message()).thenReturn("");
when(successfulCall.execute()).thenReturn(okResponse);

final String expectedFunctionNamePrefix = String.format("pf-%s-%s-%s", "c-tenant", "c-ns", "c-fn");

factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0);
factory.setCoreClient(coreApi);
factory.setAppsClient(appsApi);

ArgumentMatcher<String> hasTranslatedFunctionName = (String t) -> t.startsWith(expectedFunctionNamePrefix);

when(appsApi.deleteNamespacedStatefulSetCall(
argThat(hasTranslatedFunctionName),
anyString(), isNull(), isNull(), anyInt(), isNull(), anyString(), any(), isNull())).thenReturn(successfulCall);
Expand All @@ -1342,14 +1346,14 @@ public void testDeleteStatefulSetWithTranslatedKubernetesLabelChars() throws Exc

V1PodList podList = mock(V1PodList.class);
when(podList.getItems()).thenReturn(Collections.emptyList());

String expectedLabels = String.format("tenant=%s,namespace=%s,name=%s", "c-tenant", "c-ns", "c-fn");

when(coreApi.listNamespacedPod(anyString(), isNull(), isNull(), isNull(), isNull(),
eq(expectedLabels), isNull(), isNull(), isNull(), isNull(), isNull())).thenReturn(podList);
KubernetesRuntime kr = factory.createContainer(config, "/test/code", "code.yml", "/test/transforms", "transform.yml", Long.MIN_VALUE);
KubernetesRuntime kr = factory.createContainer(config, "/test/code", "code.yml", "/test/transforms", "transform.yml", Long.MIN_VALUE);
kr.deleteStatefulSet();

verify(coreApi).listNamespacedPod(anyString(), isNull(), isNull(), isNull(), isNull(),
eq(expectedLabels), isNull(), isNull(), isNull(), isNull(), isNull());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,21 +297,21 @@ private void verifyJavaInstance(InstanceConfig config, Path depsDir, String webS
String extraDepsEnv;
int portArg;
int metricsPortArg;
int totalArgCount = 48;
int totalArgCount = 54;
if (webServiceUrl != null && config.isExposePulsarAdminClientEnabled()) {
totalArgCount += 3;
}
if (null != depsDir) {
assertEquals(args.size(), totalArgCount);
extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir;
classpath = classpath + ":" + depsDir + "/*";
portArg = 31;
metricsPortArg = 33;
portArg = 37;
metricsPortArg = 39;
} else {
assertEquals(args.size(), totalArgCount-1);
extraDepsEnv = "";
portArg = 30;
metricsPortArg = 32;
portArg = 36;
metricsPortArg = 38;
}
if (webServiceUrl != null && config.isExposePulsarAdminClientEnabled()) {
portArg += 3;
Expand All @@ -328,7 +328,11 @@ private void verifyJavaInstance(InstanceConfig config, Path depsDir, String webS
+ "-Dpulsar.function.log.dir=" + logDirectory + "/functions/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+ " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-" + config.getInstanceId()
+ " -Dio.netty.tryReflectionSetAccessible=true"
+ " --add-opens java.base/sun.net=ALL-UNNAMED"
+ " -Dorg.apache.pulsar.shade.io.netty.tryReflectionSetAccessible=true"
+ " -Dio.grpc.netty.shaded.io.netty.tryReflectionSetAccessible=true"
+ " --add-opens java.base/java.nio=ALL-UNNAMED"
+ " --add-opens java.base/jdk.internal.misc=ALL-UNNAMED"
+ " --add-opens java.base/java.util.zip=ALL-UNNAMED"
+ " org.apache.pulsar.functions.instance.JavaInstanceMain"
+ " --jar " + userJarFile
+ " --transform_function_jar " + userJarFile
Expand Down

0 comments on commit 20c3413

Please sign in to comment.