Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Application Plugin Interface #7473

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.facebook.presto.application;

import com.facebook.presto.spi.application.Application;
import com.facebook.presto.spi.application.ApplicationFactory;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.google.inject.Injector;
import io.airlift.log.Logger;

import javax.annotation.PreDestroy;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

@ThreadSafe
public class ApplicationManager
{
private static final Logger log = Logger.get(ApplicationManager.class);

@GuardedBy("this")
private final Map<String, ApplicationFactory> applicationFactories = new ConcurrentHashMap<>();
@GuardedBy("this")
private final Map<String, Application> applications = new ConcurrentHashMap<>();

private final AtomicBoolean stopped = new AtomicBoolean();

public synchronized void addApplicationFactory(ApplicationFactory applicationFactory)
{
requireNonNull(applicationFactory, "applicationFactory is null");

if (applicationFactories.putIfAbsent(applicationFactory.getName(), applicationFactory) != null) {
throw new IllegalArgumentException(format("Application '%s' is already registered", applicationFactory.getName()));
}
}

@PreDestroy
public void stop()
{
if (stopped.getAndSet(true)) {
return;
}

for (Map.Entry<String, Application> entry : applications.entrySet()) {
Application application = entry.getValue();
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(application.getClass().getClassLoader())) {
application.shutdown();
}
catch (Throwable t) {
log.error(t, "Error shutting down application: %s", entry.getKey());
}
}
}

protected synchronized void setup(String applicationName, ApplicationFactory applicationFactory, Map<String, String> properties)
{
checkState(!stopped.get(), "ApplicationManager is stopped");
checkState(!applications.containsKey(applicationName), "Application %s already exists", applicationName);
try {
applications.put(applicationName, applicationFactory.create(properties));
}
catch (Throwable t) {
log.error(t, "Error initializing application: %s", applicationFactory.getName());
}
}

public void setup(String applicationName, Map<String, String> properties)
{
checkState(!stopped.get(), "ApplicationManager is stopped");
ApplicationFactory applicationFactory = applicationFactories.get(applicationName);
checkArgument(applicationFactory != null, "No factory for application %s", applicationName);
setup(applicationName, applicationFactory, properties);
}

protected void start(Injector injector)
{
for (Map.Entry<String, Application> entry : applications.entrySet()) {
Application application = entry.getValue();
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(application.getClass().getClassLoader())) {
application.run(injector);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing Injector here won't work, since plugins can't see Presto's Guice from inside their class loader, by design. We intentionally do not expose the internal details of Presto to plugins. Anything needed by a plugin service should be explicitly part of the SPI.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about setting couple context groups as different ApplicationContexts in SPI just like ConnectorContext and initialize them in ApplicationManager?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is a "context group"? I feel like I'm missing a lot of context around the purpose of this pull request. Can you explain more about the use case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially, I suppose to bind UI on the port other than the API port here: #7106. In that PR, I just add a new module inside the PrestoServer and rebind the instance in the initialized injector to another Bootstrap, which looks hacky. I feel it would be reasonable adding an application plugin which runs on top of presto-main, and potentially passing some context instances from the initialized injector in some way. I agree that it shouldn't pass the whole injector and it shouldn't expose all internal instance to plugins.

}
catch (Throwable t) {
log.error(t, "Error starting application: %s", entry.getKey());
}
}
}

