diff --git a/uberfire-archetype/uberfire-project-archetype/src/main/resources/archetype-resources/__rootArtifactId__-parent-with-dependencies/pom.xml b/uberfire-archetype/uberfire-project-archetype/src/main/resources/archetype-resources/__rootArtifactId__-parent-with-dependencies/pom.xml index 94c11216de..64806e663c 100644 --- a/uberfire-archetype/uberfire-project-archetype/src/main/resources/archetype-resources/__rootArtifactId__-parent-with-dependencies/pom.xml +++ b/uberfire-archetype/uberfire-project-archetype/src/main/resources/archetype-resources/__rootArtifactId__-parent-with-dependencies/pom.xml @@ -62,7 +62,6 @@ ${version.org.easytesting.fest} ${version.org.hamcrest} - ${version.org.apache.helix} ${version.org.eclipse.jgit} ${version.com.jcraft} @@ -144,18 +143,6 @@ \${version.com.jcraft} - - org.apache.helix - helix-core - \${version.org.apache.helix} - - - jline - jline - - - - org.ocpsoft.prettytime diff --git a/uberfire-backend/uberfire-backend-cdi/src/main/java/org/uberfire/backend/server/cdi/SystemConfigProducer.java b/uberfire-backend/uberfire-backend-cdi/src/main/java/org/uberfire/backend/server/cdi/SystemConfigProducer.java index 234b241a4a..7cb844eac2 100644 --- a/uberfire-backend/uberfire-backend-cdi/src/main/java/org/uberfire/backend/server/cdi/SystemConfigProducer.java +++ b/uberfire-backend/uberfire-backend-cdi/src/main/java/org/uberfire/backend/server/cdi/SystemConfigProducer.java @@ -26,7 +26,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Set; -import java.util.concurrent.ExecutorService; import javax.enterprise.context.ApplicationScoped; import javax.enterprise.context.spi.CreationalContext; import javax.enterprise.event.Observes; @@ -49,8 +48,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.uberfire.commons.cluster.ClusterServiceFactory; -import org.uberfire.commons.concurrent.Unmanaged; import org.uberfire.commons.lifecycle.PriorityDisposableRegistry; import org.uberfire.commons.services.cdi.Startable; import org.uberfire.commons.services.cdi.Startup; @@ -58,7 +55,6 @@ import org.uberfire.commons.services.cdi.Veto; import org.uberfire.io.IOService; import org.uberfire.io.impl.IOServiceNio2WrapperImpl; -import org.uberfire.io.impl.cluster.IOServiceClusterImpl; import org.uberfire.java.nio.IOException; import org.uberfire.java.nio.base.FileSystemState; import org.uberfire.java.nio.file.FileStore; @@ -359,28 +355,7 @@ public boolean isNullable() { @Override public IOService create(CreationalContext ctx) { - final Bean clusterFactoryBean = (Bean) bm.getBeans("clusterServiceFactory").iterator().next(); - final CreationalContext _ctx = bm.createCreationalContext(clusterFactoryBean); - final ClusterServiceFactory clusterServiceFactory = (ClusterServiceFactory) bm.getReference(clusterFactoryBean, - ClusterServiceFactory.class, - _ctx); - - final ExecutorService executorService = getBean(bm, - ExecutorService.class, - new AnnotationLiteral() { - }); - - final IOService result; - - if (clusterServiceFactory == null) { - result = new IOServiceNio2WrapperImpl(); - } else { - result = new IOServiceClusterImpl(new IOServiceNio2WrapperImpl(), - clusterServiceFactory, - executorService); - } - - return result; + return new IOServiceNio2WrapperImpl(); } @Override diff --git a/uberfire-backend/uberfire-backend-server/src/main/java/org/uberfire/backend/server/WorkbenchServicesImpl.java b/uberfire-backend/uberfire-backend-server/src/main/java/org/uberfire/backend/server/WorkbenchServicesImpl.java index 9e3dbb4ae1..ea1794e7a0 100644 --- a/uberfire-backend/uberfire-backend-server/src/main/java/org/uberfire/backend/server/WorkbenchServicesImpl.java +++ b/uberfire-backend/uberfire-backend-server/src/main/java/org/uberfire/backend/server/WorkbenchServicesImpl.java @@ -26,7 +26,7 @@ import com.thoughtworks.xstream.XStream; import org.jboss.errai.bus.server.annotations.Service; import org.kie.soup.commons.xstream.XStreamUtils; -import org.uberfire.commons.cluster.ClusterServiceFactory; +import org.uberfire.commons.cluster.ClusterParameters; import org.uberfire.io.IOService; import org.uberfire.java.nio.IOException; import org.uberfire.java.nio.file.FileVisitResult; @@ -48,14 +48,13 @@ public class WorkbenchServicesImpl implements WorkbenchServices { public static final String PERSPECTIVE_EXTENSION = ".perspective"; private final XStream xs = XStreamUtils.createTrustingXStream(); + private final ClusterParameters clusterParameters = new ClusterParameters(); + @Inject @Named("configIO") private IOService ioService; @Inject private UserServicesImpl userServices; - @Inject - @Named("clusterServiceFactory") - private ClusterServiceFactory clusterServiceFactory; @Override public void save(final String perspectiveId, @@ -210,7 +209,7 @@ public void saveDefaultEditors(final Map properties) { @Override public boolean isWorkbenchOnCluster() { - return clusterServiceFactory != null; + return clusterParameters.isAppFormerClustered(); } private Path getPathToDefaultEditors() { diff --git a/uberfire-backend/uberfire-backend-server/src/main/java/org/uberfire/backend/server/cluster/ClusterServiceFactoryProducer.java b/uberfire-backend/uberfire-backend-server/src/main/java/org/uberfire/backend/server/cluster/ClusterServiceFactoryProducer.java deleted file mode 100644 index e9c997ea84..0000000000 --- a/uberfire-backend/uberfire-backend-server/src/main/java/org/uberfire/backend/server/cluster/ClusterServiceFactoryProducer.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2015 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.uberfire.backend.server.cluster; - -import java.util.concurrent.atomic.AtomicBoolean; -import javax.enterprise.context.ApplicationScoped; -import javax.enterprise.inject.Produces; -import javax.inject.Named; - -import org.uberfire.commons.cluster.ClusterServiceFactory; -import org.uberfire.commons.services.cdi.Startup; -import org.uberfire.commons.services.cdi.StartupType; - -@ApplicationScoped -@Startup(StartupType.BOOTSTRAP) -public class ClusterServiceFactoryProducer { - - private final ClusterServiceFactory factory; - private AtomicBoolean initialized = new AtomicBoolean(false); - - ClusterServiceFactoryProducer() { - this.factory = buildFactory(); - } - - ClusterServiceFactory buildFactory() { - return ClusterServiceFactorySetup.buildFactory(); - } - - @Produces - @Named("clusterServiceFactory") - public synchronized ClusterServiceFactory clusterServiceFactory() { - if (factory != null && !initialized.getAndSet(true)) { - factory.build(null); - } - return factory; - } -} \ No newline at end of file diff --git a/uberfire-backend/uberfire-backend-server/src/main/java/org/uberfire/backend/server/cluster/ClusterServiceFactorySetup.java b/uberfire-backend/uberfire-backend-server/src/main/java/org/uberfire/backend/server/cluster/ClusterServiceFactorySetup.java deleted file mode 100644 index 69d2a66de9..0000000000 --- a/uberfire-backend/uberfire-backend-server/src/main/java/org/uberfire/backend/server/cluster/ClusterServiceFactorySetup.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright 2015 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.uberfire.backend.server.cluster; - -import org.uberfire.commons.cluster.ClusterServiceFactory; - -public final class ClusterServiceFactorySetup { - - public static ClusterServiceFactory buildFactory() { - final String clusterName = System.getProperty("org.uberfire.cluster.id", - null); - final String zkAddress = System.getProperty("org.uberfire.cluster.zk", - null); - final String localId = System.getProperty("org.uberfire.cluster.local.id", - null); - final String resourceName = System.getProperty("org.uberfire.cluster.vfs.lock", - null); - final boolean autostart = Boolean.parseBoolean(System.getProperty("org.uberfire.cluster.autostart", - "true")); - - if (clusterName == null || zkAddress == null || localId == null || resourceName == null) { - return null; - } - - return new ClusterServiceFactorySimpleImpl(clusterName, - zkAddress, - localId, - resourceName, - autostart); - } -} diff --git a/uberfire-backend/uberfire-backend-server/src/main/java/org/uberfire/backend/server/cluster/ClusterServiceFactorySimpleImpl.java b/uberfire-backend/uberfire-backend-server/src/main/java/org/uberfire/backend/server/cluster/ClusterServiceFactorySimpleImpl.java deleted file mode 100644 index 489bc4b989..0000000000 --- a/uberfire-backend/uberfire-backend-server/src/main/java/org/uberfire/backend/server/cluster/ClusterServiceFactorySimpleImpl.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright 2015 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.uberfire.backend.server.cluster; - -import org.uberfire.commons.cluster.ClusterService; -import org.uberfire.commons.cluster.ClusterServiceFactory; -import org.uberfire.commons.message.MessageHandlerResolver; -import org.uberfire.io.impl.cluster.helix.ClusterServiceHelix; - -public class ClusterServiceFactorySimpleImpl implements ClusterServiceFactory { - - private final String clusterName; - private final String zkAddress; - private final String localId; - private final String resourceName; - private final boolean autostart; - private ClusterService clusterService; - - public ClusterServiceFactorySimpleImpl(final String clusterName, - final String zkAddress, - final String localId, - final String resourceName, - final boolean autostart) { - this.clusterName = clusterName; - this.zkAddress = zkAddress; - this.localId = localId; - this.resourceName = resourceName; - this.autostart = autostart; - } - - @Override - public synchronized ClusterService build(final MessageHandlerResolver resolver) { - if (clusterService == null) { - clusterService = new ClusterServiceHelix(clusterName, - zkAddress, - localId, - resourceName, - resolver); - } else { - clusterService.addMessageHandlerResolver(resolver); - } - return clusterService; - } - - @Override - public boolean isAutoStart() { - return autostart; - } -} diff --git a/uberfire-backend/uberfire-backend-server/src/main/java/org/uberfire/backend/server/io/ConfigIOServiceProducer.java b/uberfire-backend/uberfire-backend-server/src/main/java/org/uberfire/backend/server/io/ConfigIOServiceProducer.java index 14712d9d24..10106e6be7 100644 --- a/uberfire-backend/uberfire-backend-server/src/main/java/org/uberfire/backend/server/io/ConfigIOServiceProducer.java +++ b/uberfire-backend/uberfire-backend-server/src/main/java/org/uberfire/backend/server/io/ConfigIOServiceProducer.java @@ -26,14 +26,12 @@ import org.jboss.errai.security.shared.service.AuthenticationService; import org.uberfire.backend.server.security.IOSecurityAuth; -import org.uberfire.commons.cluster.ClusterServiceFactory; import org.uberfire.commons.concurrent.Unmanaged; import org.uberfire.commons.lifecycle.PriorityDisposableRegistry; import org.uberfire.commons.services.cdi.Startup; import org.uberfire.commons.services.cdi.StartupType; import org.uberfire.io.IOService; import org.uberfire.io.impl.IOServiceNio2WrapperImpl; -import org.uberfire.io.impl.cluster.IOServiceClusterImpl; import org.uberfire.java.nio.file.FileSystem; @ApplicationScoped @@ -42,8 +40,6 @@ public class ConfigIOServiceProducer { private static ConfigIOServiceProducer instance; - private ClusterServiceFactory clusterServiceFactory; - private ExecutorService executorService; private Instance applicationProvidedConfigIOAuthService; @@ -62,10 +58,8 @@ public ConfigIOServiceProducer() { } @Inject - public ConfigIOServiceProducer(@Named("clusterServiceFactory") ClusterServiceFactory clusterServiceFactory, - @Unmanaged ExecutorService executorService, + public ConfigIOServiceProducer(@Unmanaged ExecutorService executorService, @IOSecurityAuth Instance applicationProvidedConfigIOAuthService) { - this.clusterServiceFactory = clusterServiceFactory; this.executorService = executorService; this.applicationProvidedConfigIOAuthService = applicationProvidedConfigIOAuthService; } @@ -73,14 +67,7 @@ public ConfigIOServiceProducer(@Named("clusterServiceFactory") ClusterServiceFac @PostConstruct public void setup() { instance = this; - if (clusterServiceFactory == null) { - configIOService = new IOServiceNio2WrapperImpl("config"); - } else { - configIOService = new IOServiceClusterImpl(new IOServiceNio2WrapperImpl("config"), - clusterServiceFactory, - clusterServiceFactory.isAutoStart(), - executorService); - } + configIOService = new IOServiceNio2WrapperImpl("config"); configFileSystem = (FileSystem) PriorityDisposableRegistry.get("systemFS"); } diff --git a/uberfire-backend/uberfire-backend-server/src/main/java/org/uberfire/backend/server/io/DisposableShutdownService.java b/uberfire-backend/uberfire-backend-server/src/main/java/org/uberfire/backend/server/io/DisposableShutdownService.java index 3cda3fc4db..b8ff46ca9c 100644 --- a/uberfire-backend/uberfire-backend-server/src/main/java/org/uberfire/backend/server/io/DisposableShutdownService.java +++ b/uberfire-backend/uberfire-backend-server/src/main/java/org/uberfire/backend/server/io/DisposableShutdownService.java @@ -25,7 +25,6 @@ import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; -import org.uberfire.commons.cluster.ClusterService; import org.uberfire.commons.concurrent.Managed; import org.uberfire.commons.concurrent.Unmanaged; import org.uberfire.commons.lifecycle.Disposable; @@ -55,19 +54,8 @@ public void contextInitialized(final ServletContextEvent sce) { @Override public void contextDestroyed(final ServletContextEvent sce) { - ClusterService clusterService = null; final List disposables = new ArrayList(PriorityDisposableRegistry.getDisposables()); - for (final PriorityDisposable disposable : disposables) { - if (disposable instanceof ClusterService) { - clusterService = (ClusterService) disposable; - } - } - - if (clusterService != null) { - disposables.remove(clusterService); - clusterService.lock(); - } sort(disposables); @@ -84,11 +72,6 @@ public void contextDestroyed(final ServletContextEvent sce) { } } - if (clusterService != null) { - clusterService.unlock(); - clusterService.dispose(); - } - PriorityDisposableRegistry.clear(); } diff --git a/uberfire-backend/uberfire-backend-server/src/test/java/org/uberfire/backend/server/cluster/ClusterServiceFactoryProducerTest.java b/uberfire-backend/uberfire-backend-server/src/test/java/org/uberfire/backend/server/cluster/ClusterServiceFactoryProducerTest.java deleted file mode 100644 index e3c14065de..0000000000 --- a/uberfire-backend/uberfire-backend-server/src/test/java/org/uberfire/backend/server/cluster/ClusterServiceFactoryProducerTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2015 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.uberfire.backend.server.cluster; - -import org.junit.Test; -import org.uberfire.commons.cluster.ClusterServiceFactory; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - -public class ClusterServiceFactoryProducerTest { - - @Test - public void testClusterNotAvailable() { - final ClusterServiceFactoryProducer factoryProducer = new ClusterServiceFactoryProducer() { - ClusterServiceFactory buildFactory() { - return null; - } - }; - - assertNull(factoryProducer.clusterServiceFactory()); - } - - @Test - public void testClusterInitializedBeforeAnyUse() { - final ClusterServiceFactory clusterServiceFactory = mock(ClusterServiceFactory.class); - - final ClusterServiceFactoryProducer factoryProducer = new ClusterServiceFactoryProducer() { - ClusterServiceFactory buildFactory() { - return clusterServiceFactory; - } - }; - - final ClusterServiceFactory factory = factoryProducer.clusterServiceFactory(); - assertNotNull(factory); - assertEquals(clusterServiceFactory, - factory); - - verify(factory, - times(1)).build(null); - } -} diff --git a/uberfire-backend/uberfire-backend-server/src/test/java/org/uberfire/backend/server/io/DisposableShutdownServiceTest.java b/uberfire-backend/uberfire-backend-server/src/test/java/org/uberfire/backend/server/io/DisposableShutdownServiceTest.java deleted file mode 100644 index ad1ac39820..0000000000 --- a/uberfire-backend/uberfire-backend-server/src/test/java/org/uberfire/backend/server/io/DisposableShutdownServiceTest.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Copyright 2015 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.uberfire.backend.server.io; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.concurrent.ExecutorService; - -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mockito; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; -import org.uberfire.commons.cluster.ClusterService; -import org.uberfire.commons.lifecycle.PriorityDisposable; -import org.uberfire.commons.lifecycle.PriorityDisposableRegistry; -import org.uberfire.java.nio.file.api.FileSystemProviders; -import org.uberfire.java.nio.file.spi.FileSystemProvider; -import org.uberfire.java.nio.fs.jgit.JGitFileSystemProvider; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; -import static org.powermock.api.mockito.PowerMockito.mockStatic; - -@RunWith(PowerMockRunner.class) -@PrepareForTest({PriorityDisposableRegistry.class, - FileSystemProviders.class}) -public class DisposableShutdownServiceTest { - - private ExecutorService executorService; - private ExecutorService unmanagedExecutorService; - private DisposableShutdownService disposableShutdownService; - - @Before - public void setUp() { - - executorService = mock(ExecutorService.class); - unmanagedExecutorService = mock(ExecutorService.class); - - disposableShutdownService = new DisposableShutdownService(this.executorService,this.unmanagedExecutorService); - - mockStatic(PriorityDisposableRegistry.class); - mockStatic(FileSystemProviders.class); - } - - @Test - public void testGeneralStatic() { - - final JGitFileSystemProvider disposableProvider = mock(JGitFileSystemProvider.class); - - when(FileSystemProviders.installedProviders()).thenReturn(Arrays.asList(mock(FileSystemProvider.class), - disposableProvider)); - - disposableShutdownService.contextDestroyed(null); - - verify(disposableProvider, - times(1)).dispose(); - - verify(executorService, - times(1)).shutdown(); - - verify(unmanagedExecutorService, - times(1)).shutdown(); - - PowerMockito.verifyStatic(); - PriorityDisposableRegistry.clear(); - } - - @Test - public void testCluster() { - - final ClusterService clusterService = mock(ClusterService.class); - - when(FileSystemProviders.installedProviders()).thenReturn(Collections.emptyList()); - when(PriorityDisposableRegistry.getDisposables()).thenReturn(Arrays.asList(mock(PriorityDisposable.class), - clusterService)); - - disposableShutdownService.contextDestroyed(null); - - verify(clusterService, - times(1)).lock(); - verify(clusterService, - times(1)).unlock(); - verify(clusterService, - times(1)).dispose(); - } - - @Test - public void testDisposables() { - final PriorityDisposable priorityDisposable1 = mock(PriorityDisposable.class); - final PriorityDisposable priorityDisposable2 = mock(PriorityDisposable.class); - final PriorityDisposable priorityDisposable3 = mock(PriorityDisposable.class); - - when(FileSystemProviders.installedProviders()).thenReturn(Collections.emptyList()); - when(PriorityDisposableRegistry.getDisposables()).thenReturn(Arrays.asList(priorityDisposable1, - priorityDisposable2, - priorityDisposable3)); - - disposableShutdownService.contextDestroyed(null); - - verify(priorityDisposable1, - times(1)).dispose(); - verify(priorityDisposable2, - times(1)).dispose(); - verify(priorityDisposable3, - times(1)).dispose(); - } - - @Test - public void testSort() { - final PriorityDisposable priorityDisposable1 = mock(PriorityDisposable.class); - Mockito.when(priorityDisposable1.priority()).thenReturn(-1); - final PriorityDisposable priorityDisposable2 = mock(PriorityDisposable.class); - Mockito.when(priorityDisposable2.priority()).thenReturn(0); - final PriorityDisposable priorityDisposable3 = mock(PriorityDisposable.class); - Mockito.when(priorityDisposable3.priority()).thenReturn(10); - final PriorityDisposable priorityDisposable4 = mock(PriorityDisposable.class); - Mockito.when(priorityDisposable4.priority()).thenReturn(11); - - final ArrayList disposables = new ArrayList(); - disposables.add(priorityDisposable3); - disposables.add(priorityDisposable2); - disposables.add(priorityDisposable4); - disposables.add(priorityDisposable1); - - assertEquals(priorityDisposable3, - disposables.get(0)); - assertEquals(priorityDisposable2, - disposables.get(1)); - assertEquals(priorityDisposable4, - disposables.get(2)); - assertEquals(priorityDisposable1, - disposables.get(3)); - - disposableShutdownService.sort(disposables); - - assertEquals(4, - disposables.size()); - assertEquals(priorityDisposable4, - disposables.get(0)); - assertEquals(priorityDisposable3, - disposables.get(1)); - assertEquals(priorityDisposable2, - disposables.get(2)); - assertEquals(priorityDisposable1, - disposables.get(3)); - } -} diff --git a/uberfire-commons/pom.xml b/uberfire-commons/pom.xml index 0f4add6916..03e1676ecd 100644 --- a/uberfire-commons/pom.xml +++ b/uberfire-commons/pom.xml @@ -50,7 +50,56 @@ org.slf4j slf4j-api - + + + org.apache.activemq + artemis-jms-client + + + org.apache.geronimo.specs + geronimo-jms_2.0_spec + + + commons-logging + commons-logging + + + io.netty + netty-all + + + + + org.jboss.spec.javax.jms + jboss-jms-api_2.0_spec + + + io.netty + netty-buffer + + + io.netty + netty-transport + + + io.netty + netty-handler + + + io.netty + netty-transport-native-epoll + ${netty-transport-native-epoll-classifier} + + + io.netty + netty-transport-native-kqueue + ${netty-transport-native-kqueue-classifier} + + + io.netty + netty-codec-http + + diff --git a/uberfire-commons/src/main/java/org/uberfire/commons/cluster/BaseLockExecuteNotifyReleaseTemplate.java b/uberfire-commons/src/main/java/org/uberfire/commons/cluster/BaseLockExecuteNotifyReleaseTemplate.java deleted file mode 100644 index 85b07e9627..0000000000 --- a/uberfire-commons/src/main/java/org/uberfire/commons/cluster/BaseLockExecuteNotifyReleaseTemplate.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright 2015 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.uberfire.commons.cluster; - -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.RunnableFuture; - -import org.uberfire.commons.message.MessageType; - -abstract class BaseLockExecuteNotifyReleaseTemplate { - - // default timeout 30 sec - public static final int TIMEOUT = Integer.parseInt(System.getProperty("org.uberfire.cluster.timeout", - "30000")); - - public V execute(final ClusterService clusterService, - final RunnableFuture task) { - try { - clusterService.lock(); - - task.run(); - - final V result = task.get(); - - sendMessage(clusterService); - - return result; - } catch (final ExecutionException e) { - throwException(e.getCause()); - } catch (final Exception e) { - throwException(e); - } finally { - clusterService.unlock(); - } - return null; - } - - private void throwException(final Throwable e) { - if (e instanceof RuntimeException) { - throw (RuntimeException) e; - } - throw new RuntimeException(e); - } - - abstract void sendMessage(final ClusterService clusterService); - - public abstract MessageType getMessageType(); - - public abstract String getServiceId(); - - public abstract Map buildContent(); -} diff --git a/uberfire-commons/src/main/java/org/uberfire/commons/cluster/ClusterJMSService.java b/uberfire-commons/src/main/java/org/uberfire/commons/cluster/ClusterJMSService.java new file mode 100644 index 0000000000..cd4345196f --- /dev/null +++ b/uberfire-commons/src/main/java/org/uberfire/commons/cluster/ClusterJMSService.java @@ -0,0 +1,170 @@ +/* + * Copyright 2017 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.uberfire.commons.cluster; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; + +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ClusterJMSService { + + private static final Logger LOGGER = LoggerFactory.getLogger(ClusterJMSService.class); + + private Connection connection; + private ClusterParameters clusterParameters; + private List consumerSessions = new ArrayList<>(); + + public ClusterJMSService() { + clusterParameters = loadParameters(); + } + + public void connect() { + String jmsURL = clusterParameters.getJmsURL(); + String jmsUserName = clusterParameters.getJmsUserName(); + String jmsPassword = clusterParameters.getJmsPassword(); + ConnectionFactory factory = createConnectionFactory(jmsURL, + jmsUserName, + jmsPassword); + + try { + connection = factory.createConnection(); + connection.setExceptionListener(new JMSExceptionListener()); + connection.start(); + } catch (Exception e) { + LOGGER.error("Error connecting on JMS " + e.getMessage()); + throw new RuntimeException(e); + } + } + + ActiveMQConnectionFactory createConnectionFactory(String jmsURL, + String jmsUserName, + String jmsPassword) { + return new ActiveMQConnectionFactory(jmsURL, + jmsUserName, + jmsPassword); + } + + private ClusterParameters loadParameters() { + return new ClusterParameters(); + } + + public void createConsumer(DESTINATION_TYPE type, + String destinationName, + MessageListener listener) { + try { + + Session session = createConsumerSession(); + Destination topic = createDestination(type, + destinationName, + session); + MessageConsumer messageConsumer = session.createConsumer(topic); + messageConsumer.setMessageListener(listener); + } catch (Exception e) { + LOGGER.error("Error creating JMS Watch Service: " + e.getMessage()); + } + } + + public synchronized void broadcast(DESTINATION_TYPE type, + String destinationName, + Serializable object) { + + Session session = null; + try { + session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + Destination destination = createDestination(type, + destinationName, + session); + ObjectMessage objectMessage = session.createObjectMessage(object); + MessageProducer messageProducer = session.createProducer(destination); + messageProducer.send(objectMessage); + } catch (JMSException e) { + LOGGER.error("Exception on JMS broadcast: " + e.getMessage()); + } finally { + if (session != null) { + try { + session.close(); + } catch (JMSException e) { + LOGGER.error("Exception on closing JMS session (this could trigger a leak) " + e.getMessage()); + } + } + } + } + + private Destination createDestination(DESTINATION_TYPE type, + String destinationName, + Session session) throws JMSException { + if (type.equals(DESTINATION_TYPE.QUEUE)) { + return session.createQueue(destinationName); + } + return session.createTopic(destinationName); + } + + private Session createConsumerSession() { + try { + Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + consumerSessions.add(session); + return session; + } catch (JMSException e) { + LOGGER.error("Error creating session " + e.getMessage()); + throw new RuntimeException(e); + } + } + + public boolean isAppFormerClustered() { + return clusterParameters.isAppFormerClustered(); + } + + public static class JMSExceptionListener implements ExceptionListener { + + @Override + public void onException(JMSException e) { + LOGGER.error("JMSException: " + e.getMessage()); + } + } + + public void close() { + try { + for (Session s : consumerSessions) { + s.close(); + } + connection.close(); + } catch (JMSException e) { + LOGGER.error("Exception closing JMS connection and consumerSessions: " + e.getMessage()); + } + } + + public enum DESTINATION_TYPE { + TOPIC, + QUEUE + } +} diff --git a/uberfire-nio2-backport/uberfire-nio2-impls/uberfire-nio2-jgit/src/main/java/org/uberfire/java/nio/fs/jgit/ws/cluster/ClusterParameters.java b/uberfire-commons/src/main/java/org/uberfire/commons/cluster/ClusterParameters.java similarity index 98% rename from uberfire-nio2-backport/uberfire-nio2-impls/uberfire-nio2-jgit/src/main/java/org/uberfire/java/nio/fs/jgit/ws/cluster/ClusterParameters.java rename to uberfire-commons/src/main/java/org/uberfire/commons/cluster/ClusterParameters.java index d09beb0400..c5b34bf481 100644 --- a/uberfire-nio2-backport/uberfire-nio2-impls/uberfire-nio2-jgit/src/main/java/org/uberfire/java/nio/fs/jgit/ws/cluster/ClusterParameters.java +++ b/uberfire-commons/src/main/java/org/uberfire/commons/cluster/ClusterParameters.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.uberfire.java.nio.fs.jgit.ws.cluster; +package org.uberfire.commons.cluster; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/uberfire-commons/src/main/java/org/uberfire/commons/cluster/ClusterService.java b/uberfire-commons/src/main/java/org/uberfire/commons/cluster/ClusterService.java deleted file mode 100644 index 5ef3f27d3a..0000000000 --- a/uberfire-commons/src/main/java/org/uberfire/commons/cluster/ClusterService.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2015 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.uberfire.commons.cluster; - -import org.uberfire.commons.lifecycle.PriorityDisposable; -import org.uberfire.commons.lock.LockService; -import org.uberfire.commons.message.MessageHandlerResolver; -import org.uberfire.commons.message.MessageService; - -public interface ClusterService extends MessageService, - LockService, - PriorityDisposable { - - void addMessageHandlerResolver(final MessageHandlerResolver resolver); - - void onStart(Runnable runnable); - - int getHoldCount(); -} diff --git a/uberfire-commons/src/main/java/org/uberfire/commons/cluster/ClusterServiceFactory.java b/uberfire-commons/src/main/java/org/uberfire/commons/cluster/ClusterServiceFactory.java deleted file mode 100644 index e953cda507..0000000000 --- a/uberfire-commons/src/main/java/org/uberfire/commons/cluster/ClusterServiceFactory.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 2015 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.uberfire.commons.cluster; - -import org.uberfire.commons.message.MessageHandlerResolver; - -public interface ClusterServiceFactory { - - ClusterService build(final MessageHandlerResolver resolver); - - boolean isAutoStart(); -} diff --git a/uberfire-commons/src/main/java/org/uberfire/commons/cluster/LockExecuteNotifyAsyncReleaseTemplate.java b/uberfire-commons/src/main/java/org/uberfire/commons/cluster/LockExecuteNotifyAsyncReleaseTemplate.java deleted file mode 100644 index 2ee82f5522..0000000000 --- a/uberfire-commons/src/main/java/org/uberfire/commons/cluster/LockExecuteNotifyAsyncReleaseTemplate.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2015 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.uberfire.commons.cluster; - -public abstract class LockExecuteNotifyAsyncReleaseTemplate extends BaseLockExecuteNotifyReleaseTemplate { - - @Override - public void sendMessage(final ClusterService clusterService) { - clusterService.broadcast(getServiceId(), - getMessageType(), - buildContent()); - } -} diff --git a/uberfire-commons/src/main/java/org/uberfire/commons/cluster/LockExecuteNotifySyncReleaseTemplate.java b/uberfire-commons/src/main/java/org/uberfire/commons/cluster/LockExecuteNotifySyncReleaseTemplate.java deleted file mode 100644 index 8d1f5165e6..0000000000 --- a/uberfire-commons/src/main/java/org/uberfire/commons/cluster/LockExecuteNotifySyncReleaseTemplate.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2015 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.uberfire.commons.cluster; - -public abstract class LockExecuteNotifySyncReleaseTemplate extends BaseLockExecuteNotifyReleaseTemplate { - - @Override - public void sendMessage(final ClusterService clusterService) { - clusterService.broadcastAndWait(getServiceId(), - getMessageType(), - buildContent(), - timeOut()); - } - - public int timeOut() { - return TIMEOUT; - } -} diff --git a/uberfire-commons/src/test/java/org/uberfire/commons/cluster/ClusterJMSServiceTest.java b/uberfire-commons/src/test/java/org/uberfire/commons/cluster/ClusterJMSServiceTest.java new file mode 100644 index 0000000000..dd11618b53 --- /dev/null +++ b/uberfire-commons/src/test/java/org/uberfire/commons/cluster/ClusterJMSServiceTest.java @@ -0,0 +1,85 @@ +/* + * Copyright 2017 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.uberfire.commons.cluster; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Session; + +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; + +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class ClusterJMSServiceTest { + + private ClusterJMSService clusterJMSService; + private ActiveMQConnectionFactory factory; + private Connection connection; + private Session session1; + private Session session2; + + @Before + public void setup() throws JMSException { + + factory = mock(ActiveMQConnectionFactory.class); + connection = mock(Connection.class); + when(factory.createConnection()).thenReturn(connection); + session1 = mock(Session.class); + session2 = mock(Session.class); + when(connection.createSession(eq(false), + eq(Session.AUTO_ACKNOWLEDGE))) + .thenReturn(session1, session2); + clusterJMSService = new ClusterJMSService() { + @Override + ActiveMQConnectionFactory createConnectionFactory(String jmsURL, + String jmsUserName, + String jmsPassword) { + return factory; + } + }; + } + + @Test + public void connectTest() throws JMSException { + clusterJMSService.connect(); + verify(connection).setExceptionListener(any()); + verify(connection).start(); + } + + @Test + public void sessionConsumersCreatedShouldBeClosed() throws JMSException { + clusterJMSService.connect(); + + clusterJMSService.createConsumer(ClusterJMSService.DESTINATION_TYPE.TOPIC, + "doraestination", + l -> { + }); + clusterJMSService.createConsumer(ClusterJMSService.DESTINATION_TYPE.TOPIC, + "doraestination", + l -> { + }); + + clusterJMSService.close(); + verify(session1).close(); + verify(session2).close(); + verify(connection).close(); + } +} \ No newline at end of file diff --git a/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialClusterMessage.java b/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialClusterMessage.java new file mode 100644 index 0000000000..ff72ce810f --- /dev/null +++ b/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialClusterMessage.java @@ -0,0 +1,12 @@ +package org.ext.uberfire.social.activities.persistence; + +public enum SocialClusterMessage { + NEW_EVENT, + NEW_EVENT_USER, + UPDATE_TYPE_EVENT, + UPDATE_USER_EVENT, + SOCIAL_EVENT, + SOCIAL_FILE_SYSTEM_PERSISTENCE, + CLUSTER_SHUTDOWN, + NO_TYPE +} diff --git a/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialClusterMessaging.java b/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialClusterMessaging.java index d09631a840..a61376f018 100644 --- a/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialClusterMessaging.java +++ b/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialClusterMessaging.java @@ -15,30 +15,25 @@ package org.ext.uberfire.social.activities.persistence; -import java.lang.reflect.Type; -import java.util.Collection; -import java.util.HashMap; +import java.io.Serializable; import java.util.List; -import java.util.Map; -import javax.annotation.PostConstruct; +import java.util.UUID; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.inject.Named; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.ObjectMessage; -import com.google.common.reflect.TypeToken; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; import org.ext.uberfire.social.activities.model.SocialActivitiesEvent; import org.ext.uberfire.social.activities.model.SocialEventType; import org.ext.uberfire.social.activities.model.SocialUser; import org.ext.uberfire.social.activities.service.SocialEventTypeRepositoryAPI; import org.ext.uberfire.social.activities.service.SocialTimelinePersistenceAPI; import org.ext.uberfire.social.activities.service.SocialUserPersistenceAPI; -import org.uberfire.commons.cluster.ClusterService; -import org.uberfire.commons.cluster.ClusterServiceFactory; -import org.uberfire.commons.data.Pair; -import org.uberfire.commons.message.MessageHandler; -import org.uberfire.commons.message.MessageHandlerResolver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.uberfire.commons.cluster.ClusterJMSService; import org.uberfire.commons.message.MessageType; import org.uberfire.commons.services.cdi.Startup; @@ -46,66 +41,58 @@ @Startup public class SocialClusterMessaging { - private Gson gson; + private static final Logger LOGGER = LoggerFactory.getLogger(SocialClusterMessaging.class); - private Type gsonCollectionType; + public static final String topicName = "SOCIAL_CLUSTER_MESSAGE"; - private String cluster = "social-service"; - - @Inject - private ClusterServiceFactory clusterServiceFactory; - - @Inject - @Named("socialTimelinePersistence") private SocialTimelinePersistenceAPI socialTimelinePersistence; @Inject private SocialEventTypeRepositoryAPI socialEventTypeRepository; - @Inject - @Named("socialUserPersistenceAPI") private SocialUserPersistenceAPI socialUserPersistenceAPI; - private ClusterService clusterService; + private ClusterJMSService clusterJMSService; - @PostConstruct - public void setup() { - gsonFactory(); - if (clusterServiceFactory != null) { - clusterService = clusterServiceFactory.build(new MessageHandlerResolver() { - @Override - public String getServiceId() { - return cluster; - } + private String nodeId = UUID.randomUUID().toString(); - @Override - public MessageHandler resolveHandler(String serviceId, - MessageType type) { - return new MessageHandler() { - @Override - public Pair> handleMessage(MessageType type, - Map content) { - if (type != null) { - String strType = type.toString(); - if (strType.equals(SocialClusterMessage.SOCIAL_EVENT.name())) { - handleSocialEvent(content); - } - if (strType.equals(SocialClusterMessage.SOCIAL_FILE_SYSTEM_PERSISTENCE.name())) { - handleSocialPersistenceEvent(content); - } - if (strType.equals(SocialClusterMessage.CLUSTER_SHUTDOWN.name())) { - handleClusterShutdown(); - } - } - - return new Pair>(type, - content); + private void topicMessageListener(Message message) { + if (message instanceof ObjectMessage) { + try { + Serializable object = ((ObjectMessage) message).getObject(); + if (object instanceof SocialMessageWrapper) { + SocialMessageWrapper messageWrapper = (SocialMessageWrapper) object; + if (!messageWrapper.getNodeId().equals(nodeId)) { + SocialClusterMessage strType = messageWrapper.getMessageType(); + if (strType.equals(SocialClusterMessage.SOCIAL_EVENT.name())) { + handleSocialEvent(messageWrapper); + } + if (strType.equals(SocialClusterMessage.SOCIAL_FILE_SYSTEM_PERSISTENCE.name())) { + handleSocialPersistenceEvent(messageWrapper); + } + if (strType.equals(SocialClusterMessage.CLUSTER_SHUTDOWN.name())) { + handleClusterShutdown(); } - }; + } } - }); - } else { - clusterService = null; + } catch (JMSException e) { + LOGGER.error("Exception receiving JMS message: " + e.getMessage()); + } + } + } + + public void setup(ClusterJMSService clusterJMSService, + SocialTimelinePersistenceAPI socialTimelinePersistenceAPI, + SocialUserPersistenceAPI socialUserPersistenceAPI) { + this.clusterJMSService = clusterJMSService; + this.socialTimelinePersistence = socialTimelinePersistenceAPI; + this.socialUserPersistenceAPI = socialUserPersistenceAPI; + + if (this.clusterJMSService.isAppFormerClustered()) { + this.clusterJMSService.connect(); + this.clusterJMSService.createConsumer(ClusterJMSService.DESTINATION_TYPE.TOPIC, + topicName, + message -> topicMessageListener(message)); } } @@ -114,19 +101,15 @@ private void handleClusterShutdown() { cacheClusterPersistence.someNodeShutdownAndPersistEvents(); } - private void handleSocialPersistenceEvent(Map content) { + private void handleSocialPersistenceEvent(SocialMessageWrapper message) { SocialActivitiesEvent eventTypeName = null; SocialUser user = null; SocialTimelineCacheClusterPersistence cacheClusterPersistence = (SocialTimelineCacheClusterPersistence) socialTimelinePersistence; - for (final Map.Entry entry : content.entrySet()) { - if (entry.getKey().equalsIgnoreCase(SocialClusterMessage.UPDATE_TYPE_EVENT.name())) { - eventTypeName = gson.fromJson(entry.getValue(), - SocialActivitiesEvent.class); - } - if (entry.getKey().equalsIgnoreCase(SocialClusterMessage.UPDATE_USER_EVENT.name())) { - user = gson.fromJson(entry.getValue(), - SocialUser.class); - } + if (message.getSubMessageType().equals(SocialClusterMessage.UPDATE_TYPE_EVENT)) { + eventTypeName = message.getEvent(); + } + if (message.getSubMessageType().equals(SocialClusterMessage.UPDATE_USER_EVENT)) { + user = message.getUser(); } if (user == null || user.getUserName() == null) { SocialEventType typeEvent = socialEventTypeRepository.findType(eventTypeName.getType()); @@ -139,19 +122,9 @@ private void handleSocialPersistenceEvent(Map content) { } } - private void handleSocialEvent(Map content) { - SocialActivitiesEvent event = null; - SocialUser user = null; - for (final Map.Entry entry : content.entrySet()) { - if (entry.getKey().equalsIgnoreCase(SocialClusterMessage.NEW_EVENT.name())) { - event = gson.fromJson(entry.getValue(), - SocialActivitiesEvent.class); - } - if (entry.getKey().equalsIgnoreCase(SocialClusterMessage.NEW_EVENT_USER.name())) { - user = gson.fromJson(entry.getValue(), - SocialUser.class); - } - } + private void handleSocialEvent(SocialMessageWrapper message) { + SocialActivitiesEvent event = message.getEvent(); + SocialUser user = message.getUser(); if (event != null) { SocialEventType typeEvent = socialEventTypeRepository.findType(event.getType()); SocialTimelineCacheClusterPersistence cacheClusterPersistence = (SocialTimelineCacheClusterPersistence) socialTimelinePersistence; @@ -171,83 +144,53 @@ private void handleSocialEvent(Map content) { } } - void gsonFactory() { - GsonBuilder gsonBuilder = new GsonBuilder(); - gson = gsonBuilder.create(); - - gsonCollectionType = new TypeToken>() { - }.getType(); - } - public void notify(SocialActivitiesEvent event) { - if (clusterService == null) { + if (!clusterJMSService.isAppFormerClustered()) { return; } - String eventJson = gson.toJson(event); - String userJson = gson.toJson(event.getSocialUser()); - Map content = new HashMap(); - content.put(SocialClusterMessage.NEW_EVENT.name(), - eventJson); - content.put(SocialClusterMessage.NEW_EVENT_USER.name(), - userJson); - clusterService.broadcast(cluster, - SocialClusterMessage.SOCIAL_EVENT, - content); + clusterJMSService.broadcast(ClusterJMSService.DESTINATION_TYPE.TOPIC, + topicName, + new SocialMessageWrapper(nodeId, + SocialClusterMessage.SOCIAL_EVENT, + event, + event.getSocialUser())); } public void notifyTimeLineUpdate(SocialActivitiesEvent event) { - if (clusterService == null) { + if (!clusterJMSService.isAppFormerClustered()) { return; } - Map content = new HashMap(); - String json = gson.toJson(event); - content.put(SocialClusterMessage.UPDATE_TYPE_EVENT.name(), - json); - - clusterService.broadcast(cluster, - SocialClusterMessage.SOCIAL_FILE_SYSTEM_PERSISTENCE, - content); + clusterJMSService.broadcast(ClusterJMSService.DESTINATION_TYPE.TOPIC, + topicName, + new SocialMessageWrapper(nodeId, + SocialClusterMessage.SOCIAL_FILE_SYSTEM_PERSISTENCE, + event, + null, + SocialClusterMessage.UPDATE_TYPE_EVENT)); } public void notifyTimeLineUpdate(SocialUser user, List storedEvents) { - if (clusterService == null) { + if (!clusterJMSService.isAppFormerClustered()) { return; } - Map content = new HashMap(); - String json = gson.toJson(user); - content.put(SocialClusterMessage.UPDATE_USER_EVENT.name(), - json); - clusterService.broadcast(cluster, - SocialClusterMessage.SOCIAL_FILE_SYSTEM_PERSISTENCE, - content); + clusterJMSService.broadcast(ClusterJMSService.DESTINATION_TYPE.TOPIC, + topicName, + new SocialMessageWrapper(nodeId, + SocialClusterMessage.SOCIAL_FILE_SYSTEM_PERSISTENCE, + null, + user, + SocialClusterMessage.UPDATE_USER_EVENT)); } - public void notifySomeInstanceisOnShutdown() { - if (clusterService == null) { + public void notifySomeInstanceIsOnShutdown() { + if (!clusterJMSService.isAppFormerClustered()) { return; } - clusterService.broadcast(cluster, - SocialClusterMessage.CLUSTER_SHUTDOWN, - new HashMap()); - } - - public void lockFileSystem() { - clusterService.lock(); + clusterJMSService.broadcast(ClusterJMSService.DESTINATION_TYPE.TOPIC, + topicName, + new SocialMessageWrapper(nodeId, + SocialClusterMessage.CLUSTER_SHUTDOWN)); } - public void unlockFileSystem() { - clusterService.unlock(); - } - - private enum SocialClusterMessage implements MessageType { - NEW_EVENT, - NEW_EVENT_USER, - UPDATE_TYPE_EVENT, - UPDATE_USER_EVENT, - SOCIAL_EVENT, - SOCIAL_FILE_SYSTEM_PERSISTENCE, - CLUSTER_SHUTDOWN; - - } } diff --git a/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialMessageWrapper.java b/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialMessageWrapper.java new file mode 100644 index 0000000000..01918d538f --- /dev/null +++ b/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialMessageWrapper.java @@ -0,0 +1,77 @@ +/* + * Copyright 2017 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.ext.uberfire.social.activities.persistence; + +import java.io.Serializable; + +import org.ext.uberfire.social.activities.model.SocialActivitiesEvent; +import org.ext.uberfire.social.activities.model.SocialUser; + +public class SocialMessageWrapper implements Serializable { + + private final String nodeId; + private SocialClusterMessage messageType; + private SocialActivitiesEvent event; + private SocialUser user; + private SocialClusterMessage subMessageType = SocialClusterMessage.NO_TYPE; + + public SocialMessageWrapper(String nodeId, + SocialClusterMessage messageType, + SocialActivitiesEvent event, + SocialUser user) { + this.nodeId = nodeId; + this.messageType = messageType; + this.event = event; + this.user = user; + } + + public SocialMessageWrapper(String nodeId, + SocialClusterMessage messageType, + SocialActivitiesEvent event, + SocialUser user, + SocialClusterMessage subMessageType) { + this.nodeId = nodeId; + this.messageType = messageType; + this.event = event; + this.user = user; + this.subMessageType = subMessageType; + } + + public SocialMessageWrapper(String nodeId, + SocialClusterMessage messageType) { + this.nodeId = nodeId; + this.messageType = messageType; + } + + public String getNodeId() { + return nodeId; + } + + public SocialUser getUser() { + return user; + } + + public SocialActivitiesEvent getEvent() { + return event; + } + + public SocialClusterMessage getMessageType() { + return messageType; + } + + public SocialClusterMessage getSubMessageType() { + return subMessageType; + } +} diff --git a/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialTimelineCacheClusterPersistence.java b/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialTimelineCacheClusterPersistence.java index 8ffb9f9c76..1c3c121c4c 100644 --- a/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialTimelineCacheClusterPersistence.java +++ b/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialTimelineCacheClusterPersistence.java @@ -85,7 +85,7 @@ public void saveAllEvents() { Path timeLineDir = userServicesBackend.buildPath(SOCIAL_FILES, sampleType.name()); ioService.startBatch(timeLineDir.getFileSystem()); - socialClusterMessaging.notifySomeInstanceisOnShutdown(); + socialClusterMessaging.notifySomeInstanceIsOnShutdown(); saveAllTypeEvents(); saveAllUserTimelines(); } catch (Exception e) { diff --git a/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialUserClusterMessageWrapper.java b/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialUserClusterMessageWrapper.java new file mode 100644 index 0000000000..d3da0df1fa --- /dev/null +++ b/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialUserClusterMessageWrapper.java @@ -0,0 +1,39 @@ +/* + * Copyright 2017 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.ext.uberfire.social.activities.persistence; + +import java.io.Serializable; + +import org.ext.uberfire.social.activities.model.SocialUser; + +public class SocialUserClusterMessageWrapper implements Serializable { + + private final String nodeId; + private final SocialUser user; + + public SocialUserClusterMessageWrapper(String nodeId, + SocialUser user) { + this.nodeId = nodeId; + this.user = user; + } + + public String getNodeId() { + return nodeId; + } + + public SocialUser getUser() { + return user; + } +} diff --git a/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialUserClusterMessaging.java b/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialUserClusterMessaging.java index f45d8348a5..bcf434c5bf 100644 --- a/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialUserClusterMessaging.java +++ b/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialUserClusterMessaging.java @@ -15,118 +15,71 @@ package org.ext.uberfire.social.activities.persistence; -import java.lang.reflect.Type; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import javax.annotation.PostConstruct; +import java.io.Serializable; +import java.util.UUID; import javax.enterprise.context.ApplicationScoped; -import javax.inject.Inject; -import javax.inject.Named; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.ObjectMessage; -import com.google.common.reflect.TypeToken; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import org.ext.uberfire.social.activities.model.SocialActivitiesEvent; import org.ext.uberfire.social.activities.model.SocialUser; import org.ext.uberfire.social.activities.service.SocialUserPersistenceAPI; -import org.uberfire.commons.cluster.ClusterService; -import org.uberfire.commons.cluster.ClusterServiceFactory; -import org.uberfire.commons.data.Pair; -import org.uberfire.commons.message.MessageHandler; -import org.uberfire.commons.message.MessageHandlerResolver; -import org.uberfire.commons.message.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.uberfire.commons.cluster.ClusterJMSService; import org.uberfire.commons.services.cdi.Startup; @ApplicationScoped @Startup public class SocialUserClusterMessaging { - private Gson gson; + private static final Logger LOGGER = LoggerFactory.getLogger(SocialUserClusterMessaging.class); - private Type gsonCollectionType; + public static final String topicName = "SOCIAL_USER_MESSAGE"; - private String cluster = "social-user"; + private SocialUserPersistenceAPI socialUserPersistenceAPI; - @Inject - @Named("clusterServiceFactory") - private ClusterServiceFactory clusterServiceFactory; + private ClusterJMSService clusterJMSService; - @Inject - @Named("socialUserPersistenceAPI") - private SocialUserPersistenceAPI socialUserCachePersistence; + private String nodeId = UUID.randomUUID().toString(); - private ClusterService clusterService; + public void setup(ClusterJMSService clusterJMSService, + SocialUserPersistenceAPI socialUserPersistenceAPI) { + this.clusterJMSService = clusterJMSService; + this.socialUserPersistenceAPI = socialUserPersistenceAPI; + if (clusterJMSService.isAppFormerClustered()) { + clusterJMSService.connect(); - @PostConstruct - public void setup() { - gsonFactory(); - - if (clusterServiceFactory != null) { - clusterService = clusterServiceFactory.build(new MessageHandlerResolver() { - @Override - public String getServiceId() { - return cluster; - } - - @Override - public MessageHandler resolveHandler(String serviceId, - MessageType type) { - return new MessageHandler() { - @Override - public Pair> handleMessage(MessageType type, - Map content) { - if (type != null) { - String strType = type.toString(); - if (strType.equals(SocialUserClusterMessage.SOCIAL_USER_UPDATE.name())) { - handleUserUpdate(content); - } - } - return new Pair>(type, - content); - } - }; - } - }); - } else { - clusterService = null; + clusterJMSService.createConsumer(ClusterJMSService.DESTINATION_TYPE.TOPIC, + topicName, + message -> topicMessageListener(message)); } } - private void handleUserUpdate(Map content) { - for (final Map.Entry entry : content.entrySet()) { - if (entry.getKey().equalsIgnoreCase(SocialUserClusterMessage.UPDATE_USER.name())) { - SocialUser user = gson.fromJson(entry.getValue(), - SocialUser.class); - SocialUserClusterPersistence socialUserClusterPersistence = (SocialUserClusterPersistence) socialUserCachePersistence; - socialUserClusterPersistence.sync(user); + private void topicMessageListener(Message message) { + if (message instanceof ObjectMessage) { + try { + Serializable object = ((ObjectMessage) message).getObject(); + if (object instanceof SocialUserClusterMessageWrapper) { + SocialUserClusterMessageWrapper messageWrapper = (SocialUserClusterMessageWrapper) object; + if (!messageWrapper.getNodeId().equals(nodeId)) { + SocialUserClusterPersistence socialUserClusterPersistence = (SocialUserClusterPersistence) socialUserPersistenceAPI; + socialUserClusterPersistence.sync(messageWrapper.getUser()); + } + } + } catch (JMSException e) { + LOGGER.error("Exception receiving JMS message: " + e.getMessage()); } } } - void gsonFactory() { - GsonBuilder gsonBuilder = new GsonBuilder(); - gson = gsonBuilder.create(); - - gsonCollectionType = new TypeToken>() { - }.getType(); - } - public void notify(SocialUser user) { - if (clusterService == null) { + if (!clusterJMSService.isAppFormerClustered()) { return; } - Map content = new HashMap(); - String json = gson.toJson(user); - content.put(SocialUserClusterMessage.UPDATE_USER.name(), - json); - clusterService.broadcast(cluster, - SocialUserClusterMessage.SOCIAL_USER_UPDATE, - content); - } - - private enum SocialUserClusterMessage implements MessageType { - UPDATE_USER, - SOCIAL_USER_UPDATE + clusterJMSService.broadcast(ClusterJMSService.DESTINATION_TYPE.TOPIC, + topicName, + new SocialUserClusterMessageWrapper(nodeId, + user)); } } diff --git a/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialUserClusterPersistence.java b/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialUserClusterPersistence.java index fad75b56a9..4731d17ed5 100644 --- a/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialUserClusterPersistence.java +++ b/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/persistence/SocialUserClusterPersistence.java @@ -24,7 +24,7 @@ public class SocialUserClusterPersistence extends SocialUserCachePersistence { - private final SocialUserClusterMessaging socialUserClusterMessaging; + private SocialUserClusterMessaging socialUserClusterMessaging; public SocialUserClusterPersistence(SocialUserServicesExtendedBackEndImpl userServicesBackend, UserServicesImpl userServices, diff --git a/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/server/SocialConfiguration.java b/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/server/SocialConfiguration.java index bf473b0a02..759a6f4bd6 100644 --- a/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/server/SocialConfiguration.java +++ b/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/server/SocialConfiguration.java @@ -18,18 +18,10 @@ import javax.annotation.PostConstruct; import javax.enterprise.context.ApplicationScoped; -import javax.inject.Inject; -import javax.inject.Named; - -import org.uberfire.commons.cluster.ClusterServiceFactory; @ApplicationScoped public class SocialConfiguration { - @Inject - @Named("clusterServiceFactory") - private ClusterServiceFactory clusterServiceFactory; - private Boolean socialEnable; @PostConstruct diff --git a/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/server/SocialTimelinePersistenceProducer.java b/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/server/SocialTimelinePersistenceProducer.java index 79a6186a2a..56aaf729f7 100644 --- a/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/server/SocialTimelinePersistenceProducer.java +++ b/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/server/SocialTimelinePersistenceProducer.java @@ -36,7 +36,7 @@ import org.ext.uberfire.social.activities.service.SocialTimelinePersistenceAPI; import org.ext.uberfire.social.activities.service.SocialUserPersistenceAPI; import org.uberfire.backend.server.io.ConfigIOServiceProducer; -import org.uberfire.commons.cluster.ClusterServiceFactory; +import org.uberfire.commons.cluster.ClusterJMSService; import org.uberfire.commons.services.cdi.Startup; import org.uberfire.commons.services.cdi.StartupType; import org.uberfire.io.IOService; @@ -50,9 +50,6 @@ public class SocialTimelinePersistenceProducer { SocialEventTypeRepositoryAPI socialEventTypeRepository; @Inject SocialSecurityConstraintsManager socialSecurityConstraintsManager; - @Inject - @Named("clusterServiceFactory") - private ClusterServiceFactory clusterServiceFactory; private SocialTimelinePersistenceAPI socialTimelinePersistenceAPI; private Gson gson; private Type gsonCollectionType; @@ -73,9 +70,12 @@ public class SocialTimelinePersistenceProducer { @Inject private SocialUserPersistenceAPI socialUserPersistenceAPI; + private ClusterJMSService clusterJMSService; + @PostConstruct public void setup() { gsonFactory(); + clusterJMSService = new ClusterJMSService(); final IOService _ioService = getConfigIOServiceProducer().configIOService(); final FileSystem _fileSystem = getConfigIOServiceProducer().configFileSystem(); final SocialUserServicesExtendedBackEndImpl userServicesBackend = new SocialUserServicesExtendedBackEndImpl(fileSystem); @@ -92,7 +92,7 @@ ConfigIOServiceProducer getConfigIOServiceProducer() { void setupSocialTimelinePersistenceAPI(IOService _ioService, FileSystem _fileSystem, SocialUserServicesExtendedBackEndImpl userServicesBackend) { - if (clusterServiceFactory == null) { + if (!clusterJMSService.isAppFormerClustered()) { socialTimelinePersistenceAPI = new SocialTimelineCacheInstancePersistence(gson, gsonCollectionType, _ioService, @@ -111,6 +111,7 @@ void setupSocialTimelinePersistenceAPI(IOService _ioService, userServicesBackend, _fileSystem, socialSecurityConstraintsManager); + socialClusterMessaging.setup(clusterJMSService, socialTimelinePersistenceAPI, socialUserPersistenceAPI); } socialTimelinePersistenceAPI.setup(); } diff --git a/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/server/SocialUserPersistenceProducer.java b/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/server/SocialUserPersistenceProducer.java index c4b70dd45d..f32858e43d 100644 --- a/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/server/SocialUserPersistenceProducer.java +++ b/uberfire-extensions/uberfire-social-activities/uberfire-social-activities-backend/src/main/java/org/ext/uberfire/social/activities/server/SocialUserPersistenceProducer.java @@ -35,7 +35,7 @@ import org.ext.uberfire.social.activities.service.SocialUserPersistenceAPI; import org.uberfire.backend.server.UserServicesImpl; import org.uberfire.backend.server.io.ConfigIOServiceProducer; -import org.uberfire.commons.cluster.ClusterServiceFactory; +import org.uberfire.commons.cluster.ClusterJMSService; import org.uberfire.commons.services.cdi.Startup; import org.uberfire.commons.services.cdi.StartupType; import org.uberfire.io.IOService; @@ -45,10 +45,6 @@ @Startup(StartupType.BOOTSTRAP) public class SocialUserPersistenceProducer { - @Inject - @Named("clusterServiceFactory") - private ClusterServiceFactory clusterServiceFactory; - private Gson gson; private Type gsonCollectionType; @@ -78,9 +74,12 @@ public class SocialUserPersistenceProducer { @Inject private SocialUserClusterMessaging socialUserClusterMessaging; + private ClusterJMSService clusterJMSService; + @PostConstruct public void setup() { gsonFactory(); + clusterJMSService = new ClusterJMSService(); final IOService _ioService = getConfigIOServiceProducer().configIOService(); final SocialUserServicesExtendedBackEndImpl userServicesBackend = new SocialUserServicesExtendedBackEndImpl(fileSystem); @@ -90,7 +89,7 @@ public void setup() { void setupSocialUserPersistenceAPI(IOService _ioService, SocialUserServicesExtendedBackEndImpl userServicesBackend) { - if (clusterServiceFactory == null) { + if (!clusterJMSService.isAppFormerClustered()) { socialUserPersistenceAPI = new SocialUserInstancePersistence(userServicesBackend, userServices, _ioService, @@ -101,6 +100,7 @@ void setupSocialUserPersistenceAPI(IOService _ioService, _ioService, gson, socialUserClusterMessaging); + socialUserClusterMessaging.setup(clusterJMSService, socialUserPersistenceAPI); } socialUserPersistenceAPI.setup(); } diff --git a/uberfire-extensions/uberfire-wires/uberfire-wires-webapp/src/main/java/org/uberfire/ext/wires/backend/server/impl/ApplicationScopedProducer.java b/uberfire-extensions/uberfire-wires/uberfire-wires-webapp/src/main/java/org/uberfire/ext/wires/backend/server/impl/ApplicationScopedProducer.java index 65447f3abd..7336f60939 100644 --- a/uberfire-extensions/uberfire-wires/uberfire-wires-webapp/src/main/java/org/uberfire/ext/wires/backend/server/impl/ApplicationScopedProducer.java +++ b/uberfire-extensions/uberfire-wires/uberfire-wires-webapp/src/main/java/org/uberfire/ext/wires/backend/server/impl/ApplicationScopedProducer.java @@ -27,11 +27,9 @@ import org.jboss.errai.security.shared.api.identity.User; import org.jboss.errai.security.shared.service.AuthenticationService; import org.uberfire.backend.server.IOWatchServiceNonDotImpl; -import org.uberfire.commons.cluster.ClusterServiceFactory; import org.uberfire.commons.concurrent.Unmanaged; import org.uberfire.io.IOService; import org.uberfire.io.impl.IOServiceDotFileImpl; -import org.uberfire.io.impl.cluster.IOServiceClusterImpl; @ApplicationScoped public class ApplicationScopedProducer { @@ -40,8 +38,6 @@ public class ApplicationScopedProducer { private AuthenticationService authenticationService; - private ClusterServiceFactory clusterServiceFactory; - private IOService ioService; private ExecutorService executorService; @@ -52,23 +48,15 @@ public ApplicationScopedProducer() { @Inject public ApplicationScopedProducer(IOWatchServiceNonDotImpl watchService, AuthenticationService authenticationService, - @Named("clusterServiceFactory") ClusterServiceFactory clusterServiceFactory, @Unmanaged ExecutorService executorService) { this.watchService = watchService; this.authenticationService = authenticationService; - this.clusterServiceFactory = clusterServiceFactory; this.executorService = executorService; } @PostConstruct public void setup() { - if (clusterServiceFactory == null) { - ioService = new IOServiceDotFileImpl(watchService); - } else { - ioService = new IOServiceClusterImpl(new IOServiceDotFileImpl(watchService), - clusterServiceFactory, - executorService); - } + ioService = new IOServiceDotFileImpl(watchService); } @Produces diff --git a/uberfire-extensions/uberfire-wires/uberfire-wires-webapp/src/main/java/org/uberfire/ext/wires/backend/server/impl/ShowcaseSocialUserEventAdapter.java b/uberfire-extensions/uberfire-wires/uberfire-wires-webapp/src/main/java/org/uberfire/ext/wires/backend/server/impl/ShowcaseSocialUserEventAdapter.java index 4adada8ffa..580c3f08fd 100644 --- a/uberfire-extensions/uberfire-wires/uberfire-wires-webapp/src/main/java/org/uberfire/ext/wires/backend/server/impl/ShowcaseSocialUserEventAdapter.java +++ b/uberfire-extensions/uberfire-wires/uberfire-wires-webapp/src/main/java/org/uberfire/ext/wires/backend/server/impl/ShowcaseSocialUserEventAdapter.java @@ -31,6 +31,7 @@ import org.ext.uberfire.social.activities.service.SocialAdapter; import org.ext.uberfire.social.activities.service.SocialCommandTypeFilter; import org.ext.uberfire.social.activities.service.SocialUserRepositoryAPI; +import org.jboss.errai.common.client.api.annotations.Portable; import org.uberfire.ext.wires.shared.social.ShowcaseSocialUserEvent; @ApplicationScoped @@ -48,7 +49,7 @@ public Class eventToIntercept() { @Override public SocialEventType socialEventType() { - return DefaultTypes.DUMMY_EVENT; + return SampleType.SAMPLE; } @Override @@ -73,7 +74,7 @@ public SocialActivitiesEvent toSocial(Object object) { final String desc = String.format("new social event (%d)", counter.incrementAndGet()); return new SocialActivitiesEvent(socialUser, - DefaultTypes.DUMMY_EVENT, + SampleType.SAMPLE, new Date()) .withAdicionalInfo("edited") .withDescription(desc) @@ -94,4 +95,10 @@ public List getTimelineFilters() { public List getTimelineFiltersNames() { return new ArrayList(); } + + @Portable + public enum SampleType implements SocialEventType { + + SAMPLE, + } } \ No newline at end of file diff --git a/uberfire-io/pom.xml b/uberfire-io/pom.xml index 20c4c21415..b73035a70f 100644 --- a/uberfire-io/pom.xml +++ b/uberfire-io/pom.xml @@ -51,17 +51,6 @@ org.uberfire uberfire-nio2-api - - org.apache.helix - helix-core - - - log4j - log4j - - - - org.slf4j log4j-over-slf4j diff --git a/uberfire-io/src/main/java/org/uberfire/io/impl/cluster/ClusterMessageType.java b/uberfire-io/src/main/java/org/uberfire/io/impl/cluster/ClusterMessageType.java deleted file mode 100644 index 4d291e05fc..0000000000 --- a/uberfire-io/src/main/java/org/uberfire/io/impl/cluster/ClusterMessageType.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 2015 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.uberfire.io.impl.cluster; - -import org.uberfire.commons.message.MessageType; - -public enum ClusterMessageType implements MessageType { - NEW_FS, - SYNC_FS, - QUERY_FOR_FS, - QUERY_FOR_FS_RESULT; -} diff --git a/uberfire-io/src/main/java/org/uberfire/io/impl/cluster/FileSystemSyncLock.java b/uberfire-io/src/main/java/org/uberfire/io/impl/cluster/FileSystemSyncLock.java deleted file mode 100644 index 1b01bc3582..0000000000 --- a/uberfire-io/src/main/java/org/uberfire/io/impl/cluster/FileSystemSyncLock.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright 2015 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.uberfire.io.impl.cluster; - -import java.util.HashMap; -import java.util.Map; - -import org.uberfire.commons.cluster.LockExecuteNotifyAsyncReleaseTemplate; -import org.uberfire.commons.message.MessageType; -import org.uberfire.java.nio.file.FileSystemMetadata; - -import static org.uberfire.io.impl.cluster.ClusterMessageType.SYNC_FS; - -public class FileSystemSyncLock extends LockExecuteNotifyAsyncReleaseTemplate { - - private final String serviceId; - private final String scheme; - private final String id; - private final String uri; - - public FileSystemSyncLock(final String serviceId, - final FileSystemMetadata fileSystemMetadata) { - this.serviceId = serviceId; - this.scheme = fileSystemMetadata.getScheme(); - this.id = fileSystemMetadata.getId(); - this.uri = fileSystemMetadata.getUri(); - } - - @Override - public MessageType getMessageType() { - return SYNC_FS; - } - - @Override - public String getServiceId() { - return serviceId; - } - - @Override - public Map buildContent() { - return new HashMap() {{ - put("fs_scheme", - scheme); - put("fs_id", - id); - put("fs_uri", - uri); - }}; - } -} diff --git a/uberfire-io/src/main/java/org/uberfire/io/impl/cluster/FileSystemSyncNonLock.java b/uberfire-io/src/main/java/org/uberfire/io/impl/cluster/FileSystemSyncNonLock.java deleted file mode 100644 index a930ea4cf1..0000000000 --- a/uberfire-io/src/main/java/org/uberfire/io/impl/cluster/FileSystemSyncNonLock.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright 2015 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.uberfire.io.impl.cluster; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.RunnableFuture; - -import org.uberfire.commons.cluster.ClusterService; -import org.uberfire.commons.message.MessageType; -import org.uberfire.java.nio.file.FileSystemMetadata; - -import static org.uberfire.io.impl.cluster.ClusterMessageType.SYNC_FS; - -public class FileSystemSyncNonLock { - - private final String serviceId; - private final String scheme; - private final String id; - private final String uri; - - public FileSystemSyncNonLock(final String serviceId, - final FileSystemMetadata fsInfo) { - this.serviceId = serviceId; - - this.scheme = fsInfo.getScheme(); - this.id = fsInfo.getId(); - this.uri = fsInfo.getUri(); - } - - public MessageType getMessageType() { - return SYNC_FS; - } - - public String getServiceId() { - return serviceId; - } - - public Map buildContent() { - return new HashMap() {{ - put("fs_scheme", - scheme); - put("fs_id", - id); - put("fs_uri", - uri); - }}; - } - - public void sendMessage(final ClusterService clusterService) { - clusterService.broadcast(getServiceId(), - getMessageType(), - buildContent()); - } - - public V execute(final ClusterService clusterService, - final RunnableFuture task) { - try { - task.run(); - - final V result = task.get(); - - sendMessage(clusterService); - - return result; - } catch (final ExecutionException e) { - throwException(e.getCause()); - } catch (final Exception e) { - throwException(e); - } - return null; - } - - private void throwException(final Throwable e) { - if (e instanceof RuntimeException) { - throw (RuntimeException) e; - } - throw new RuntimeException(e); - } -} diff --git a/uberfire-io/src/main/java/org/uberfire/io/impl/cluster/IOServiceClusterImpl.java b/uberfire-io/src/main/java/org/uberfire/io/impl/cluster/IOServiceClusterImpl.java deleted file mode 100644 index 5451377235..0000000000 --- a/uberfire-io/src/main/java/org/uberfire/io/impl/cluster/IOServiceClusterImpl.java +++ /dev/null @@ -1,1339 +0,0 @@ -/* - * Copyright 2015 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.uberfire.io.impl.cluster; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.UnsupportedEncodingException; -import java.net.URI; -import java.net.URLEncoder; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.FutureTask; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.uberfire.commons.async.DescriptiveRunnable; -import org.uberfire.commons.cluster.ClusterService; -import org.uberfire.commons.cluster.ClusterServiceFactory; -import org.uberfire.commons.cluster.LockExecuteNotifySyncReleaseTemplate; -import org.uberfire.commons.data.Pair; -import org.uberfire.commons.lifecycle.PriorityDisposableRegistry; -import org.uberfire.commons.lock.LockExecuteReleaseTemplate; -import org.uberfire.commons.message.AsyncCallback; -import org.uberfire.commons.message.MessageHandler; -import org.uberfire.commons.message.MessageHandlerResolver; -import org.uberfire.commons.message.MessageType; -import org.uberfire.io.IOService; -import org.uberfire.io.impl.IOServiceLockable; -import org.uberfire.java.nio.IOException; -import org.uberfire.java.nio.base.FileSystemId; -import org.uberfire.java.nio.base.FileSystemState; -import org.uberfire.java.nio.base.FileSystemStateAware; -import org.uberfire.java.nio.base.SeekableByteChannelWrapperImpl; -import org.uberfire.java.nio.channels.SeekableByteChannel; -import org.uberfire.java.nio.file.AtomicMoveNotSupportedException; -import org.uberfire.java.nio.file.CopyOption; -import org.uberfire.java.nio.file.DeleteOption; -import org.uberfire.java.nio.file.DirectoryNotEmptyException; -import org.uberfire.java.nio.file.DirectoryStream; -import org.uberfire.java.nio.file.FileAlreadyExistsException; -import org.uberfire.java.nio.file.FileSystem; -import org.uberfire.java.nio.file.FileSystemAlreadyExistsException; -import org.uberfire.java.nio.file.FileSystemMetadata; -import org.uberfire.java.nio.file.FileSystemNotFoundException; -import org.uberfire.java.nio.file.NoSuchFileException; -import org.uberfire.java.nio.file.NotDirectoryException; -import org.uberfire.java.nio.file.OpenOption; -import org.uberfire.java.nio.file.Option; -import org.uberfire.java.nio.file.Path; -import org.uberfire.java.nio.file.ProviderNotFoundException; -import org.uberfire.java.nio.file.attribute.FileAttribute; -import org.uberfire.java.nio.file.attribute.FileAttributeView; -import org.uberfire.java.nio.file.attribute.FileTime; - -import static org.kie.soup.commons.validation.PortablePreconditions.checkNotNull; -import static org.kie.soup.commons.validation.Preconditions.checkInstanceOf; -import static org.uberfire.io.impl.cluster.ClusterMessageType.NEW_FS; -import static org.uberfire.io.impl.cluster.ClusterMessageType.QUERY_FOR_FS; -import static org.uberfire.io.impl.cluster.ClusterMessageType.QUERY_FOR_FS_RESULT; -import static org.uberfire.io.impl.cluster.ClusterMessageType.SYNC_FS; - -public class IOServiceClusterImpl implements IOService { - - private static final Logger logger = LoggerFactory.getLogger(IOServiceClusterImpl.class); - protected final Set batchFileSystems = Collections.newSetFromMap(new ConcurrentHashMap<>()); - private ExecutorService executorService; - protected IOServiceLockable service; - protected ClusterService clusterService; - private NewFileSystemListener newFileSystemListener = null; - - IOServiceClusterImpl() { - } - - public IOServiceClusterImpl(final IOService service, - final ClusterServiceFactory clusterServiceFactory, - final ExecutorService executorService) { - this(service, - clusterServiceFactory, - true, - executorService); - } - - public IOServiceClusterImpl(final IOService service, - final ClusterServiceFactory clusterServiceFactory, - final boolean autoStart, - final ExecutorService executorService) { - checkNotNull("clusterServiceFactory", - clusterServiceFactory); - this.service = checkInstanceOf("service", - service, - IOServiceLockable.class); - - logger.debug("Creating instance of cluster service with auto start {}", - autoStart); - - this.executorService = executorService; - - this.clusterService = clusterServiceFactory.build(new MessageHandlerResolver() { - - final MessageHandler newFs = new NewFileSystemMessageHandler(); - final MessageHandler syncFs = new SyncFileSystemMessageHandler(); - final MessageHandler queryFs = new QueryFileSystemMessageHandler(); - - @Override - public String getServiceId() { - return IOServiceClusterImpl.this.service.getId(); - } - - @Override - public MessageHandler resolveHandler(final String serviceId, - final MessageType type) { - - if (serviceId.equals(IOServiceClusterImpl.this.service.getId())) { - if (NEW_FS.equals(type)) { - return newFs; - } else if (SYNC_FS.equals(type)) { - return syncFs; - } else if (QUERY_FOR_FS.equals(type)) { - return queryFs; - } - } - - return null; - } - }); - - PriorityDisposableRegistry.register(this); - - start(); - } - - private void start() { - logger.debug("Starting IO Cluster service {}", - this); - //New cluster members are executed within locked - new LockExecuteReleaseTemplate().execute(clusterService, - new FutureTask(new Callable() { - @Override - public Void call() throws Exception { - - // 10 seconds - int timeout = 10000; - final AtomicBoolean msgAnsweredOrTimedout = new AtomicBoolean(false); - final AtomicBoolean onSync = new AtomicBoolean(false); - - final Map fileSystems = new HashMap(); - - clusterService.broadcastAndWait(service.getId(), - QUERY_FOR_FS, - Collections.emptyMap(), - timeout, - new AsyncCallback() { - @Override - public void onTimeOut() { - msgAnsweredOrTimedout.set(true); - } - - @Override - public void onReply(final MessageType type, - final Map content) { - if (msgAnsweredOrTimedout.get() || onSync.get()) { - return; - } - - onSync.set(true); - - executorService.execute(new DescriptiveRunnable() { - @Override - public String getDescription() { - return "Cluster Messaging Reply [" + service.getId() + "/QUERY_FOR_FS]"; - } - - @Override - public void run() { - for (final Map.Entry entry : content.entrySet()) { - if (entry.getKey().startsWith("fs_")) { - int index = Integer.valueOf(entry.getKey().substring(entry.getKey().lastIndexOf("_") + 1)); - if (!fileSystems.containsKey(index)) { - fileSystems.put(index, - new FileSystemInfo()); - } - final FileSystemInfo fsInfo = fileSystems.get(index); - if (entry.getKey().startsWith("fs_id_")) { - fsInfo.setId(entry.getValue()); - } else if (entry.getKey().startsWith("fs_scheme_")) { - fsInfo.setScheme(entry.getValue()); - } else if (entry.getKey().startsWith("fs_uri_")) { - fsInfo.setUri(entry.getValue()); - } - } - } - - for (final FileSystemInfo fileSystemInfo : new HashSet(fileSystems.values())) { - try { - final URI newFS = URI.create(fileSystemInfo.getScheme() + "://" + fileSystemInfo.getId()); - service.newFileSystem(newFS, - Collections.emptyMap()); - } catch (FileSystemAlreadyExistsException ex) { - } - - final URI fs = URI.create(fileSystemInfo.getScheme() + "://" + fileSystemInfo.getId() + "?sync=" + fileSystemInfo.getUri().split("\n")[0] + "&force"); - service.getFileSystem(fs); - } - - msgAnsweredOrTimedout.set(true); - } - }); - } - }); - - while (!msgAnsweredOrTimedout.get()) { - try { - Thread.sleep(100); - } catch (InterruptedException ignored) { - } - } - - return null; - } - })); - } - - @Override - public void startBatch(final FileSystem _fs, - final Option... options) { - clusterService.lock(); - final FileSystem fs = _fs.getRootDirectories().iterator().next().getFileSystem(); - if (fs instanceof FileSystemId) { - batchFileSystems.add(((FileSystemId) fs).id()); - } - - service.startBatch(fs, - options); - } - - @Override - public void startBatch(final FileSystem fs) { - final FileSystem f = fs.getRootDirectories().iterator().next().getFileSystem(); - if (f instanceof FileSystemId) { - batchFileSystems.add(((FileSystemId) f).id()); - } - service.startBatch(fs); - } - - @Override - public void endBatch() { - service.endBatch(); - if (service.getLockControl().getHoldCount() == 0) { - final AtomicInteger process = new AtomicInteger(batchFileSystems.size()); - - for (final FileSystemMetadata fsInfo : service.getFileSystemMetadata()) { - - if (fsInfo.isAFileSystemID() && - batchFileSystems.contains(fsInfo.getId())) { - try { - new FileSystemSyncNonLock(service.getId(), - fsInfo).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Void call() throws Exception { - if (process.decrementAndGet() == 0) { - clusterService.unlock(); - } - return null; - } - })); - } catch (Exception ex) { - logger.error("End batch error", - ex); - if (process.decrementAndGet() == 0) { - clusterService.unlock(); - } - } - } - } - batchFileSystems.clear(); - } else { - clusterService.unlock(); - } - } - - @Override - public FileAttribute[] convert(final Map attrs) { - return service.convert(attrs); - } - - @Override - public Path get(final String first, - final String... more) throws IllegalArgumentException { - return service.get(first, - more); - } - - @Override - public Path get(final URI uri) throws IllegalArgumentException, FileSystemNotFoundException, SecurityException { - return service.get(uri); - } - - @Override - public Iterable getFileSystemMetadata() { - return service.getFileSystemMetadata(); - } - - @Override - public FileSystem getFileSystem(final URI uri) throws IllegalArgumentException, FileSystemNotFoundException, ProviderNotFoundException, SecurityException { - return service.getFileSystem(uri); - } - - @Override - public FileSystem newFileSystem(final URI uri, - final Map env) throws IllegalArgumentException, FileSystemAlreadyExistsException, ProviderNotFoundException, IOException, SecurityException { - if (env.containsKey("internal")) { - return service.newFileSystem(uri, - env); - } - - return new LockExecuteNotifySyncReleaseTemplate() { - - @Override - public MessageType getMessageType() { - return NEW_FS; - } - - @Override - public String getServiceId() { - return service.getId(); - } - - @Override - public Map buildContent() { - return new HashMap() {{ - put("uri", - uri.toString()); - for (final Map.Entry entry : env.entrySet()) { - put(entry.getKey(), - entry.getValue().toString()); - } - }}; - } - - @Override - public int timeOut() { - return TIMEOUT; - } - }.execute(clusterService, - new FutureTask(new Callable() { - @Override - public FileSystem call() throws Exception { - return service.newFileSystem(uri, - new HashMap(env) {{ - put("clusterService", - clusterService); - }}); - } - })); - } - - @Override - public void onNewFileSystem(NewFileSystemListener listener) { - this.newFileSystemListener = listener; - } - - @Override - public InputStream newInputStream(final Path path, - final OpenOption... options) throws IllegalArgumentException, NoSuchFileException, UnsupportedOperationException, IOException, SecurityException { - return service.newInputStream(path, - options); - } - - @Override - public DirectoryStream newDirectoryStream(final Path dir) throws IllegalArgumentException, NotDirectoryException, IOException, SecurityException { - return service.newDirectoryStream(dir); - } - - @Override - public DirectoryStream newDirectoryStream(final Path dir, - final DirectoryStream.Filter filter) throws IllegalArgumentException, NotDirectoryException, IOException, SecurityException { - return service.newDirectoryStream(dir, - filter); - } - - @Override - public Path createFile(final Path path, - final FileAttribute... attrs) throws IllegalArgumentException, UnsupportedOperationException, FileAlreadyExistsException, IOException, SecurityException { - if (isBatch(path.getFileSystem())) { - return service.createFile(path, - attrs); - } - - return new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(path.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Path call() throws Exception { - return service.createFile(path, - attrs); - } - })); - } - - @Override - public Path createDirectory(final Path dir, - final FileAttribute... attrs) throws IllegalArgumentException, UnsupportedOperationException, FileAlreadyExistsException, IOException, SecurityException { - if (isBatch(dir.getFileSystem())) { - return service.createDirectory(dir, - attrs); - } - - return new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(dir.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Path call() throws Exception { - return service.createDirectory(dir, - attrs); - } - })); - } - - @Override - public Path createDirectories(final Path dir, - final FileAttribute... attrs) throws UnsupportedOperationException, FileAlreadyExistsException, IOException, SecurityException { - if (isBatch(dir.getFileSystem())) { - return service.createDirectories(dir, - attrs); - } - - return new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(dir.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Path call() throws Exception { - return service.createDirectories(dir, - attrs); - } - })); - } - - @Override - public Path createDirectory(final Path dir, - final Map attrs) throws IllegalArgumentException, UnsupportedOperationException, FileAlreadyExistsException, IOException, SecurityException { - if (isBatch(dir.getFileSystem())) { - return service.createDirectory(dir, - attrs); - } - - return new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(dir.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Path call() throws Exception { - return service.createDirectory(dir, - attrs); - } - })); - } - - @Override - public Path createDirectories(final Path dir, - final Map attrs) throws UnsupportedOperationException, FileAlreadyExistsException, IOException, SecurityException { - if (isBatch(dir.getFileSystem())) { - return service.createDirectories(dir, - attrs); - } - - return new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(dir.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Path call() throws Exception { - return service.createDirectories(dir, - attrs); - } - })); - } - - @Override - public void delete(final Path path, - final DeleteOption... options) throws IllegalArgumentException, NoSuchFileException, DirectoryNotEmptyException, IOException, SecurityException { - if (isBatch(path.getFileSystem())) { - service.delete(path, - options); - } else { - new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(path.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Void call() throws Exception { - service.delete(path, - options); - return null; - } - })); - } - } - - @Override - public boolean deleteIfExists(final Path path, - final DeleteOption... options) throws IllegalArgumentException, DirectoryNotEmptyException, IOException, SecurityException { - if (isBatch(path.getFileSystem())) { - return service.deleteIfExists(path, - options); - } - return new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(path.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Boolean call() throws Exception { - return service.deleteIfExists(path, - options); - } - })); - } - - @Override - public Path createTempFile(final String prefix, - final String suffix, - final FileAttribute... attrs) throws IllegalArgumentException, UnsupportedOperationException, IOException, SecurityException { - return service.createTempFile(prefix, - suffix, - attrs); - } - - @Override - public Path createTempFile(final Path dir, - final String prefix, - final String suffix, - final FileAttribute... attrs) throws IllegalArgumentException, UnsupportedOperationException, IOException, SecurityException { - return service.createTempFile(dir, - prefix, - suffix, - attrs); - } - - @Override - public Path createTempDirectory(final String prefix, - final FileAttribute... attrs) throws IllegalArgumentException, UnsupportedOperationException, IOException, SecurityException { - return service.createTempDirectory(prefix, - attrs); - } - - @Override - public Path createTempDirectory(final Path dir, - final String prefix, - final FileAttribute... attrs) throws IllegalArgumentException, UnsupportedOperationException, IOException, SecurityException { - return service.createTempDirectory(dir, - prefix, - attrs); - } - - @Override - public Path copy(final Path source, - final Path target, - final CopyOption... options) throws UnsupportedOperationException, FileAlreadyExistsException, DirectoryNotEmptyException, IOException, SecurityException { - if (isBatch(source.getFileSystem())) { - return service.copy(source, - target, - options); - } - - return new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(target.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Path call() throws Exception { - return service.copy(source, - target, - options); - } - })); - } - - @Override - public long copy(final InputStream in, - final Path target, - final CopyOption... options) throws IOException, FileAlreadyExistsException, DirectoryNotEmptyException, UnsupportedOperationException, SecurityException { - if (isBatch(target.getFileSystem())) { - return service.copy(in, - target, - options); - } - - return new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(target.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Long call() throws Exception { - return service.copy(in, - target, - options); - } - })); - } - - @Override - public long copy(final Path source, - final OutputStream out) throws IOException, SecurityException { - return service.copy(source, - out); - } - - @Override - public Path move(final Path source, - final Path target, - final CopyOption... options) throws UnsupportedOperationException, FileAlreadyExistsException, DirectoryNotEmptyException, AtomicMoveNotSupportedException, IOException, SecurityException { - if (isBatch(source.getFileSystem())) { - return service.move(source, - target, - options); - } - - return new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(source.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Path call() throws Exception { - return new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(target.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Path call() throws Exception { - return service.move(source, - target, - options); - } - })); - } - })); - } - - @Override - public V getFileAttributeView(final Path path, - final Class type) throws IllegalArgumentException { - return service.getFileAttributeView(path, - type); - } - - @Override - public Map readAttributes(final Path path) throws UnsupportedOperationException, NoSuchFileException, IllegalArgumentException, IOException, SecurityException { - return service.readAttributes(path); - } - - @Override - public Map readAttributes(final Path path, - final String attributes) throws UnsupportedOperationException, NoSuchFileException, IllegalArgumentException, IOException, SecurityException { - return service.readAttributes(path, - attributes); - } - - @Override - public Path setAttributes(final Path path, - final FileAttribute... attrs) throws UnsupportedOperationException, IllegalArgumentException, ClassCastException, IOException, SecurityException { - if (isBatch(path.getFileSystem())) { - return service.setAttributes(path, - attrs); - } - - return new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(path.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Path call() throws Exception { - return service.setAttributes(path, - attrs); - } - })); - } - - @Override - public Path setAttributes(final Path path, - final Map attrs) throws UnsupportedOperationException, IllegalArgumentException, ClassCastException, IOException, SecurityException { - if (isBatch(path.getFileSystem())) { - return service.setAttributes(path, - attrs); - } - - return new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(path.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Path call() throws Exception { - return service.setAttributes(path, - attrs); - } - })); - } - - @Override - public Path setAttribute(final Path path, - final String attribute, - final Object value) throws UnsupportedOperationException, IllegalArgumentException, ClassCastException, IOException, SecurityException { - if (isBatch(path.getFileSystem())) { - return service.setAttribute(path, - attribute, - value); - } - - return new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(path.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Path call() throws Exception { - return service.setAttribute(path, - attribute, - value); - } - })); - } - - @Override - public Object getAttribute(final Path path, - final String attribute) throws UnsupportedOperationException, IllegalArgumentException, IOException, SecurityException { - return service.getAttribute(path, - attribute); - } - - @Override - public FileTime getLastModifiedTime(final Path path) throws IllegalArgumentException, IOException, SecurityException { - return service.getLastModifiedTime(path); - } - - @Override - public long size(final Path path) throws IllegalArgumentException, IOException, SecurityException { - return service.size(path); - } - - @Override - public boolean exists(final Path path) throws IllegalArgumentException, SecurityException { - return service.exists(path); - } - - @Override - public boolean notExists(final Path path) throws IllegalArgumentException, SecurityException { - return service.notExists(path); - } - - @Override - public boolean isSameFile(final Path path, - final Path path2) throws IllegalArgumentException, IOException, SecurityException { - return service.isSameFile(path, - path2); - } - - @Override - public BufferedReader newBufferedReader(final Path path, - final Charset cs) throws IllegalArgumentException, NoSuchFileException, IOException, SecurityException { - return service.newBufferedReader(path, - cs); - } - - @Override - public byte[] readAllBytes(final Path path) throws IOException, OutOfMemoryError, SecurityException { - return service.readAllBytes(path); - } - - @Override - public List readAllLines(final Path path) throws IllegalArgumentException, NoSuchFileException, IOException, SecurityException { - return service.readAllLines(path); - } - - @Override - public List readAllLines(final Path path, - final Charset cs) throws IllegalArgumentException, NoSuchFileException, IOException, SecurityException { - return service.readAllLines(path, - cs); - } - - @Override - public String readAllString(final Path path, - final Charset cs) throws IllegalArgumentException, NoSuchFileException, IOException { - return service.readAllString(path, - cs); - } - - @Override - public String readAllString(final Path path) throws IllegalArgumentException, NoSuchFileException, IOException { - return service.readAllString(path); - } - - @Override - public Path write(final Path path, - final byte[] bytes, - final OpenOption... options) throws IOException, UnsupportedOperationException, SecurityException { - if (isBatch(path.getFileSystem())) { - return service.write(path, - bytes, - options); - } - - return new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(path.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Path call() throws Exception { - return service.write(path, - bytes, - options); - } - })); - } - - @Override - public Path write(final Path path, - final byte[] bytes, - final Map attrs, - final OpenOption... options) throws IOException, UnsupportedOperationException, SecurityException { - if (isBatch(path.getFileSystem())) { - return service.write(path, - bytes, - attrs, - options); - } - - return new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(path.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Path call() throws Exception { - return service.write(path, - bytes, - attrs, - options); - } - })); - } - - @Override - public Path write(final Path path, - final byte[] bytes, - final Set options, - final FileAttribute... attrs) throws IllegalArgumentException, IOException, UnsupportedOperationException { - if (isBatch(path.getFileSystem())) { - return service.write(path, - bytes, - options, - attrs); - } - - return new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(path.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Path call() throws Exception { - return service.write(path, - bytes, - options, - attrs); - } - })); - } - - @Override - public Path write(final Path path, - final Iterable lines, - final Charset cs, - final OpenOption... options) throws IllegalArgumentException, IOException, UnsupportedOperationException, SecurityException { - if (isBatch(path.getFileSystem())) { - return service.write(path, - lines, - cs, - options); - } - - return new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(path.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Path call() throws Exception { - return service.write(path, - lines, - cs, - options); - } - })); - } - - @Override - public Path write(final Path path, - final String content, - final OpenOption... options) throws IllegalArgumentException, IOException, UnsupportedOperationException { - if (isBatch(path.getFileSystem())) { - return service.write(path, - content, - options); - } - - return new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(path.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Path call() throws Exception { - return service.write(path, - content, - options); - } - })); - } - - @Override - public Path write(final Path path, - final String content, - final Charset cs, - final OpenOption... options) throws IllegalArgumentException, IOException, UnsupportedOperationException { - if (isBatch(path.getFileSystem())) { - return service.write(path, - content, - cs, - options); - } - - return new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(path.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Path call() throws Exception { - return service.write(path, - content, - cs, - options); - } - })); - } - - @Override - public Path write(final Path path, - final String content, - final Set options, - final FileAttribute... attrs) throws IllegalArgumentException, IOException, UnsupportedOperationException { - if (isBatch(path.getFileSystem())) { - return service.write(path, - content, - options, - attrs); - } - - return new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(path.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Path call() throws Exception { - return service.write(path, - content, - options, - attrs); - } - })); - } - - @Override - public Path write(final Path path, - final String content, - final Charset cs, - final Set options, - final FileAttribute... attrs) throws IllegalArgumentException, IOException, UnsupportedOperationException { - if (isBatch(path.getFileSystem())) { - return service.write(path, - content, - cs, - options, - attrs); - } - - return new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(path.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Path call() throws Exception { - return service.write(path, - content, - cs, - options, - attrs); - } - })); - } - - @Override - public Path write(final Path path, - final String content, - final Map attrs, - final OpenOption... options) throws IllegalArgumentException, IOException, UnsupportedOperationException { - if (isBatch(path.getFileSystem())) { - return service.write(path, - content, - attrs, - options); - } - - return new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(path.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Path call() throws Exception { - return service.write(path, - content, - attrs, - options); - } - })); - } - - @Override - public Path write(final Path path, - final String content, - final Charset cs, - final Map attrs, - final OpenOption... options) throws IllegalArgumentException, IOException, UnsupportedOperationException { - if (isBatch(path.getFileSystem())) { - return service.write(path, - content, - cs, - attrs, - options); - } - - return new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(path.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Path call() throws Exception { - return service.write(path, - content, - cs, - attrs, - options); - } - })); - } - - @Override - public OutputStream newOutputStream(final Path path, - final OpenOption... options) throws IllegalArgumentException, UnsupportedOperationException, IOException, SecurityException { - final OutputStream out = service.newOutputStream(path, - options); - return new OutputStream() { - @Override - public void write(final int b) throws java.io.IOException { - out.write(b); - } - - @Override - public void close() throws java.io.IOException { - if (isBatch(path.getFileSystem())) { - out.close(); - } else { - new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(path.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Void call() throws Exception { - out.close(); - return null; - } - })); - } - } - }; - } - - @Override - public SeekableByteChannel newByteChannel(final Path path, - final OpenOption... options) throws IllegalArgumentException, UnsupportedOperationException, FileAlreadyExistsException, IOException, SecurityException { - final SeekableByteChannel sbc = service.newByteChannel(path, - options); - - return new SeekableByteChannelWrapperImpl(sbc) { - @Override - public void close() throws java.io.IOException { - if (isBatch(path.getFileSystem())) { - sbc.close(); - } else { - new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(path.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Void call() throws Exception { - sbc.close(); - return null; - } - })); - } - } - }; - } - - @Override - public SeekableByteChannel newByteChannel(final Path path, - final Set options, - final FileAttribute... attrs) throws IllegalArgumentException, UnsupportedOperationException, FileAlreadyExistsException, IOException, SecurityException { - final SeekableByteChannel sbc = service.newByteChannel(path, - options, - attrs); - - return new SeekableByteChannelWrapperImpl(sbc) { - @Override - public void close() throws java.io.IOException { - if (isBatch(path.getFileSystem())) { - sbc.close(); - } else { - new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(path.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Void call() throws Exception { - sbc.close(); - return null; - } - })); - } - } - }; - } - - @Override - public BufferedWriter newBufferedWriter(final Path path, - final Charset cs, - final OpenOption... options) throws IllegalArgumentException, IOException, UnsupportedOperationException, SecurityException { - return new BufferedWriter(service.newBufferedWriter(path, - cs, - options)) { - @Override - public void close() throws java.io.IOException { - if (isBatch(path.getFileSystem())) { - superClose(); - } else { - new FileSystemSyncLock(service.getId(), - new FileSystemMetadata(path.getFileSystem())).execute(clusterService, - new FutureTask(new Callable() { - @Override - public Void call() throws Exception { - superClose(); - return null; - } - })); - } - } - - private void superClose() { - try { - super.close(); - } catch (java.io.IOException e) { - throw new RuntimeException(e); - } - } - }; - } - - private boolean isBatch(final FileSystem fs) { - return fs instanceof FileSystemStateAware && ((FileSystemStateAware) (fs)).getState().equals(FileSystemState.BATCH); - } - - @Override - public void dispose() { - service.dispose(); - } - - @Override - public int priority() { - return service.priority() - 1; - } - - static class FileSystemInfo { - - private String id; - private String scheme; - private String uri; - - FileSystemInfo() { - - } - - FileSystemInfo(String id, - String scheme, - String uri) { - this.id = id; - this.scheme = scheme; - this.uri = uri; - } - - String getId() { - return id; - } - - void setId(String id) { - this.id = id; - } - - String getScheme() { - return scheme; - } - - void setScheme(String scheme) { - this.scheme = scheme; - } - - String getUri() { - return uri; - } - - void setUri(String uri) { - this.uri = uri; - } - } - - class NewFileSystemMessageHandler implements MessageHandler { - - @Override - public Pair> handleMessage(final MessageType type, - final Map content) { - if (NEW_FS.equals(type)) { - final String _uri = content.get("uri"); - final Map env = new HashMap(); - - for (final Map.Entry entry : content.entrySet()) { - if (!(entry.getKey().equals("uri") || entry.getKey().equals("type"))) { - env.put(entry.getKey(), - entry.getValue()); - } - } - - final URI uri = URI.create(_uri); - final FileSystem fs = service.newFileSystem(uri, - env); - if (newFileSystemListener != null) { - newFileSystemListener.execute(fs, - uri.getScheme(), - ((FileSystemId) fs).id(), - env); - } - } - return null; - } - } - - class SyncFileSystemMessageHandler implements MessageHandler { - - @Override - public Pair> handleMessage(final MessageType type, - final Map content) { - if (SYNC_FS.equals(type)) { - final String scheme = content.get("fs_scheme"); - final String id = content.get("fs_id"); - final String[] supportedUris = cleanup(content.get("fs_uri").split("\n")); - - for (final String supportedUri : supportedUris) { - try { - String origin; - try { - origin = URLEncoder.encode(supportedUri, - "UTF-8"); - } catch (UnsupportedEncodingException e) { - origin = supportedUri; - } - - final URI fs = URI.create(scheme + "://" + id + "?sync=" + origin + "&force"); - - service.getFileSystem(fs); - break; - } catch (Exception e) { - // try the other supported uri in case of failure - logger.warn("File system synchronization for origin {} failed with error {}, trying another if available", - supportedUri, - e.getMessage()); - } - } - } - - return null; - } - - private String[] cleanup(final String... split) { - final List result = new ArrayList(split.length); - for (final String s : split) { - if (s.startsWith("git://")) { - result.add(s); - } - } - - return result.toArray(new String[result.size()]); - } - } - - class QueryFileSystemMessageHandler implements MessageHandler { - - @Override - public Pair> handleMessage(final MessageType type, - final Map content) { - if (QUERY_FOR_FS.equals(type)) { - Map replyContent = new HashMap(); - int i = 0; - - final Set fileSystemsInfo = new HashSet(); - for (FileSystemMetadata fs : service.getFileSystemMetadata()) { - fileSystemsInfo.add(fs); - } - - for (final FileSystemMetadata fsInfo : fileSystemsInfo) { - replyContent.put("fs_scheme_" + i, - fsInfo.getScheme()); - replyContent.put("fs_id_" + i, - fsInfo.getId()); - replyContent.put("fs_uri_" + i, - fsInfo.getUri()); - i++; - } - return new Pair>(QUERY_FOR_FS_RESULT, - replyContent); - } - return null; - } - } -} diff --git a/uberfire-io/src/main/java/org/uberfire/io/impl/cluster/helix/ClusterServiceHelix.java b/uberfire-io/src/main/java/org/uberfire/io/impl/cluster/helix/ClusterServiceHelix.java deleted file mode 100644 index 00a55455e1..0000000000 --- a/uberfire-io/src/main/java/org/uberfire/io/impl/cluster/helix/ClusterServiceHelix.java +++ /dev/null @@ -1,497 +0,0 @@ -/* - * Copyright 2015 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.uberfire.io.impl.cluster.helix; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.helix.Criteria; -import org.apache.helix.HelixManager; -import org.apache.helix.InstanceType; -import org.apache.helix.NotificationContext; -import org.apache.helix.messaging.handling.HelixTaskResult; -import org.apache.helix.messaging.handling.MessageHandler; -import org.apache.helix.messaging.handling.MessageHandlerFactory; -import org.apache.helix.model.ExternalView; -import org.apache.helix.model.Message; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.uberfire.commons.cluster.ClusterService; -import org.uberfire.commons.data.Pair; -import org.uberfire.commons.lifecycle.PriorityDisposableRegistry; -import org.uberfire.commons.message.AsyncCallback; -import org.uberfire.commons.message.MessageHandlerResolver; -import org.uberfire.commons.message.MessageType; -import org.uberfire.io.impl.cluster.ClusterMessageType; - -import static java.util.Arrays.asList; -import static java.util.UUID.randomUUID; -import static org.apache.helix.HelixManagerFactory.getZKHelixManager; - -public class ClusterServiceHelix implements ClusterService { - - private static final AtomicInteger counter = new AtomicInteger(0); - private static final Logger logger = LoggerFactory.getLogger(ClusterServiceHelix.class); - - private static final String ORIGIN = "origin"; - private static final String SERVICE_ID = "serviceId"; - private static final String OFFLINE = "OFFLINE"; - - private final String clusterName; - private final String instanceName; - private final HelixManager participantManager; - private final String resourceName; - private final Map messageHandlerResolver = new ConcurrentHashMap(); - - private final ReentrantLock lock = new ReentrantLock(true); - - public ClusterServiceHelix(final String clusterName, - final String zkAddress, - final String instanceName, - final String resourceName, - final MessageHandlerResolver messageHandlerResolver) { - this.clusterName = clusterName; - this.instanceName = instanceName; - this.resourceName = resourceName; - addMessageHandlerResolver(messageHandlerResolver); - this.participantManager = getZkHelixManager(clusterName, - zkAddress, - instanceName); - PriorityDisposableRegistry.register(this); - start(); - } - - HelixManager getZkHelixManager(String clusterName, - String zkAddress, - String instanceName) { - return getZKHelixManager(clusterName, - instanceName, - InstanceType.PARTICIPANT, - zkAddress); - } - - //TODO {porcelli} quick hack for now, the real solution would have a cluster per repo - @Override - public void addMessageHandlerResolver(final MessageHandlerResolver resolver) { - if (resolver != null) { - this.messageHandlerResolver.put(resolver.getServiceId(), - resolver); - } - } - - void start() { - try { - participantManager.getMessagingService().registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), - new MessageHandlerResolverWrapper().convert()); - participantManager.getStateMachineEngine().registerStateModelFactory("LeaderStandby", - new LockTransitionalFactory()); - participantManager.connect(); - offlinePartition(); - } catch (final Exception ex) { - throw new RuntimeException(ex); - } - } - - String getNodeStatus() { - final String partition = resourceName + "_0"; - final ExternalView view = getResourceExternalView(); - if (clusterIsNotSetYet(view, - partition)) { - return OFFLINE; - } - final Map stateMap = view.getStateMap(partition); - return stateMap.get(instanceName); - } - - ExternalView getResourceExternalView() { - return participantManager.getClusterManagmentTool().getResourceExternalView(clusterName, - resourceName); - } - - private boolean clusterIsNotSetYet(ExternalView view, - String partition) { - //first start with fresh setup - if (view == null) { - return true; - } - final Map stateMap = view.getStateMap(partition); - - return stateMap == null || stateMap.get(instanceName) == null; - } - - @Override - public void dispose() { - if (participantManager != null && participantManager.isConnected()) { - participantManager.disconnect(); - } - } - - @Override - public void onStart(final Runnable runnable) { - runnable.run(); - } - - @Override - public int getHoldCount() { - return lock.getHoldCount(); - } - - private void offlinePartition() { - if (OFFLINE.equals(getNodeStatus())) { - return; - } - participantManager.getClusterManagmentTool().enablePartition(false, - clusterName, - instanceName, - resourceName, - asList(resourceName + "_0")); - while (!OFFLINE.equals(getNodeStatus())) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - } - } - } - - private void enablePartition() { - if ("LEADER".equals(getNodeStatus())) { - return; - } - participantManager.getClusterManagmentTool().enablePartition(true, - clusterName, - instanceName, - resourceName, - asList(resourceName + "_0")); - while (!"LEADER".equals(getNodeStatus())) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - } - } - } - - private void disablePartition() { - String nodeStatus = getNodeStatus(); - if ("STANDBY".equals(nodeStatus) || OFFLINE.equals(nodeStatus)) { - return; - } - participantManager.getClusterManagmentTool().enablePartition(false, - clusterName, - instanceName, - resourceName, - asList(resourceName + "_0")); - - while (!("STANDBY".equals(nodeStatus) || OFFLINE.equals(nodeStatus))) { - try { - Thread.sleep(10); - nodeStatus = getNodeStatus(); - } catch (InterruptedException e) { - } - } - } - - @Override - public void lock() { - lock.lock(); - - enablePartition(); - } - - @Override - public void unlock() { - disablePartition(); - - lock.unlock(); - } - - @Override - public void broadcastAndWait(final String serviceId, - final MessageType type, - final Map content, - int timeOut) { - participantManager.getMessagingService().sendAndWait(buildCriteria(), - buildMessage(serviceId, - type, - content), - new org.apache.helix.messaging.AsyncCallback(timeOut) { - @Override - public void onTimeOut() { - } - - @Override - public void onReplyMessage(final Message message) { - } - }, - timeOut); - } - - @Override - public void broadcastAndWait(final String serviceId, - final MessageType type, - final Map content, - final int timeOut, - final AsyncCallback callback) { - int msg = participantManager.getMessagingService().sendAndWait(buildCriteria(), - buildMessage(serviceId, - type, - content), - new org.apache.helix.messaging.AsyncCallback() { - @Override - public void onTimeOut() { - callback.onTimeOut(); - } - - @Override - public void onReplyMessage(final Message message) { - final MessageType type = buildMessageTypeFromReply(message); - final Map map = getMessageContentFromReply(message); - - callback.onReply(type, - map); - } - }, - timeOut); - if (msg == 0) { - callback.onTimeOut(); - } - } - - @Override - public void broadcast(final String serviceId, - final MessageType type, - final Map content) { - participantManager.getMessagingService().send(buildCriteria(), - buildMessage(serviceId, - type, - content)); - } - - @Override - public void broadcast(final String serviceId, - final MessageType type, - final Map content, - final int timeOut, - final AsyncCallback callback) { - participantManager.getMessagingService().send(buildCriteria(), - buildMessage(serviceId, - type, - content), - new org.apache.helix.messaging.AsyncCallback() { - @Override - public void onTimeOut() { - callback.onTimeOut(); - } - - @Override - public void onReplyMessage(final Message message) { - final MessageType type = buildMessageTypeFromReply(message); - final Map map = getMessageContent(message); - - callback.onReply(type, - map); - } - }, - timeOut); - } - - @Override - public void sendTo(final String serviceId, - final String resourceId, - final MessageType type, - final Map content) { - participantManager.getMessagingService().send(buildCriteria(resourceId), - buildMessage(serviceId, - type, - content)); - } - - private Criteria buildCriteria(final String resourceId) { - return new Criteria() {{ - setInstanceName(resourceId); - setRecipientInstanceType(InstanceType.PARTICIPANT); - setResource(resourceName); - setSelfExcluded(true); - setSessionSpecific(true); - }}; - } - - private Criteria buildCriteria() { - return buildCriteria("%"); - } - - private Message buildMessage(final String serviceId, - final MessageType type, - final Map content) { - return new Message(Message.MessageType.USER_DEFINE_MSG, - randomUUID().toString()) {{ - setMsgState(Message.MessageState.NEW); - getRecord().setMapField("content", - content); - getRecord().setSimpleField(SERVICE_ID, - serviceId); - getRecord().setSimpleField("type", - type.toString()); - getRecord().setSimpleField(ORIGIN, - instanceName); - }}; - } - - @Override - public int priority() { - return Integer.MIN_VALUE + 200; - } - - private MessageType buildMessageType(final String _type) { - if (_type == null) { - return null; - } - - MessageType type; - try { - type = ClusterMessageType.valueOf(_type); - } catch (Exception ex) { - type = new MessageType() { - @Override - public String toString() { - return _type; - } - - @Override - public int hashCode() { - return _type.hashCode(); - } - }; - } - - return type; - } - - private MessageType buildMessageTypeFromReply(Message message) { - final Map result = message.getRecord().getMapField(Message.Attributes.MESSAGE_RESULT.toString()); - return buildMessageType(result.get("type")); - } - - private Map getMessageContent(final Message message) { - return message.getRecord().getMapField("content"); - } - - private Map getMessageContentFromReply(final Message message) { - return new HashMap() {{ - for (final Map.Entry field : message.getRecord().getMapField(Message.Attributes.MESSAGE_RESULT.toString()).entrySet()) { - if (!field.getKey().equals(SERVICE_ID) && !field.getKey().equals(ORIGIN) && !field.getKey().equals("type")) { - put(field.getKey(), - field.getValue()); - } - } - }}; - } - - class MessageHandlerResolverWrapper { - - MessageHandlerFactory convert() { - return new MessageHandlerFactory() { - - @Override - public MessageHandler createHandler(final Message message, - final NotificationContext context) { - - return new MessageHandler(message, - context) { - @Override - public HelixTaskResult handleMessage() throws InterruptedException { - try { - final String serviceId = _message.getRecord().getSimpleField(SERVICE_ID); - final MessageType type = buildMessageType(_message.getRecord().getSimpleField("type")); - final Map map = getMessageContent(_message); - - final MessageHandlerResolver resolver = messageHandlerResolver.get(serviceId); - - if (resolver == null) { - System.err.println("serviceId not found '" + serviceId + "'"); - return new HelixTaskResult() {{ - setSuccess(false); - setMessage("Can't find resolver"); - }}; - } - - final org.uberfire.commons.message.MessageHandler handler = resolver.resolveHandler(serviceId, - type); - - if (handler == null) { - System.err.println("handler not found for '" + serviceId + "' and type '" + type.toString() + "'"); - return new HelixTaskResult() {{ - setSuccess(false); - setMessage("Can't find handler."); - }}; - } - - final Pair> result = handler.handleMessage(type, - map); - - if (result == null) { - return new HelixTaskResult() {{ - setSuccess(true); - }}; - } - - return new HelixTaskResult() {{ - setSuccess(true); - getTaskResultMap().put(SERVICE_ID, - serviceId); - getTaskResultMap().put("type", - result.getK1().toString()); - getTaskResultMap().put(ORIGIN, - instanceName); - for (Map.Entry entry : result.getK2().entrySet()) { - getTaskResultMap().put(entry.getKey(), - entry.getValue()); - } - }}; - } catch (final Throwable e) { - logger.error("Error while processing cluster message", - e); - return new HelixTaskResult() {{ - setSuccess(false); - setMessage(e.getMessage()); - setException(new RuntimeException(e)); - }}; - } - } - - @Override - public void onError(final Exception e, - final ErrorCode code, - final ErrorType type) { - } - }; - } - - @Override - public String getMessageType() { - return Message.MessageType.USER_DEFINE_MSG.toString(); - } - - @Override - public void reset() { - } - }; - } - } -} \ No newline at end of file diff --git a/uberfire-io/src/main/java/org/uberfire/io/impl/cluster/helix/LockTransitionModel.java b/uberfire-io/src/main/java/org/uberfire/io/impl/cluster/helix/LockTransitionModel.java deleted file mode 100644 index 6ddcc459eb..0000000000 --- a/uberfire-io/src/main/java/org/uberfire/io/impl/cluster/helix/LockTransitionModel.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 2015 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.uberfire.io.impl.cluster.helix; - -import org.apache.helix.NotificationContext; -import org.apache.helix.model.Message; -import org.apache.helix.participant.statemachine.StateModel; -import org.apache.helix.participant.statemachine.StateModelInfo; -import org.apache.helix.participant.statemachine.Transition; - -@StateModelInfo(initialState = "OFFLINE", states = {"LEADER", "STANDBY"}) -public class LockTransitionModel extends StateModel { - - private final String lockName; - - public LockTransitionModel(final String lockName) { - this.lockName = lockName; - } - - @Transition(from = "STANDBY", to = "LEADER") - public void lock(final Message m, - final NotificationContext context) { - } - - @Transition(from = "LEADER", to = "STANDBY") - public void release(final Message m, - final NotificationContext context) { - } - - @Transition(from = "STANDBY", to = "OFFLINE") - public void toOffLine(final Message m, - final NotificationContext context) { - } - - @Transition(from = "OFFLINE", to = "STANDBY") - public void toStandBy(final Message m, - final NotificationContext context) { - } - - @Transition(from = "OFFLINE", to = "DROPPED") - public void dropped(final Message m, - final NotificationContext context) { - } -} diff --git a/uberfire-io/src/main/java/org/uberfire/io/impl/cluster/helix/LockTransitionalFactory.java b/uberfire-io/src/main/java/org/uberfire/io/impl/cluster/helix/LockTransitionalFactory.java deleted file mode 100644 index d0afe96a21..0000000000 --- a/uberfire-io/src/main/java/org/uberfire/io/impl/cluster/helix/LockTransitionalFactory.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2015 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.uberfire.io.impl.cluster.helix; - -import org.apache.helix.participant.statemachine.StateModelFactory; - -public class LockTransitionalFactory extends StateModelFactory { - - @Override - public LockTransitionModel createNewStateModel(final String lockName) { - return new LockTransitionModel(lockName); - } -} diff --git a/uberfire-io/src/test/java/org/uberfire/io/impl/cluster/FileSystemSyncTest.java b/uberfire-io/src/test/java/org/uberfire/io/impl/cluster/FileSystemSyncTest.java deleted file mode 100644 index e5ae592da3..0000000000 --- a/uberfire-io/src/test/java/org/uberfire/io/impl/cluster/FileSystemSyncTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2015 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.uberfire.io.impl.cluster; - -import java.net.URI; -import java.util.Arrays; -import java.util.Map; - -import org.junit.Test; -import org.uberfire.java.nio.base.FileSystemId; -import org.uberfire.java.nio.file.FileSystem; -import org.uberfire.java.nio.file.FileSystemMetadata; -import org.uberfire.java.nio.file.Path; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - -public class FileSystemSyncTest { - - @Test - public void testFileSystemToCheckProxyIssuesWithExtraInterfaces() { - final FileSystem mockedFS = mock(FileSystem.class); - final FileSystem mockedFSId = mock(FileSystem.class, - withSettings().extraInterfaces(FileSystemId.class)); - - final Path rootPath = mock(Path.class); - - - when(mockedFS.getRootDirectories()).thenReturn(Arrays.asList(rootPath)); - when(mockedFSId.getRootDirectories()).thenReturn(Arrays.asList(rootPath)); - - when(rootPath.getFileSystem()).thenReturn(mockedFSId); - when(rootPath.toUri()).thenReturn(URI.create("jgit://myrepo")); - - when(((FileSystemId) mockedFSId).id()).thenReturn("my-fsid"); - - { - final FileSystemSyncLock fileSystemSyncLock = new FileSystemSyncLock("serviceId", - new FileSystemMetadata(mockedFSId)); - final Map content = fileSystemSyncLock.buildContent(); - - assertEquals("my-fsid", - content.get("fs_id")); - } - { - final FileSystemSyncNonLock fileSystemSyncNonLock = new FileSystemSyncNonLock("serviceId", - new FileSystemMetadata(mockedFSId)); - final Map content = fileSystemSyncNonLock.buildContent(); - - assertEquals("my-fsid", - content.get("fs_id")); - } - } -} diff --git a/uberfire-io/src/test/java/org/uberfire/io/impl/cluster/IOServiceClusterImplTest.java b/uberfire-io/src/test/java/org/uberfire/io/impl/cluster/IOServiceClusterImplTest.java deleted file mode 100644 index 3daa7be3b4..0000000000 --- a/uberfire-io/src/test/java/org/uberfire/io/impl/cluster/IOServiceClusterImplTest.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Copyright 2015 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.uberfire.io.impl.cluster; - -import java.net.URI; -import java.util.Arrays; -import java.util.List; - -import org.junit.Test; -import org.uberfire.commons.cluster.ClusterService; -import org.uberfire.io.impl.IOServiceLockable; -import org.uberfire.io.lock.BatchLockControl; -import org.uberfire.java.nio.base.FileSystemId; -import org.uberfire.java.nio.file.FileSystem; -import org.uberfire.java.nio.file.FileSystemMetadata; -import org.uberfire.java.nio.file.Option; -import org.uberfire.java.nio.file.Path; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - -public class IOServiceClusterImplTest { - - @Test - public void testFileSystemToCheckProxyIssuesWithExtraInterfaces() { - final FileSystem mockedFS = mock(FileSystem.class); - final FileSystem mockedFSId = mock(FileSystem.class, - withSettings().extraInterfaces(FileSystemId.class)); - - final Path rootPath = mock(Path.class); - - when(mockedFS.getRootDirectories()).thenReturn(Arrays.asList(rootPath)); - when(mockedFSId.getRootDirectories()).thenReturn(Arrays.asList(rootPath)); - - when(rootPath.getFileSystem()).thenReturn(mockedFSId); - when(rootPath.toUri()).thenReturn(URI.create("jgit://myrepo")); - - when(((FileSystemId) mockedFSId).id()).thenReturn("my-fsid"); - - final ClusterService clusterService = mock(ClusterService.class); - final IOServiceLockable serviceLockable = mock(IOServiceLockable.class); - final BatchLockControl batchLockControl = mock(BatchLockControl.class); - - List fsInfo = Arrays.asList(new FileSystemMetadata(mockedFSId), - new FileSystemMetadata(mockedFS)); - when(serviceLockable.getFileSystemMetadata()).thenReturn(fsInfo); - when(batchLockControl.getHoldCount()).thenReturn(0); - when(serviceLockable.getLockControl()).thenReturn(batchLockControl); - - { - final IOServiceClusterImpl ioServiceCluster = new TestWrapper(clusterService, - serviceLockable); - - assertEquals(0, - ioServiceCluster.batchFileSystems.size()); - - ioServiceCluster.startBatch(mockedFS); - - assertEquals(1, - ioServiceCluster.batchFileSystems.size()); - - assertTrue(ioServiceCluster.batchFileSystems.contains(((FileSystemId) mockedFSId).id())); - - ioServiceCluster.endBatch(); - - verify(serviceLockable, - times(1)).endBatch(); - - assertEquals(0, - ioServiceCluster.batchFileSystems.size()); - - verify(clusterService, - times(1)).unlock(); - } - - { - final IOServiceClusterImpl ioServiceCluster = new TestWrapper(clusterService, - serviceLockable); - - assertEquals(0, - ioServiceCluster.batchFileSystems.size()); - - ioServiceCluster.startBatch(mockedFS, - mock(Option.class)); - - assertEquals(1, - ioServiceCluster.batchFileSystems.size()); - - assertTrue(ioServiceCluster.batchFileSystems.contains(((FileSystemId) mockedFSId).id())); - - ioServiceCluster.endBatch(); - - verify(serviceLockable, - times(2)).endBatch(); - - assertEquals(0, - ioServiceCluster.batchFileSystems.size()); - - verify(clusterService, - times(2)).unlock(); - } - - { - final IOServiceClusterImpl ioServiceCluster = new TestWrapper(clusterService, - serviceLockable); - - assertEquals(0, - ioServiceCluster.batchFileSystems.size()); - - ioServiceCluster.startBatch(mockedFS, - mock(Option.class)); - - assertEquals(1, - ioServiceCluster.batchFileSystems.size()); - - assertTrue(ioServiceCluster.batchFileSystems.contains(((FileSystemId) mockedFSId).id())); - - ioServiceCluster.endBatch(); - - verify(serviceLockable, - times(3)).endBatch(); - - assertEquals(0, - ioServiceCluster.batchFileSystems.size()); - - verify(clusterService, - times(3)).unlock(); - } - } - - private class TestWrapper extends IOServiceClusterImpl { - - public TestWrapper(final ClusterService clusterService, - final IOServiceLockable service) { - this.clusterService = clusterService; - this.service = service; - } - } -} diff --git a/uberfire-io/src/test/java/org/uberfire/io/impl/cluster/helix/ClusterServiceHelixTest.java b/uberfire-io/src/test/java/org/uberfire/io/impl/cluster/helix/ClusterServiceHelixTest.java deleted file mode 100644 index deed641442..0000000000 --- a/uberfire-io/src/test/java/org/uberfire/io/impl/cluster/helix/ClusterServiceHelixTest.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright 2015 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.uberfire.io.impl.cluster.helix; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.helix.HelixManager; -import org.apache.helix.model.ExternalView; -import org.junit.Before; -import org.junit.Test; -import org.uberfire.commons.lifecycle.PriorityDisposableRegistry; -import org.uberfire.commons.message.MessageHandlerResolver; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - -public class ClusterServiceHelixTest { - - ClusterServiceHelix clusterServiceHelix; - ExternalView externalView; - - @Test - public void getNodeStatusEmptyOrNullShouldReturnOfflineTest() { - - when(externalView.getStateMap("resourceName_0")).thenReturn(null); - assertEquals("OFFLINE", - clusterServiceHelix.getNodeStatus()); - - Map emptyMap = new HashMap(); - when(externalView.getStateMap("resourceName_0")).thenReturn(emptyMap); - assertEquals("OFFLINE", - clusterServiceHelix.getNodeStatus()); - } - - @Test - public void getNodeStatusTest() { - - Map valueMap = new HashMap(); - valueMap.put("instanceName", - "LEADER"); - when(externalView.getStateMap("resourceName_0")).thenReturn(valueMap); - assertEquals("LEADER", - clusterServiceHelix.getNodeStatus()); - } - - @Test - public void getNodeStatusNullViewTest() { - externalView = null; - assertEquals("OFFLINE", - clusterServiceHelix.getNodeStatus()); - } - - @Before - public void setup() { - externalView = mock(ExternalView.class); - - clusterServiceHelix = new ClusterServiceHelix("clusterName", - "zkAddress", - "instanceName", - "resourceName", - mock(MessageHandlerResolver.class)) { - @Override - HelixManager getZkHelixManager(String clusterName, - String zkAddress, - String instanceName) { - return mock(HelixManager.class); - } - - @Override - void start() { - } - - @Override - public void addMessageHandlerResolver(MessageHandlerResolver resolver) { - } - - @Override - ExternalView getResourceExternalView() { - return externalView; - } - }; - - assertTrue(PriorityDisposableRegistry.getDisposables().contains(clusterServiceHelix)); - } -} \ No newline at end of file diff --git a/uberfire-nio2-backport/uberfire-nio2-impls/uberfire-nio2-jgit/pom.xml b/uberfire-nio2-backport/uberfire-nio2-impls/uberfire-nio2-jgit/pom.xml index aa2ed9d791..bb179a20af 100644 --- a/uberfire-nio2-backport/uberfire-nio2-impls/uberfire-nio2-jgit/pom.xml +++ b/uberfire-nio2-backport/uberfire-nio2-impls/uberfire-nio2-jgit/pom.xml @@ -122,54 +122,6 @@ byteman-bmunit test - - org.apache.activemq - artemis-jms-client - - - org.apache.geronimo.specs - geronimo-jms_2.0_spec - - - commons-logging - commons-logging - - - io.netty - netty-all - - - - - org.jboss.spec.javax.jms - jboss-jms-api_2.0_spec - - - io.netty - netty-buffer - - - io.netty - netty-transport - - - io.netty - netty-handler - - - io.netty - netty-transport-native-epoll - ${netty-transport-native-epoll-classifier} - - - io.netty - netty-transport-native-kqueue - ${netty-transport-native-kqueue-classifier} - - - io.netty - netty-codec-http - diff --git a/uberfire-nio2-backport/uberfire-nio2-impls/uberfire-nio2-jgit/src/main/java/org/uberfire/java/nio/fs/jgit/ws/JGitFileSystemsEventsManager.java b/uberfire-nio2-backport/uberfire-nio2-impls/uberfire-nio2-jgit/src/main/java/org/uberfire/java/nio/fs/jgit/ws/JGitFileSystemsEventsManager.java index b0fa24c26c..b2ee899085 100644 --- a/uberfire-nio2-backport/uberfire-nio2-impls/uberfire-nio2-jgit/src/main/java/org/uberfire/java/nio/fs/jgit/ws/JGitFileSystemsEventsManager.java +++ b/uberfire-nio2-backport/uberfire-nio2-impls/uberfire-nio2-jgit/src/main/java/org/uberfire/java/nio/fs/jgit/ws/JGitFileSystemsEventsManager.java @@ -21,11 +21,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.uberfire.commons.cluster.ClusterJMSService; +import org.uberfire.commons.cluster.ClusterParameters; import org.uberfire.java.nio.IOException; import org.uberfire.java.nio.file.Path; import org.uberfire.java.nio.file.WatchEvent; import org.uberfire.java.nio.file.WatchService; -import org.uberfire.java.nio.fs.jgit.ws.cluster.ClusterParameters; import org.uberfire.java.nio.fs.jgit.ws.cluster.JGitEventsBroadcast; public class JGitFileSystemsEventsManager { @@ -34,24 +35,24 @@ public class JGitFileSystemsEventsManager { private final Map fsWatchServices = new ConcurrentHashMap<>(); - private final ClusterParameters clusterParameters; + private final ClusterJMSService clusterJMSService; JGitEventsBroadcast jGitEventsBroadcast; public JGitFileSystemsEventsManager() { - clusterParameters = loadClusterParameters(); + clusterJMSService = createClusterJMSService(); - if (clusterParameters.isAppFormerClustered()) { + if (clusterJMSService.isAppFormerClustered()) { setupJGitEventsBroadcast(); } } - ClusterParameters loadClusterParameters() { - return new ClusterParameters(); + ClusterJMSService createClusterJMSService() { + return new ClusterJMSService(); } void setupJGitEventsBroadcast() { - jGitEventsBroadcast = new JGitEventsBroadcast(clusterParameters, + jGitEventsBroadcast = new JGitEventsBroadcast(clusterJMSService, w -> publishEvents(w.getFsName(), w.getWatchable(), w.getEvents(), diff --git a/uberfire-nio2-backport/uberfire-nio2-impls/uberfire-nio2-jgit/src/main/java/org/uberfire/java/nio/fs/jgit/ws/cluster/JGitEventsBroadcast.java b/uberfire-nio2-backport/uberfire-nio2-impls/uberfire-nio2-jgit/src/main/java/org/uberfire/java/nio/fs/jgit/ws/cluster/JGitEventsBroadcast.java index 630bf2da1a..61e9664568 100644 --- a/uberfire-nio2-backport/uberfire-nio2-impls/uberfire-nio2-jgit/src/main/java/org/uberfire/java/nio/fs/jgit/ws/cluster/JGitEventsBroadcast.java +++ b/uberfire-nio2-backport/uberfire-nio2-impls/uberfire-nio2-jgit/src/main/java/org/uberfire/java/nio/fs/jgit/ws/cluster/JGitEventsBroadcast.java @@ -16,25 +16,16 @@ package org.uberfire.java.nio.fs.jgit.ws.cluster; import java.io.Serializable; -import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.function.Consumer; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; import javax.jms.ObjectMessage; -import javax.jms.Session; -import javax.jms.Topic; -import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.uberfire.commons.cluster.ClusterJMSService; import org.uberfire.java.nio.file.Path; import org.uberfire.java.nio.file.WatchEvent; @@ -43,48 +34,26 @@ public class JGitEventsBroadcast { private static final Logger LOGGER = LoggerFactory.getLogger(JGitEventsBroadcast.class); public static final String DEFAULT_APPFORMER_TOPIC = "default-appformer-topic"; - private List consumerSessions = new ArrayList<>(); private String nodeId = UUID.randomUUID().toString(); - private ClusterParameters clusterParameters; private Consumer eventsPublisher; - private Connection connection; + private final ClusterJMSService clusterJMSService; - public JGitEventsBroadcast(ClusterParameters clusterParameters, + public JGitEventsBroadcast(ClusterJMSService clusterJMSService, Consumer eventsPublisher) { - this.clusterParameters = clusterParameters; + this.clusterJMSService = clusterJMSService; this.eventsPublisher = eventsPublisher; setupJMSConnection(); } private void setupJMSConnection() { - - String jmsURL = clusterParameters.getJmsURL(); - String jmsUserName = clusterParameters.getJmsUserName(); - String jmsPassword = clusterParameters.getJmsPassword(); - ConnectionFactory factory = new ActiveMQConnectionFactory(jmsURL, - jmsUserName, - jmsPassword); - - try { - connection = factory.createConnection(); - connection.setExceptionListener(new JMSExceptionListener()); - connection.start(); - } catch (Exception e) { - LOGGER.error("Error connecting on JMS " + e.getMessage()); - throw new RuntimeException(e); - } + clusterJMSService.connect(); } public void createWatchServiceJMS(String topicName) { - try { - Session consumerSession = createConsumerSession(); - Destination topic = getTopic(topicName, - consumerSession); - MessageConsumer messageConsumer = consumerSession.createConsumer(topic); - messageConsumer.setMessageListener(message -> topicMessageListener(message)); - } catch (Exception e) { - LOGGER.error("Error creating JMS Watch Service: " + e.getMessage()); - } + clusterJMSService.createConsumer( + ClusterJMSService.DESTINATION_TYPE.TOPIC, + getTopicName(topicName), + message -> topicMessageListener(message)); } private void topicMessageListener(Message message) { @@ -103,67 +72,27 @@ private void topicMessageListener(Message message) { } } - private Session createConsumerSession() throws JMSException { - Session session = connection.createSession(false, - Session.AUTO_ACKNOWLEDGE); - consumerSessions.add(session); - return session; - } - public synchronized void broadcast(String fsName, Path watchable, List> events) { - Session session = null; - try { - session = connection.createSession(false, - Session.AUTO_ACKNOWLEDGE); - Topic topic = getTopic(fsName, - session); - ObjectMessage objectMessage = session.createObjectMessage(new WatchEventsWrapper(nodeId, - fsName, - watchable, - events)); - MessageProducer messageProducer = session.createProducer(topic); - messageProducer.send(objectMessage); - } catch (JMSException e) { - LOGGER.error("Exception on JMS broadcast: " + e.getMessage()); - } finally { - if (session != null) { - try { - session.close(); - } catch (JMSException e) { - LOGGER.error("Exception on closing JMS session (this could trigger a leak) " + e.getMessage()); - } - } - } + clusterJMSService.broadcast(ClusterJMSService.DESTINATION_TYPE.TOPIC, + getTopicName(fsName), + new WatchEventsWrapper(nodeId, + fsName, + watchable, + events)); } - private Topic getTopic(String fsName, - Session session) throws JMSException { + private String getTopicName(String fsName) { String topicName = DEFAULT_APPFORMER_TOPIC; if (fsName.contains("/")) { topicName = fsName.substring(0, fsName.indexOf("/")); } - return session.createTopic(topicName); + return topicName; } public void close() { - try { - for (Session s : consumerSessions) { - s.close(); - } - connection.close(); - } catch (JMSException e) { - LOGGER.error("Exception closing JMS connection and consumerSessions: " + e.getMessage()); - } - } - - private static class JMSExceptionListener implements ExceptionListener { - - @Override - public void onException(JMSException e) { - LOGGER.error("JMSException: " + e.getMessage()); - } + clusterJMSService.close(); } } diff --git a/uberfire-nio2-backport/uberfire-nio2-impls/uberfire-nio2-jgit/src/test/java/org/uberfire/java/nio/fs/jgit/ws/JGitFileSystemsEventsManagerTest.java b/uberfire-nio2-backport/uberfire-nio2-impls/uberfire-nio2-jgit/src/test/java/org/uberfire/java/nio/fs/jgit/ws/JGitFileSystemsEventsManagerTest.java index fbb1b40565..2cc1b9be45 100644 --- a/uberfire-nio2-backport/uberfire-nio2-impls/uberfire-nio2-jgit/src/test/java/org/uberfire/java/nio/fs/jgit/ws/JGitFileSystemsEventsManagerTest.java +++ b/uberfire-nio2-backport/uberfire-nio2-impls/uberfire-nio2-jgit/src/test/java/org/uberfire/java/nio/fs/jgit/ws/JGitFileSystemsEventsManagerTest.java @@ -22,10 +22,11 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.runners.MockitoJUnitRunner; +import org.uberfire.commons.cluster.ClusterJMSService; +import org.uberfire.commons.cluster.ClusterParameters; import org.uberfire.java.nio.file.Path; import org.uberfire.java.nio.file.WatchEvent; import org.uberfire.java.nio.file.WatchService; -import org.uberfire.java.nio.fs.jgit.ws.cluster.ClusterParameters; import org.uberfire.java.nio.fs.jgit.ws.cluster.JGitEventsBroadcast; import static org.junit.Assert.*; @@ -56,9 +57,10 @@ JGitFileSystemWatchServices createFSWatchServicesManager() { @Test public void doNotSetupClusterTest() { JGitFileSystemsEventsManager another = new JGitFileSystemsEventsManager() { + @Override - ClusterParameters loadClusterParameters() { - return mock(ClusterParameters.class); + ClusterJMSService createClusterJMSService() { + return mock(ClusterJMSService.class); } }; assertNull(another.getjGitEventsBroadcast());