From 20c3413975047eaefd19668732e0a4ccd7e48f85 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 17 Jun 2024 09:26:28 +0300 Subject: [PATCH] [fix][fn] Enable optimized Netty direct byte buffer support for Pulsar Function runtimes (#22910) (cherry picked from commit f3d4d5ac0442eed2b538b8587186cdc0b8df9987) --- .../functions/runtime/RuntimeUtils.java | 18 ++++++++-- .../kubernetes/KubernetesRuntimeTest.java | 36 ++++++++++--------- .../runtime/process/ProcessRuntimeTest.java | 16 +++++---- 3 files changed, 46 insertions(+), 24 deletions(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index 0214b18fb2326..78347948688dd 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -361,12 +361,26 @@ public static List getCmd(InstanceConfig instanceConfig, instanceConfig.getFunctionDetails().getName(), shardId)); + // Needed for optimized Netty direct byte buffer support args.add("-Dio.netty.tryReflectionSetAccessible=true"); + // Handle possible shaded Netty versions + args.add("-Dorg.apache.pulsar.shade.io.netty.tryReflectionSetAccessible=true"); + args.add("-Dio.grpc.netty.shaded.io.netty.tryReflectionSetAccessible=true"); + + if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_11)) { + // Needed for optimized Netty direct byte buffer support + args.add("--add-opens"); + args.add("java.base/java.nio=ALL-UNNAMED"); + args.add("--add-opens"); + args.add("java.base/jdk.internal.misc=ALL-UNNAMED"); + } - // Needed for netty.DnsResolverUtil on JDK9+ if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) { + // Needed for optimized checksum calculation when com.scurrilous.circe.checksum.Java9IntHash + // is used. That gets used when the native library libcirce-checksum is not available or cannot + // be loaded. args.add("--add-opens"); - args.add("java.base/sun.net=ALL-UNNAMED"); + args.add("java.base/java.util.zip=ALL-UNNAMED"); } if (instanceConfig.getAdditionalJavaRuntimeArguments() != null) { diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java index 02f3c0d23fb17..6ed9849412910 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java @@ -441,14 +441,14 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s if (null != depsDir) { extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir; classpath = classpath + ":" + depsDir + "/*"; - totalArgs = 46; - portArg = 33; - metricsPortArg = 35; + totalArgs = 52; + portArg = 39; + metricsPortArg = 41; } else { extraDepsEnv = ""; - portArg = 32; - metricsPortArg = 34; - totalArgs = 45; + portArg = 38; + metricsPortArg = 40; + totalArgs = 51; } if (secretsAttached) { totalArgs += 4; @@ -479,7 +479,11 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s + "-Dpulsar.function.log.dir=" + logDirectory + "/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails()) + " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-$SHARD_ID" + " -Dio.netty.tryReflectionSetAccessible=true" - + " --add-opens java.base/sun.net=ALL-UNNAMED" + + " -Dorg.apache.pulsar.shade.io.netty.tryReflectionSetAccessible=true" + + " -Dio.grpc.netty.shaded.io.netty.tryReflectionSetAccessible=true" + + " --add-opens java.base/java.nio=ALL-UNNAMED" + + " --add-opens java.base/jdk.internal.misc=ALL-UNNAMED" + + " --add-opens java.base/java.util.zip=ALL-UNNAMED" + " -Xmx" + RESOURCES.getRam() + " org.apache.pulsar.functions.instance.JavaInstanceMain" + " --jar " + jarLocation @@ -1306,7 +1310,7 @@ private void assertMetricsPortConfigured(Map functionRuntimeFact .contains("--metrics_port 0")); } } - + @Test public void testDeleteStatefulSetWithTranslatedKubernetesLabelChars() throws Exception { InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false); @@ -1315,22 +1319,22 @@ public void testDeleteStatefulSetWithTranslatedKubernetesLabelChars() throws Exc CoreV1Api coreApi = mock(CoreV1Api.class); AppsV1Api appsApi = mock(AppsV1Api.class); - + Call successfulCall = mock(Call.class); Response okResponse = mock(Response.class); when(okResponse.code()).thenReturn(HttpURLConnection.HTTP_OK); when(okResponse.isSuccessful()).thenReturn(true); when(okResponse.message()).thenReturn(""); when(successfulCall.execute()).thenReturn(okResponse); - + final String expectedFunctionNamePrefix = String.format("pf-%s-%s-%s", "c-tenant", "c-ns", "c-fn"); - + factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0); factory.setCoreClient(coreApi); factory.setAppsClient(appsApi); ArgumentMatcher hasTranslatedFunctionName = (String t) -> t.startsWith(expectedFunctionNamePrefix); - + when(appsApi.deleteNamespacedStatefulSetCall( argThat(hasTranslatedFunctionName), anyString(), isNull(), isNull(), anyInt(), isNull(), anyString(), any(), isNull())).thenReturn(successfulCall); @@ -1342,14 +1346,14 @@ public void testDeleteStatefulSetWithTranslatedKubernetesLabelChars() throws Exc V1PodList podList = mock(V1PodList.class); when(podList.getItems()).thenReturn(Collections.emptyList()); - + String expectedLabels = String.format("tenant=%s,namespace=%s,name=%s", "c-tenant", "c-ns", "c-fn"); - + when(coreApi.listNamespacedPod(anyString(), isNull(), isNull(), isNull(), isNull(), eq(expectedLabels), isNull(), isNull(), isNull(), isNull(), isNull())).thenReturn(podList); - KubernetesRuntime kr = factory.createContainer(config, "/test/code", "code.yml", "/test/transforms", "transform.yml", Long.MIN_VALUE); + KubernetesRuntime kr = factory.createContainer(config, "/test/code", "code.yml", "/test/transforms", "transform.yml", Long.MIN_VALUE); kr.deleteStatefulSet(); - + verify(coreApi).listNamespacedPod(anyString(), isNull(), isNull(), isNull(), isNull(), eq(expectedLabels), isNull(), isNull(), isNull(), isNull(), isNull()); } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java index f63f24dc25624..365704ea0b4ed 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java @@ -297,7 +297,7 @@ private void verifyJavaInstance(InstanceConfig config, Path depsDir, String webS String extraDepsEnv; int portArg; int metricsPortArg; - int totalArgCount = 48; + int totalArgCount = 54; if (webServiceUrl != null && config.isExposePulsarAdminClientEnabled()) { totalArgCount += 3; } @@ -305,13 +305,13 @@ private void verifyJavaInstance(InstanceConfig config, Path depsDir, String webS assertEquals(args.size(), totalArgCount); extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir; classpath = classpath + ":" + depsDir + "/*"; - portArg = 31; - metricsPortArg = 33; + portArg = 37; + metricsPortArg = 39; } else { assertEquals(args.size(), totalArgCount-1); extraDepsEnv = ""; - portArg = 30; - metricsPortArg = 32; + portArg = 36; + metricsPortArg = 38; } if (webServiceUrl != null && config.isExposePulsarAdminClientEnabled()) { portArg += 3; @@ -328,7 +328,11 @@ private void verifyJavaInstance(InstanceConfig config, Path depsDir, String webS + "-Dpulsar.function.log.dir=" + logDirectory + "/functions/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails()) + " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-" + config.getInstanceId() + " -Dio.netty.tryReflectionSetAccessible=true" - + " --add-opens java.base/sun.net=ALL-UNNAMED" + + " -Dorg.apache.pulsar.shade.io.netty.tryReflectionSetAccessible=true" + + " -Dio.grpc.netty.shaded.io.netty.tryReflectionSetAccessible=true" + + " --add-opens java.base/java.nio=ALL-UNNAMED" + + " --add-opens java.base/jdk.internal.misc=ALL-UNNAMED" + + " --add-opens java.base/java.util.zip=ALL-UNNAMED" + " org.apache.pulsar.functions.instance.JavaInstanceMain" + " --jar " + userJarFile + " --transform_function_jar " + userJarFile