public void startApplications(Injector injector)
{
requireNonNull(injector, "injector is null");
checkState(!stopped.get(), "ApplicationManager is stopped");
start(injector);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.facebook.presto.application;

import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;

public class ApplicationModule
implements Module
{
@Override
public void configure(Binder binder)
{
binder.bind(ApplicationManager.class).in(Scopes.SINGLETON);
binder.bind(StaticApplicationStore.class).in(Scopes.SINGLETON);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.application;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;

import javax.inject.Inject;

import java.io.File;
import java.io.FileInputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Maps.fromProperties;
import static java.util.Objects.requireNonNull;

public class StaticApplicationStore
{
private static final Logger log = Logger.get(StaticApplicationStore.class);
private static final File APPLICATION_CONFIGURATION_DIR = new File("etc/application/");
private final ApplicationManager applicationManager;
private final AtomicBoolean applicationsLoading = new AtomicBoolean();
private final AtomicBoolean applicationsLoaded = new AtomicBoolean();

@Inject
public StaticApplicationStore(ApplicationManager applicationManager)
{
this.applicationManager = requireNonNull(applicationManager, "applicationManager is null");
}

public boolean areApplicationsLoaded()
{
return applicationsLoaded.get();
}

public void loadApplications()
throws Exception
{
if (!applicationsLoading.compareAndSet(false, true)) {
return;
}

for (File file : listFiles(APPLICATION_CONFIGURATION_DIR)) {
if (file.isFile() && file.getName().endsWith(".properties")) {
loadApplication(file);
}
}

applicationsLoaded.set(true);
}

private void loadApplication(File file)
throws Exception
{
log.info("-- Loading application %s --", file);
Map<String, String> properties = new HashMap<>(loadProperties(file));

String applicationName = properties.remove("application.name");
checkState(applicationName != null, "Application configuration %s does not contain application.name", file.getAbsoluteFile());

applicationManager.setup(applicationName, ImmutableMap.copyOf(properties));
log.info("-- Added application %s --", applicationName);
}

private static List<File> listFiles(File installedPluginsDir)
{
if (installedPluginsDir != null && installedPluginsDir.isDirectory()) {
File[] files = installedPluginsDir.listFiles();
if (files != null) {
return ImmutableList.copyOf(files);
}
}
return ImmutableList.of();
}

private static Map<String, String> loadProperties(File file)
throws Exception
{
requireNonNull(file, "file is null");

Properties properties = new Properties();
try (FileInputStream in = new FileInputStream(file)) {
properties.load(in);
}
return fromProperties(properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
*/
package com.facebook.presto.server;

import com.facebook.presto.application.ApplicationManager;
import com.facebook.presto.block.BlockEncodingManager;
import com.facebook.presto.connector.ConnectorManager;
import com.facebook.presto.eventlistener.EventListenerManager;
import com.facebook.presto.execution.resourceGroups.ResourceGroupManager;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.security.AccessControlManager;
import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.application.ApplicationFactory;
import com.facebook.presto.spi.block.BlockEncodingFactory;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.connector.ConnectorFactory;
Expand Down Expand Up @@ -73,6 +75,7 @@ public class PluginManager

private final ConnectorManager connectorManager;
private final Metadata metadata;
private final ApplicationManager applicationManager;
private final ResourceGroupManager resourceGroupManager;
private final AccessControlManager accessControlManager;
private final EventListenerManager eventListenerManager;
Expand All @@ -91,6 +94,7 @@ public PluginManager(
PluginManagerConfig config,
ConnectorManager connectorManager,
Metadata metadata,
ApplicationManager applicationManager,
ResourceGroupManager resourceGroupManager,
AccessControlManager accessControlManager,
EventListenerManager eventListenerManager,
Expand All @@ -112,6 +116,7 @@ public PluginManager(

this.connectorManager = requireNonNull(connectorManager, "connectorManager is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.applicationManager = requireNonNull(applicationManager, "applicationManager is null");
this.resourceGroupManager = requireNonNull(resourceGroupManager, "resourceGroupManager is null");
this.accessControlManager = requireNonNull(accessControlManager, "accessControlManager is null");
this.eventListenerManager = requireNonNull(eventListenerManager, "eventListenerManager is null");
Expand Down Expand Up @@ -209,6 +214,11 @@ public void installPlugin(Plugin plugin)
log.info("Registering event listener %s", eventListenerFactory.getName());
eventListenerManager.addEventListenerFactory(eventListenerFactory);
}

for (ApplicationFactory applicationFactory : plugin.getApplicationFactories()) {
log.info("Registering application %s", applicationFactory.getName());
applicationManager.addApplicationFactory(applicationFactory);
}
}

private URLClassLoader buildClassLoader(String plugin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
*/
package com.facebook.presto.server;

import com.facebook.presto.application.ApplicationManager;
import com.facebook.presto.application.ApplicationModule;
import com.facebook.presto.application.StaticApplicationStore;
import com.facebook.presto.discovery.EmbeddedDiscoveryModule;
import com.facebook.presto.eventlistener.EventListenerManager;
import com.facebook.presto.eventlistener.EventListenerModule;
Expand Down Expand Up @@ -105,6 +108,7 @@ public void run()
new ServerSecurityModule(),
new AccessControlModule(),
new EventListenerModule(),
new ApplicationModule(),
new ServerMainModule(sqlParserOptions),
new GracefulShutdownModule());

Expand All @@ -119,6 +123,9 @@ public void run()

injector.getInstance(StaticCatalogStore.class).loadCatalogs();

injector.getInstance(StaticApplicationStore.class).loadApplications();
injector.getInstance(ApplicationManager.class).startApplications(injector);

// TODO: remove this huge hack
updateConnectorIds(
injector.getInstance(Announcer.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.spi;

import com.facebook.presto.spi.application.ApplicationFactory;
import com.facebook.presto.spi.block.BlockEncodingFactory;
import com.facebook.presto.spi.block.BlockEncodingSerde;
import com.facebook.presto.spi.connector.ConnectorFactory;
Expand Down Expand Up @@ -68,4 +69,9 @@ default Iterable<ResourceGroupConfigurationManagerFactory> getResourceGroupConfi
{
return emptyList();
}

default Iterable<ApplicationFactory> getApplicationFactories()
{
return emptyList();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.facebook.presto.spi.application;

public interface Application
extends Runnable
{
default void run(Object context)
{
run();
}

default void shutdown()
{
}
}
Loading