Skip to content

Commit

Permalink
[improve][fn] Add configuration for connector & functions package url…
Browse files Browse the repository at this point in the history
… sources (#22184)

(cherry picked from commit 207335a)
(cherry picked from commit b107387)
  • Loading branch information
lhotari committed Mar 4, 2024
1 parent 7256bb7 commit b83dab5
Show file tree
Hide file tree
Showing 19 changed files with 396 additions and 239 deletions.
8 changes: 8 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,15 @@ saslJaasServerRoleTokenSignerSecretPath:
########################

connectorsDirectory: ./connectors
# Whether to enable referencing connectors directory files by file url in connector (sink/source) creation
enableReferencingConnectorDirectoryFiles: true
# Regex patterns for enabling creation of connectors by referencing packages in matching http/https urls
additionalEnabledConnectorUrlPatterns: []
functionsDirectory: ./functions
# Whether to enable referencing functions directory files by file url in functions creation
enableReferencingFunctionsDirectoryFiles: true
# Regex patterns for enabling creation of functions by referencing packages in matching http/https urls
additionalEnabledFunctionsUrlPatterns: []

# Enables extended validation for connector config with fine-grain annotation based validation
# during submission. Classloading with either enableClassloadingOfExternalFiles or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.jsonwebtoken.SignatureAlgorithm;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
Expand Down Expand Up @@ -265,6 +266,11 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setAuthorizationEnabled(config.isAuthorizationEnabled());
workerConfig.setAuthorizationProvider(config.getAuthorizationProvider());

List<String> urlPatterns =
Arrays.asList(getPulsarApiExamplesJar().getParentFile().toURI() + ".*", "http://127\\.0\\.0\\.1:.*");
workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns);
workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns);

PulsarWorkerService workerService = new PulsarWorkerService();
workerService.init(workerConfig, null, false);
return workerService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.File;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -260,6 +260,10 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setAuthenticationEnabled(true);
workerConfig.setAuthorizationEnabled(true);

List<String> urlPatterns = Arrays.asList(getPulsarApiExamplesJar().getParentFile().toURI() + ".*");
workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns);
workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns);

PulsarWorkerService workerService = new PulsarWorkerService();
workerService.init(workerConfig, null, false);
return workerService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -71,6 +73,7 @@ public class PulsarFunctionTlsTest {
protected PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT];
protected PulsarService leaderPulsar;
protected PulsarAdmin leaderAdmin;
protected WorkerService[] fnWorkerServices = new WorkerService[BROKER_COUNT];
protected String testCluster = "my-cluster";
protected String testTenant = "my-tenant";
protected String testNamespace = testTenant + "/my-ns";
Expand Down Expand Up @@ -136,12 +139,18 @@ void setup() throws Exception {
workerConfig.setBrokerClientAuthenticationEnabled(true);
workerConfig.setTlsEnabled(true);
workerConfig.setUseTls(true);
WorkerService fnWorkerService = WorkerServiceLoader.load(workerConfig);
File packagePath = new File(
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath()).getParentFile();
List<String> urlPatterns =
Arrays.asList(packagePath.toURI() + ".*");
workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns);
workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns);
fnWorkerServices[i] = WorkerServiceLoader.load(workerConfig);

configurations[i] = config;

pulsarServices[i] = new PulsarService(
config, workerConfig, Optional.of(fnWorkerService), code -> {});
config, workerConfig, Optional.of(fnWorkerServices[i]), code -> {});
pulsarServices[i].start();

// Sleep until pulsarServices[0] becomes leader, this way we can spy namespace bundle assignment easily.
Expand Down Expand Up @@ -180,6 +189,9 @@ void tearDown() throws Exception {
if (pulsarAdmins[i] != null) {
pulsarAdmins[i].close();
}
if (fnWorkerServices[i] != null) {
fnWorkerServices[i].stop();
}
if (pulsarServices[i] != null) {
pulsarServices[i].close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import static org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest.getPulsarIODataGeneratorNar;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertTrue;

import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
Expand All @@ -34,10 +34,10 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
Expand Down Expand Up @@ -70,8 +70,6 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import com.google.common.collect.Sets;

public abstract class AbstractPulsarE2ETest {

public static final Logger log = LoggerFactory.getLogger(AbstractPulsarE2ETest.class);
Expand Down Expand Up @@ -288,6 +286,11 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setAuthenticationEnabled(true);
workerConfig.setAuthorizationEnabled(true);

List<String> urlPatterns =
Arrays.asList(getPulsarApiExamplesJar().getParentFile().toURI() + ".*", "http://127\\.0\\.0\\.1:.*");
workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns);
workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns);

PulsarWorkerService workerService = new PulsarWorkerService();
workerService.init(workerConfig, null, false);
return workerService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,18 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
doc = "The directory where nar packages are extractors"
)
private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "Whether to enable referencing connectors directory files by file url in connector (sink/source) "
+ "creation. Default is true."
)
private Boolean enableReferencingConnectorDirectoryFiles = true;
@FieldContext(
category = CATEGORY_FUNCTIONS,
doc = "Regex patterns for enabling creation of connectors by referencing packages in matching http/https "
+ "urls."
)
private List<String> additionalEnabledConnectorUrlPatterns = new ArrayList<>();
@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "Enables extended validation for connector config with fine-grain annotation based validation "
Expand All @@ -255,6 +267,18 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
doc = "The path to the location to locate builtin functions"
)
private String functionsDirectory = "./functions";
@FieldContext(
category = CATEGORY_FUNCTIONS,
doc = "Whether to enable referencing functions directory files by file url in functions creation. "
+ "Default is true."
)
private Boolean enableReferencingFunctionsDirectoryFiles = true;
@FieldContext(
category = CATEGORY_FUNCTIONS,
doc = "Regex patterns for enabling creation of functions by referencing packages in matching http/https "
+ "urls."
)
private List<String> additionalEnabledFunctionsUrlPatterns = new ArrayList<>();
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The Pulsar topic used for storing function metadata"
Expand Down
Loading

0 comments on commit b83dab5

Please sign in to comment.