Skip to content

Commit

Permalink
[fix][functions] Fix netty.DnsResolverUtil compat issue on JDK9+ for …
Browse files Browse the repository at this point in the history
…the function Runtimes
  • Loading branch information
cbornet committed Jul 7, 2022
1 parent 4c958a9 commit 3e43cf8
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,10 @@ public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
return args;
}

public static boolean isJavaVersion9OrMore() {
return !System.getProperty("java.version").startsWith("1.");
}


public static List<String> getCmd(InstanceConfig instanceConfig,
String instanceFile,
Expand Down Expand Up @@ -320,6 +324,11 @@ public static List<String> getCmd(InstanceConfig instanceConfig,

args.add("-Dio.netty.tryReflectionSetAccessible=true");

// Needed for netty.DnsResolverUtil on JDK9+
if (isJavaVersion9OrMore()) {
args.add("--add-opens java.base/sun.net=ALL-UNNAMED");
}

if (instanceConfig.getAdditionalJavaRuntimeArguments() != null) {
args.addAll(instanceConfig.getAdditionalJavaRuntimeArguments());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
import org.apache.pulsar.functions.runtime.RuntimeUtils;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntime;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
Expand All @@ -48,6 +49,7 @@
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.FunctionsManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -380,8 +382,13 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s
}

private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean secretsAttached, String downloadDirectory) throws Exception {
KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
List<String> args = container.getProcessArgs();
KubernetesRuntime container;
List<String> args;
try (MockedStatic<RuntimeUtils> runtimeUtils = Mockito.mockStatic(RuntimeUtils.class, Mockito.CALLS_REAL_METHODS)) {
runtimeUtils.when(RuntimeUtils::isJavaVersion9OrMore).thenReturn(true);
container = factory.createContainer(config, userJarFile, userJarFile, 30L);
args = container.getProcessArgs();
}

String classpath = javaInstanceJarFile;
String extraDepsEnv;
Expand All @@ -392,14 +399,14 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s
if (null != depsDir) {
extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir;
classpath = classpath + ":" + depsDir + "/*";
totalArgs = 40;
portArg = 26;
metricsPortArg = 28;
totalArgs = 41;
portArg = 27;
metricsPortArg = 29;
} else {
extraDepsEnv = "";
portArg = 25;
metricsPortArg = 27;
totalArgs = 39;
portArg = 26;
metricsPortArg = 28;
totalArgs = 40;
}
if (secretsAttached) {
totalArgs += 4;
Expand Down Expand Up @@ -430,6 +437,7 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s
+ "-Dpulsar.function.log.dir=" + logDirectory + "/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+ " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-$SHARD_ID"
+ " -Dio.netty.tryReflectionSetAccessible=true -Xmx" + String.valueOf(RESOURCES.getRam())
+ " --add-opens java.base/sun.net=ALL-UNNAMED"
+ " org.apache.pulsar.functions.instance.JavaInstanceMain"
+ " --jar " + jarLocation + " --instance_id "
+ "$SHARD_ID" + " --function_id " + config.getFunctionId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.runtime.RuntimeUtils;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntime;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.FunctionsManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -283,28 +285,32 @@ private void verifyJavaInstance(InstanceConfig config, Path depsDir) throws Exce
}

private void verifyJavaInstance(InstanceConfig config, Path depsDir, String webServiceUrl) throws Exception {
ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l);
List<String> args = container.getProcessArgs();
List<String> args;
try (MockedStatic<RuntimeUtils> runtimeUtils = Mockito.mockStatic(RuntimeUtils.class, Mockito.CALLS_REAL_METHODS)) {
runtimeUtils.when(RuntimeUtils::isJavaVersion9OrMore).thenReturn(true);
ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l);
args = container.getProcessArgs();
}

String classpath = javaInstanceJarFile;
String extraDepsEnv;
int portArg;
int metricsPortArg;
int totalArgCount = 42;
int totalArgCount = 43;
if (webServiceUrl != null && config.isExposePulsarAdminClientEnabled()) {
totalArgCount += 3;
}
if (null != depsDir) {
assertEquals(args.size(), totalArgCount);
extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir;
classpath = classpath + ":" + depsDir + "/*";
portArg = 25;
metricsPortArg = 27;
portArg = 26;
metricsPortArg = 28;
} else {
assertEquals(args.size(), totalArgCount-1);
extraDepsEnv = "";
portArg = 24;
metricsPortArg = 26;
portArg = 25;
metricsPortArg = 27;
}
if (webServiceUrl != null && config.isExposePulsarAdminClientEnabled()) {
portArg += 3;
Expand All @@ -321,6 +327,7 @@ private void verifyJavaInstance(InstanceConfig config, Path depsDir, String webS
+ "-Dpulsar.function.log.dir=" + logDirectory + "/functions/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+ " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-" + config.getInstanceId()
+ " -Dio.netty.tryReflectionSetAccessible=true"
+ " --add-opens java.base/sun.net=ALL-UNNAMED"
+ " org.apache.pulsar.functions.instance.JavaInstanceMain"
+ " --jar " + userJarFile + " --instance_id "
+ config.getInstanceId() + " --function_id " + config.getFunctionId()
Expand Down

0 comments on commit 3e43cf8

Please sign in to comment.