diff --git a/module/communication/controller/src/main/java/org/openbase/jul/communication/controller/AbstractControllerServer.java b/module/communication/controller/src/main/java/org/openbase/jul/communication/controller/AbstractControllerServer.java index 6fb925790..1f146b88d 100644 --- a/module/communication/controller/src/main/java/org/openbase/jul/communication/controller/AbstractControllerServer.java +++ b/module/communication/controller/src/main/java/org/openbase/jul/communication/controller/AbstractControllerServer.java @@ -1258,6 +1258,6 @@ public String toString() { if (publisher == null) { return getClass().getSimpleName(); } - return getClass().getSimpleName() + "[" + publisher.getScope() + "]"; + return getClass().getSimpleName() + "[" + ScopeProcessor.generateStringRep(publisher.getScope(), "?") + "]"; } } diff --git a/module/communication/controller/src/main/java/org/openbase/jul/communication/controller/AbstractRemoteClient.java b/module/communication/controller/src/main/java/org/openbase/jul/communication/controller/AbstractRemoteClient.java index f1e00883e..13e5f9303 100644 --- a/module/communication/controller/src/main/java/org/openbase/jul/communication/controller/AbstractRemoteClient.java +++ b/module/communication/controller/src/main/java/org/openbase/jul/communication/controller/AbstractRemoteClient.java @@ -10,12 +10,12 @@ * it under the terms of the GNU Lesser General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. - * + * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Lesser Public License for more details. - * + * * You should have received a copy of the GNU General Lesser Public * License along with this program. If not, see * . @@ -26,7 +26,6 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import kotlin.Unit; -import kotlin.jvm.functions.Function1; import kotlin.jvm.functions.Function2; import org.openbase.jps.core.JPService; import org.openbase.jul.communication.config.CommunicatorConfig; @@ -58,7 +57,6 @@ import org.openbase.jul.schedule.WatchDog.ServiceState; import org.openbase.type.communication.EventType.Event; import org.openbase.type.communication.ScopeType.Scope; -import org.openbase.type.communication.mqtt.PrimitiveType.Primitive; import org.openbase.type.domotic.state.ConnectionStateType.ConnectionState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,6 +72,7 @@ /** * @param + * * @author Divine Threepwood */ // @@ -142,7 +141,7 @@ public AbstractRemoteClient(final Class dataClass) { this.middlewareFailureObserver = (source, watchDogState) -> { switch (watchDogState) { case FAILED: - logger.warn("Broker at "+JPService.getValue(JPComHost.class, "?")+ " not responding."); + logger.warn("Broker at " + JPService.getValue(JPComHost.class, "?") + " not responding."); AbstractRemoteClient.this.setConnectionState(DISCONNECTED); break; } @@ -170,6 +169,7 @@ protected void setMessageProcessor(MessageProcessor messageProcessor * {@inheritDoc} * * @param scope {@inheritDoc} + * * @throws org.openbase.jul.exception.InitializationException {@inheritDoc} * @throws java.lang.InterruptedException {@inheritDoc} */ @@ -182,6 +182,7 @@ public void init(final Scope scope) throws InitializationException, InterruptedE * Initialize the remote on a scope. * * @param scope the scope where the remote communicates + * * @throws InitializationException if the initialization fails * @throws InterruptedException if the initialization is interrupted */ @@ -210,6 +211,7 @@ protected void postInit() throws InitializationException, InterruptedException { * * @param scope {@inheritDoc} * @param communicatorConfig {@inheritDoc} + * * @throws org.openbase.jul.exception.InitializationException {@inheritDoc} * @throws java.lang.InterruptedException {@inheritDoc} */ @@ -333,6 +335,7 @@ public void lock(final Object maintainer) throws CouldNotPerformException { * Method unlocks this instance. * * @param maintainer the instance which currently holds the lock. + * * @throws CouldNotPerformException is thrown if the instance could not be * unlocked. */ @@ -361,6 +364,7 @@ public Class getDataClass() { * * @param handler * @param wait + * * @throws InterruptedException * @throws CouldNotPerformException */ @@ -374,7 +378,7 @@ protected Function2, Unit> generateHandler() { logger.debug("Internal notification: " + event.toString()); applyEventUpdate(event, userProperties); } catch (Exception ex) { - if(!ExceptionProcessor.isCausedBySystemShutdown(ex)) { + if (!ExceptionProcessor.isCausedBySystemShutdown(ex)) { ExceptionPrinter.printHistory(new CouldNotPerformException("Internal notification failed!", ex), logger); } } @@ -448,6 +452,7 @@ public void activate(final Object maintainer) throws InterruptedException, Could * Atomic deactivate which makes sure that the maintainer stays the same. * * @param maintainer the current maintainer of this remote + * * @throws InterruptedException if deactivation is interrupted * @throws CouldNotPerformException if deactivation fails * @throws VerificationFailedException is thrown if the given maintainer does not match the current one @@ -545,6 +550,7 @@ protected void reinit() throws InterruptedException, CouldNotPerformException { * it is necessary to call {@code requestData.get()}. * * @param scope the new scope to configure. + * * @throws InterruptedException is thrown if the current thread was externally interrupted. * @throws CouldNotPerformException is throws if the reinit has been failed. */ @@ -597,6 +603,7 @@ protected void reinit(final Scope scope) throws InterruptedException, CouldNotPe * it is necessary to call {@code requestData.get()}. * * @param maintainer the current maintainer of this remote + * * @throws InterruptedException is thrown if the current thread was externally interrupted. * @throws CouldNotPerformException is throws if the reinit has been failed. * @throws VerificationFailedException is thrown if the given maintainerLock does not match the current maintainer @@ -614,6 +621,7 @@ public void reinit(final Object maintainer) throws InterruptedException, CouldNo * * @param scope the new scope to configure. * @param maintainer the current maintainer of this remote + * * @throws InterruptedException is thrown if the current thread was externally interrupted. * @throws CouldNotPerformException is throws if the reinit has been failed. * @throws VerificationFailedException is thrown if the given maintainerLock does not match the current maintainer @@ -794,7 +802,7 @@ public R callMethod(final String methodName, final Class R callMethod(final String methodName, final Class R callMethod(final String methodName, final Class R callMethod(final String methodName, final Class 15000) { ExceptionPrinter.printHistory(ex, logger, LogLevel.WARN); - logger.warn("Waiting for RPCServer[" + rpcClient.getScope() + "] to call method [" + methodName + "(" + shortArgument + ")]. Next retry timeout in " + (int) (Math.floor(retryTimeout / 1000)) + " sec."); + logger.warn("Waiting for RPCServer[" + ScopeProcessor.generateStringRep(rpcClient.getScope()) + "] to call method [" + methodName + "(" + shortArgument + ")]. Next retry timeout in " + (int) (Math.floor(retryTimeout / 1000)) + " sec."); } else { ExceptionPrinter.printHistory(ex, logger, LogLevel.DEBUG); - logger.debug("Waiting for RPCServer[" + rpcClient.getScope() + "] to call method [" + methodName + "(" + shortArgument + ")]. Next retry timeout in " + (int) (Math.floor(retryTimeout / 1000)) + " sec."); + logger.debug("Waiting for RPCServer[" + ScopeProcessor.generateStringRep(rpcClient.getScope()) + "] to call method [" + methodName + "(" + shortArgument + ")]. Next retry timeout in " + (int) (Math.floor(retryTimeout / 1000)) + " sec."); } Thread.yield(); @@ -856,7 +864,7 @@ public R callMethod(final String methodName, final Class R callMethod(final String methodName, final Class {@inheritDoc} * @param methodName {@inheritDoc} * @param argument {@inheritDoc} + * * @return {@inheritDoc} */ @Override - public Future> callMethodAsync(final String methodName, final Class returnClazz, final T argument) { + public Future> callMethodAsync(final String methodName, final Class returnClazz, final T argument) { //todo: refactor this section by implementing a PreFutureHandler, so a future object can directly be returned. // Both, the waitForMiddleware and the method call future should be encapsulated in the PreFutureHandler @@ -885,7 +894,7 @@ public Future> callMethodAsync(final String return (Future>) FutureProcessor.canceledFuture(ex); } - return GlobalCachedExecutorService.submit(new Callable >() { + return GlobalCachedExecutorService.submit(new Callable>() { private Future> internalCallFuture; @@ -896,7 +905,7 @@ public RPCResponse call() throws Exception { try { try { - logger.debug("Calling method async [" + methodName + "(" + shortArgument + ")] on scope: " + rpcClient.getScope().toString()); + logger.debug("Calling method async [" + methodName + "(" + shortArgument + ")] on scope: " + ScopeProcessor.generateStringRep(rpcClient.getScope())); if (!isConnected()) { try { @@ -930,7 +939,8 @@ public RPCResponse call() throws Exception { } else { waitForConnectionState(CONNECTING); } - } catch (ExecutionException | java.util.concurrent.TimeoutException | CancellationException exx) { + } catch (ExecutionException | java.util.concurrent.TimeoutException | + CancellationException exx) { // cancel call if connection is broken if (internalCallFuture != null) { internalCallFuture.cancel(true); @@ -950,7 +960,7 @@ public RPCResponse call() throws Exception { } throw ex; } catch (final InvalidStateException ex) { - // reinit remote service because middleware connection lost! + // re-init remote service because middleware connection lost! switch (connectionState) { // only if the connection was established before and no reconnect is ongoing. case CONNECTING: @@ -964,7 +974,7 @@ public RPCResponse call() throws Exception { throw ex; } } catch (final CouldNotPerformException | CancellationException | InterruptedException ex) { - throw new CouldNotPerformException("Could not call remote Method[" + methodName + "(" + shortArgument + ")] on Scope[" + rpcClient.getScope() + "].", ex); + throw new CouldNotPerformException("Could not call remote Method[" + methodName + "(" + shortArgument + ")] on Scope[" + ScopeProcessor.generateStringRep(rpcClient.getScope()) + "].", ex); } } }); @@ -988,7 +998,7 @@ public Future requestData() { } // Create new sync process - if(syncFuture == null || syncFuture.isDone()) { + if (syncFuture == null || syncFuture.isDone()) { syncFuture = new CompletableFutureLite<>(); } @@ -1008,7 +1018,7 @@ public Future requestData() { * @return fresh synchronized data object. */ private Future sync() { - logger.debug("Synchronization of Remote[" + this + "] triggered..."); + logger.trace("Synchronization of Remote[" + this + "] triggered..."); try { validateInitialization(); try { @@ -1086,7 +1096,7 @@ private M applyEventUpdate(final Event event, final Map userProp } try { - if(!validateAndUpdateEventTimestamp(userProperties)) { + if (!validateAndUpdateEventTimestamp(userProperties)) { logger.debug("Skip event on scope[" + getScopeStringRep() + "] because event seems to be outdated!"); return data; } @@ -1130,7 +1140,7 @@ public boolean validateAndUpdateEventTimestamp(final Map userPro } catch (NumberFormatException ex) { final String timeMs = userProperties.get(CommunicatorImpl.TIMESTAMP_KEY_MS); final String timeNano = userProperties.get(CommunicatorImpl.TIMESTAMP_KEY_NANO); - throw new CouldNotPerformException("One of the timestamps milliseconds["+timeMs+"] or nanoseconds["+timeNano+"] cannot be interpreted as a number", ex); + throw new CouldNotPerformException("One of the timestamps milliseconds[" + timeMs + "] or nanoseconds[" + timeNano + "] cannot be interpreted as a number", ex); } } @@ -1161,6 +1171,7 @@ public void shutdown() { * {@inheritDoc} * * @return {@inheritDoc} + * * @throws NotAvailableException {@inheritDoc} */ @Override @@ -1203,6 +1214,7 @@ protected void setData(final M data) { * transaction sync futures return. * * @param data the data type notified. + * * @throws CouldNotPerformException if notification fails or no transaction id could be extracted. */ private void notifyPrioritizedObservers(final M data) throws CouldNotPerformException { @@ -1273,6 +1285,7 @@ public void waitForData() throws CouldNotPerformException, InterruptedException * * @param timeout {@inheritDoc} * @param timeUnit {@inheritDoc} + * * @throws CouldNotPerformException {@inheritDoc} * @throws java.lang.InterruptedException {@inheritDoc} */ @@ -1295,7 +1308,8 @@ public void waitForData(long timeout, TimeUnit timeUnit) throws CouldNotPerformE // wait for data sync dataObservable.waitForValue(timeoutSplitter.getTime(), timeoutSplitter.getTimeUnit()); - } catch (java.util.concurrent.TimeoutException | CouldNotPerformException | ExecutionException | CancellationException ex) { + } catch (java.util.concurrent.TimeoutException | CouldNotPerformException | ExecutionException | + CancellationException ex) { if (shutdownInitiated) { throw new ShutdownInProgressException(this); } @@ -1416,6 +1430,7 @@ public void waitForMiddleware(final long timeout, final TimeUnit timeUnit) throw * @param connectionState the desired connection state * @param timeout the timeout in milliseconds until the method throw a * TimeoutException in case the connection state was not reached. + * * @throws InterruptedException is thrown in case the thread is externally * interrupted. * @throws org.openbase.jul.exception.TimeoutException is thrown in case the @@ -1478,6 +1493,7 @@ private String getScopeStringRep() { * Method blocks until the remote reaches the desired connection state. * * @param connectionState the desired connection state + * * @throws InterruptedException is thrown in case the thread is externally * interrupted. * @throws org.openbase.jul.exception.CouldNotPerformException is thrown in case the @@ -1495,6 +1511,7 @@ public void waitForConnectionState(final ConnectionState.State connectionState) * {@inheritDoc} * * @return {@inheritDoc} + * * @throws NotAvailableException {@inheritDoc} */ @Override @@ -1730,6 +1747,7 @@ public String toString() { * Get the latest transaction id. It will be updated every time after prioritized observers are notified. * * @return the latest transaction id. + * * @throws NotAvailableException if no data has been received yet */ @Override @@ -1773,14 +1791,12 @@ private boolean isRelatedFutureCancelled() { public M call() throws CouldNotPerformException { Future internalFuture = null; - Event event; M receivedData; boolean active = isActive(); ExecutionException lastException = null; try { try { - logger.debug("Request controller synchronization."); - + logger.trace("Request controller synchronization."); long timeout = METHOD_CALL_START_TIMEOUT; while (true) { diff --git a/module/communication/default/src/main/java/org/openbase/jul/communication/exception/RPCResolvedException.java b/module/communication/default/src/main/java/org/openbase/jul/communication/exception/RPCResolvedException.java index 445060bda..520acc530 100644 --- a/module/communication/default/src/main/java/org/openbase/jul/communication/exception/RPCResolvedException.java +++ b/module/communication/default/src/main/java/org/openbase/jul/communication/exception/RPCResolvedException.java @@ -75,10 +75,11 @@ public RPCResolvedException(final RPCException rpcException) { * Method parses the RPCException message and resolves the causes and messagen and use those to reconstruct the exception chain. * * @param rpcException the origin RPCException + * * @return the reconstruced excetion cause chain. */ - public static Exception resolveRPCException(final RPCException rpcException) { - Exception exception = null; + public static Throwable resolveRPCException(final RPCException rpcException) { + Throwable exception = null; // build stacktrace array where each line is stored as entry. entry is extract each line stacktrace into arr final String[] stacktrace = ("Caused by: " + rpcException.getMessage()).split("\n"); @@ -97,15 +98,16 @@ public static Exception resolveRPCException(final RPCException rpcException) { final String message = causes.length <= 2 ? "" : stacktrace[i].substring(stacktrace[i].lastIndexOf(exceptionClassName) + exceptionClassName.length() + 2).trim(); // detect exception class - final Class exceptionClass; + final Class exceptionClass; try { - exceptionClass = (Class) Class.forName(exceptionClassName); + exceptionClass = (Class) Class.forName(exceptionClassName); // build exception try { // try default constructor exception = exceptionClass.getConstructor(String.class, Throwable.class).newInstance(message, exception); - } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException | ClassCastException | UnsupportedOperationException ex) { + } catch (InstantiationException | IllegalAccessException | InvocationTargetException | + NoSuchMethodException | ClassCastException | UnsupportedOperationException ex) { try { // try to handle missing fields if (exception == null && message.isEmpty()) { @@ -117,7 +119,8 @@ public static Exception resolveRPCException(final RPCException rpcException) { } else { throw ex; } - } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException exx) { + } catch (InstantiationException | IllegalAccessException | InvocationTargetException | + NoSuchMethodException exx) { throw new CouldNotPerformException("No compatible constructor found!", exx); } } diff --git a/module/communication/mqtt/src/main/java/org/openbase/jul/communication/mqtt/RPCServerImpl.kt b/module/communication/mqtt/src/main/java/org/openbase/jul/communication/mqtt/RPCServerImpl.kt index 5405abe1e..687f089f9 100644 --- a/module/communication/mqtt/src/main/java/org/openbase/jul/communication/mqtt/RPCServerImpl.kt +++ b/module/communication/mqtt/src/main/java/org/openbase/jul/communication/mqtt/RPCServerImpl.kt @@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory import java.lang.reflect.InvocationTargetException import java.time.Duration import java.util.* +import java.util.concurrent.ExecutionException import java.util.concurrent.Future import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException @@ -158,19 +159,20 @@ class RPCServerImpl( val result = method.invoke(request.paramsList) responseBuilder.result = result } catch (ex: Exception) { - when (ex) { - is InvocationTargetException -> { + val targetException = when (ex) { + is InvocationTargetException, is ExecutionException -> { if (JPService.verboseMode()) { ExceptionPrinter.printHistory(ex, logger, LogLevel.WARN) } - responseBuilder.error = ex.cause?.stackTraceToString() ?: ex.stackTraceToString() + ex.cause ?: ex } else -> { ExceptionPrinter.printHistory(ex, logger, LogLevel.WARN) - responseBuilder.error = CouldNotPerformException("Server error ${ex.message}").stackTraceToString() + CouldNotPerformException("Server error ${ex.message}", ex) } } + responseBuilder.error = targetException.stackTraceToString() } mqttClient.publish( diff --git a/module/extension/type/processing/src/main/java/org/openbase/jul/extension/type/processing/ScopeProcessor.java b/module/extension/type/processing/src/main/java/org/openbase/jul/extension/type/processing/ScopeProcessor.java index 680041b19..44101d44e 100644 --- a/module/extension/type/processing/src/main/java/org/openbase/jul/extension/type/processing/ScopeProcessor.java +++ b/module/extension/type/processing/src/main/java/org/openbase/jul/extension/type/processing/ScopeProcessor.java @@ -10,12 +10,12 @@ * it under the terms of the GNU Lesser General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. - * + * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Lesser Public License for more details. - * + * * You should have received a copy of the GNU General Lesser Public * License along with this program. If not, see * . @@ -34,6 +34,14 @@ public class ScopeProcessor { public static final String COMPONENT_SEPARATOR = "/"; + public static String generateStringRep(final ScopeType.Scope scope, final String alternative) { + try { + return generateStringRep(scope); + } catch (CouldNotPerformException ex) { + return alternative; + } + } + public static String generateStringRep(final ScopeType.Scope scope) throws CouldNotPerformException { try { if (scope == null) { @@ -51,7 +59,7 @@ public static String generateStringRep(final Collection components) thro for (String component : components) { // merge to components in case they are connected by an empty one - if(component.isEmpty()) { + if (component.isEmpty()) { continue; } diff --git a/module/extension/type/util/src/main/java/org/openbase/jul/extension/type/util/TransactionSynchronizationFuture.java b/module/extension/type/util/src/main/java/org/openbase/jul/extension/type/util/TransactionSynchronizationFuture.java index ce132361a..e2d1fb196 100644 --- a/module/extension/type/util/src/main/java/org/openbase/jul/extension/type/util/TransactionSynchronizationFuture.java +++ b/module/extension/type/util/src/main/java/org/openbase/jul/extension/type/util/TransactionSynchronizationFuture.java @@ -10,12 +10,12 @@ * it under the terms of the GNU Lesser General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. - * + * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Lesser Public License for more details. - * + * * You should have received a copy of the GNU General Lesser Public * License along with this program. If not, see * . @@ -107,7 +107,7 @@ protected boolean check(T message) throws CouldNotPerformException { final boolean result = dataProvider.getTransactionId() >= transactionId; if (!result) { - logger.debug("Transition check failed, received {} but waiting for {} of {}", dataProvider.getTransactionId(), transactionId, dataProvider); + logger.trace("Outdated transition {} received, waiting for {} of {}", dataProvider.getTransactionId(), transactionId, dataProvider); } return result; diff --git a/module/storage/src/main/java/org/openbase/jul/storage/registry/AbstractRegistry.java b/module/storage/src/main/java/org/openbase/jul/storage/registry/AbstractRegistry.java index e783de3f7..672deb31b 100644 --- a/module/storage/src/main/java/org/openbase/jul/storage/registry/AbstractRegistry.java +++ b/module/storage/src/main/java/org/openbase/jul/storage/registry/AbstractRegistry.java @@ -10,12 +10,12 @@ * it under the terms of the GNU Lesser General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. - * + * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Lesser Public License for more details. - * + * * You should have received a copy of the GNU General Lesser Public * License along with this program. If not, see * . @@ -28,8 +28,8 @@ import org.openbase.jps.preset.JPForce; import org.openbase.jps.preset.JPTestMode; import org.openbase.jps.preset.JPVerbose; -import org.openbase.jul.exception.*; import org.openbase.jul.exception.InstantiationException; +import org.openbase.jul.exception.*; import org.openbase.jul.exception.MultiException.ExceptionStack; import org.openbase.jul.exception.printer.ExceptionPrinter; import org.openbase.jul.exception.printer.LogLevel; @@ -51,6 +51,7 @@ import java.util.*; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; /** * @param EntryKey @@ -194,8 +195,8 @@ public ENTRY register(final ENTRY entry) throws CouldNotPerformException { sandbox.register(entry); pluginPool.beforeRegister(entry); entryMap.put(entry.getId(), entry); - finishTransaction(); pluginPool.afterRegister(entry); + finishTransaction(); } finally { syncSandbox(); } @@ -326,10 +327,6 @@ public ENTRY superRemove(final ENTRY entry) throws CouldNotPerformException { lock(); try { try { - // validate removal - if (!entryMap.containsKey(entry.getId())) { - throw new InvalidStateException("Entry not registered!"); - } // perform removal pluginPool.beforeRemove(entry); sandbox.remove(entry); @@ -367,7 +364,13 @@ public ENTRY superRemove(final ENTRY entry) throws CouldNotPerformException { public List removeAll(final Collection entries) throws MultiException, InvalidStateException { final List removedEntries = new ArrayList<>(); ExceptionStack exceptionStack = null; - registryLock.writeLock().lock(); + try { + registryLock.writeLock().lockInterruptibly(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread interrupted!", ex); + } + try { for (final ENTRY entry : entries) { @@ -405,7 +408,12 @@ public List removeAll(final Collection entries) throws MultiExcept public List removeAllByKey(final Collection keys) throws MultiException, InvalidStateException { final List removedEntries = new ArrayList<>(); ExceptionStack exceptionStack = null; - registryLock.writeLock().lock(); + try { + registryLock.writeLock().lockInterruptibly(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread interrupted!", ex); + } try { for (final KEY key : keys) { @@ -445,7 +453,12 @@ public ENTRY get(final KEY key) throws CouldNotPerformException { throw new NotAvailableException("key"); } verifyID(key); - registryLock.readLock().lock(); + try { + registryLock.readLock().lockInterruptibly(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread interrupted!", ex); + } try { if (!entryMap.containsKey(key)) { @@ -487,7 +500,12 @@ public ENTRY get(final KEY key) throws CouldNotPerformException { */ @Override public List getEntries() { - registryLock.readLock().lock(); + try { + registryLock.readLock().lockInterruptibly(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread interrupted!", ex); + } try { return new ArrayList<>(entryMap.values()); } finally { @@ -497,7 +515,12 @@ public List getEntries() { @Override public Map getEntryMap() { - registryLock.readLock().lock(); + try { + registryLock.readLock().lockInterruptibly(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread interrupted!", ex); + } try { return Collections.unmodifiableMap(entryMap); } finally { @@ -512,7 +535,12 @@ public Map getEntryMap() { */ @Override public int size() { - registryLock.readLock().lock(); + try { + registryLock.readLock().lockInterruptibly(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread interrupted!", ex); + } try { return entryMap.size(); } finally { @@ -527,7 +555,12 @@ public int size() { */ @Override public boolean isEmpty() { - registryLock.readLock().lock(); + try { + registryLock.readLock().lockInterruptibly(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread interrupted!", ex); + } try { return entryMap.isEmpty(); } finally { @@ -650,14 +683,19 @@ public void replaceInternalMap(final Map map, boolean finishTransact */ @Override public void checkWriteAccess() throws RejectedException { - logger.debug("checkWriteAccess of " + this); + logger.trace("checkWriteAccess of " + this); if (isShutdownInitiated()) { throw new RejectedException("Write access rejected because of registry shutdown!", new ShutdownInProgressException(this)); } - if (!isDependingOnConsistentRegistries()) { - throw new RejectedException("At least one depending registry is inconsistent!"); + + List inconsistentDependencies = getInconsistentDependingRegistries(); + if (!inconsistentDependencies.isEmpty()) { + throw new RejectedException("Depending registries [" + inconsistentDependencies + .stream() + .map(Registry::getName) + .toList() + "] is inconsistent!"); } pluginPool.checkAccess(); @@ -673,15 +711,19 @@ public void checkWriteAccess() throws RejectedException { * * @return The method returns should return false if at least one depending registry is not consistent! */ - protected boolean isDependingOnConsistentRegistries() { - dependingRegistryMapLock.readLock().lock(); + protected List getInconsistentDependingRegistries() { try { - for (Registry registry : dependingRegistryMap.keySet()) { - if (!registry.isConsistent()) { - return false; - } - } - return true; + dependingRegistryMapLock.readLock().lockInterruptibly(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread interrupted!", ex); + } + + try { + return dependingRegistryMap.keySet() + .stream() + .filter((registry) -> (!registry.isConsistent())) + .collect(Collectors.toList()); } finally { dependingRegistryMapLock.readLock().unlock(); } @@ -702,9 +744,21 @@ public void registerDependency(final Registry registry) throws CouldNotPerformEx if (registry == null) { throw new NotAvailableException("registry"); } - registryLock.writeLock().lock(); + + try { + registryLock.writeLock().lockInterruptibly(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread interrupted!", ex); + } + try { - dependingRegistryMapLock.writeLock().lock(); + try { + dependingRegistryMapLock.writeLock().lockInterruptibly(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread interrupted!", ex); + } try { // check if already registered if (dependingRegistryMap.containsKey(registry)) { @@ -747,7 +801,14 @@ public void removeDependency(final Registry registry) throws CouldNotPerformExce if (registry == null) { throw new NotAvailableException("registry"); } - registryLock.writeLock().lock(); + + try { + registryLock.writeLock().lockInterruptibly(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread interrupted!", ex); + } + try { dependingRegistryMapLock.writeLock().lock(); try { @@ -768,9 +829,21 @@ public void removeDependency(final Registry registry) throws CouldNotPerformExce * Removal of all registered registry dependencies in the reversed order in which they where added. */ public void removeAllDependencies() { - registryLock.writeLock().lock(); try { - dependingRegistryMapLock.writeLock().lock(); + registryLock.writeLock().lockInterruptibly(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread interrupted!", ex); + } + + try { + try { + dependingRegistryMapLock.writeLock().lockInterruptibly(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread interrupted!", ex); + } + try { List dependingRegistryList = new ArrayList<>(dependingRegistryMap.keySet()); Collections.reverse(dependingRegistryList); @@ -826,7 +899,7 @@ protected final boolean notifyObservers() { } notificationSkipped = false; } catch (CouldNotPerformException ex) { - if(!ExceptionProcessor.isCausedByInterruption(ex) && !ExceptionProcessor.isCausedBySystemShutdown(ex)) { + if (!ExceptionProcessor.isCausedByInterruption(ex) && !ExceptionProcessor.isCausedBySystemShutdown(ex)) { ExceptionPrinter.printHistory(new CouldNotPerformException("Could not notify all registry observer!", ex), logger, LogLevel.ERROR); } return false; @@ -904,17 +977,21 @@ public final int checkConsistency() throws CouldNotPerformException { int modificationCounter = 0; if (consistencyHandlerList.isEmpty()) { - logger.debug("Skip consistency check because no handler are registered."); + logger.trace("Skip consistency check because no handler are registered."); return modificationCounter; } if (isEmpty()) { - logger.debug("Skip consistency check because " + getName() + " is empty."); + logger.trace("Skip consistency check because " + getName() + " is empty."); return modificationCounter; } - if (!isDependingOnConsistentRegistries()) { - logger.warn("Skip consistency check because " + getName() + " is depending on at least one inconsistent registry!"); + List inconsistentDependencies = getInconsistentDependingRegistries(); + if (!inconsistentDependencies.isEmpty()) { + logger.warn("Skip consistency check because " + getName() + " is depending on the following inconsistent registries [" + inconsistentDependencies + .stream() + .map(Registry::getName) + .toList() + "]!"); return modificationCounter; } @@ -927,7 +1004,12 @@ public final int checkConsistency() throws CouldNotPerformException { lock(); try { pluginPool.beforeConsistencyCheck(); - consistencyCheckLock.writeLock().lock(); + try { + consistencyCheckLock.writeLock().lockInterruptibly(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread interrupted!", ex); + } try { try { try { @@ -1125,10 +1207,12 @@ protected void finishTransaction() throws CouldNotPerformException { } private void syncSandbox() { -// if (consistencyCheckLock.isWriteLocked()) { -// throw new FatalImplementationErrorException("Sync sandbox registry during consistency check[" + consistencyCheckLock.writeLock().isHeldByCurrentThread() + "]", this); -// } - registryLock.readLock().lock(); + try { + registryLock.readLock().lockInterruptibly(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread interrupted!", ex); + } try { sandbox.sync(entryMap); } finally { @@ -1163,7 +1247,12 @@ public boolean isReady() { public void shutdown() { shutdownInitiated = true; try { - registryLock.writeLock().lock(); + try { + registryLock.writeLock().lockInterruptibly(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread interrupted!", ex); + } try { super.shutdown(); @@ -1350,7 +1439,12 @@ private void unlockRegistries(final Set lockedRegistries) { * thread could already start locking while not everything is unlocked. * But by locking this registry additionally the next thread can only try to lock if all registries * this one depends on have been unlocked. */ - registryLock.writeLock().lock(); + try { + registryLock.writeLock().lockInterruptibly(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread interrupted!", ex); + } try { for (Registry registry : lockedRegistries) { if (registry instanceof RemoteRegistry) { @@ -1425,7 +1519,13 @@ public boolean recursiveTryLockRegistry(final Set lockedRegistries) th // the lock has been acquired so add this to the set lockedRegistries.add(this); - dependingRegistryMapLock.readLock().lock(); + try { + dependingRegistryMapLock.readLock().lockInterruptibly(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread interrupted!", ex); + } + try { // iterate over all registries this one depends on and try to lock recursively for (Registry registry : dependingRegistryMap.keySet()) { @@ -1447,9 +1547,6 @@ public boolean recursiveTryLockRegistry(final Set lockedRegistries) th // all registries this one depends on could be locked recursively as well, so return true return true; } else { -// if (!isSandbox()) { -// logger.info("Failed to lock " + this); -// } // acquiring the write lock for this registry failed, so return false return false; } diff --git a/module/storage/src/main/java/org/openbase/jul/storage/registry/AbstractSynchronizer.java b/module/storage/src/main/java/org/openbase/jul/storage/registry/AbstractSynchronizer.java index 73aaf1b62..cba19ecf4 100644 --- a/module/storage/src/main/java/org/openbase/jul/storage/registry/AbstractSynchronizer.java +++ b/module/storage/src/main/java/org/openbase/jul/storage/registry/AbstractSynchronizer.java @@ -10,12 +10,12 @@ * it under the terms of the GNU Lesser General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. - * + * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Lesser Public License for more details. - * + * * You should have received a copy of the GNU General Lesser Public * License along with this program. If not, see * . @@ -91,7 +91,6 @@ public void relay() throws Exception { } }; this.observer = (Object source, Object data) -> { - logger.debug("Incoming update..."); GlobalCachedExecutorService.submit(() -> { try { recurrenceSyncFilter.trigger();