diff --git a/engine/src/main/java/org/apache/hop/pipeline/Pipeline.java b/engine/src/main/java/org/apache/hop/pipeline/Pipeline.java index 6d1a087d99d..4de1c236cbe 100644 --- a/engine/src/main/java/org/apache/hop/pipeline/Pipeline.java +++ b/engine/src/main/java/org/apache/hop/pipeline/Pipeline.java @@ -1063,7 +1063,7 @@ public void prepareExecution() throws HopException { // Just for safety, fire the pipeline finished listeners... try { - firePipelineExecutionFinishedListeners(); + fireExecutionFinishedListeners(); } catch (HopException e) { // listeners produces errors log.logError(BaseMessages.getString(PKG, "Pipeline.FinishListeners.Exception")); @@ -1109,7 +1109,7 @@ public void startThreads() throws HopException { ExtensionPointHandler.callExtensionPoint( log, this, HopExtensionPoint.PipelineStartThreads.id, this); - firePipelineExecutionStartedListeners(); + fireExecutionStartedListeners(); for (int i = 0; i < transforms.size(); i++) { final TransformMetaDataCombi sid = transforms.get(i); @@ -1137,7 +1137,7 @@ public void startThreads() throws HopException { executionEndDate = new Date(); try { - firePipelineExecutionFinishedListeners(); + fireExecutionFinishedListeners(); } catch (Exception e) { transform.setErrors(transform.getErrors() + 1L); log.logError( @@ -1293,7 +1293,7 @@ public void run() { // So we fire the execution finished listeners here. // if (transforms.isEmpty()) { - firePipelineExecutionFinishedListeners(); + fireExecutionFinishedListeners(); } if (log.isDetailed()) { @@ -1312,16 +1312,23 @@ public void run() { * @throws HopException if any errors occur during notification */ @Override + @Deprecated(since = "2.9", forRemoval = true) public void firePipelineExecutionFinishedListeners() throws HopException { + fireExecutionFinishedListeners(); + } + + @Override + public void fireExecutionFinishedListeners() throws HopException { synchronized (executionFinishedListeners) { if (executionFinishedListeners.size() == 0) { return; } // prevent Exception from one listener to block others execution List badGuys = new ArrayList<>(executionFinishedListeners.size()); - for (IExecutionFinishedListener executionListener : executionFinishedListeners) { + for (IExecutionFinishedListener> listener : + executionFinishedListeners) { try { - executionListener.finished(this); + listener.finished(this); } catch (HopException e) { badGuys.add(e); } @@ -1357,10 +1364,17 @@ public void pipelineCompleted() throws HopException { * @throws HopException if any errors occur during notification */ @Override + @Deprecated(since = "2.9", forRemoval = true) public void firePipelineExecutionStartedListeners() throws HopException { + fireExecutionStartedListeners(); + } + + @Override + public void fireExecutionStartedListeners() throws HopException { synchronized (executionStartedListeners) { - for (IExecutionStartedListener executionListener : executionStartedListeners) { - executionListener.started(this); + for (IExecutionStartedListener> listener : + executionStartedListeners) { + listener.started(this); } } } @@ -1595,7 +1609,7 @@ public void safeStop() { } transforms.stream().filter(this::isInputTransform).forEach(combi -> stopTransform(combi, true)); - firePipelineExecutionStoppedListeners(); + fireExecutionStoppedListeners(); } private boolean isInputTransform(TransformMetaDataCombi combi) { @@ -1617,7 +1631,7 @@ public void stopAll() { setStopped(true); isAlreadyStopped.set(true); - firePipelineExecutionStoppedListeners(); + fireExecutionStoppedListeners(); } public void stopTransform(TransformMetaDataCombi combi, boolean safeStop) { @@ -1638,11 +1652,16 @@ public void stopTransform(TransformMetaDataCombi combi, boolean safeStop) { } @Override + @Deprecated(since = "2.9", forRemoval = true) public void firePipelineExecutionStoppedListeners() { - // Fire the stopped listener... - // + fireExecutionStoppedListeners(); + } + + @Override + public void fireExecutionStoppedListeners() { synchronized (executionStoppedListeners) { - for (IExecutionStoppedListener listener : executionStoppedListeners) { + for (IExecutionStoppedListener> listener : + executionStoppedListeners) { listener.stopped(this); } } @@ -2483,39 +2502,51 @@ public void setTransformPerformanceSnapShots( this.transformPerformanceSnapShots = transformPerformanceSnapShots; } - /** - * Adds a pipeline started listener. - * - * @param executionStartedListener the pipeline started listener - */ @Override - public void addExecutionStartedListener(IExecutionStartedListener executionStartedListener) { - synchronized (executionStartedListener) { - executionStartedListeners.add(executionStartedListener); + public void addExecutionStartedListener( + IExecutionStartedListener> listener) { + synchronized (executionStartedListeners) { + executionStartedListeners.add(listener); } } - /** - * Adds a pipeline finished listener. - * - * @param executionFinishedListener the pipeline finished listener - */ @Override - public void addExecutionFinishedListener(IExecutionFinishedListener executionFinishedListener) { - synchronized (executionFinishedListener) { - executionFinishedListeners.add(executionFinishedListener); + public void removeExecutionStartedListener( + IExecutionStartedListener> listener) { + synchronized (executionStartedListeners) { + executionStartedListeners.remove(listener); + } + } + + @Override + public void addExecutionFinishedListener( + IExecutionFinishedListener> listener) { + synchronized (executionFinishedListeners) { + executionFinishedListeners.add(listener); + } + } + + @Override + public void removeExecutionFinishedListener( + IExecutionFinishedListener> listener) { + synchronized (executionFinishedListeners) { + executionFinishedListeners.remove(listener); } } - /** - * Adds a pipeline stopped listener. - * - * @param executionStoppedListener the pipeline stopped listener - */ @Override - public void addExecutionStoppedListener(IExecutionStoppedListener executionStoppedListener) { - synchronized (executionStoppedListener) { - executionStoppedListeners.add(executionStoppedListener); + public void addExecutionStoppedListener( + IExecutionStoppedListener> listener) { + synchronized (executionStoppedListeners) { + executionStoppedListeners.add(listener); + } + } + + @Override + public void removeExecutionStoppedListener( + IExecutionStoppedListener> listener) { + synchronized (executionStoppedListeners) { + executionStoppedListeners.remove(listener); } } @@ -2524,6 +2555,7 @@ public void addExecutionStoppedListener(IExecutionStoppedListener executionStopp * * @param executionStoppedListeners the list of stop-event listeners to set */ + @Deprecated(since = "2.9", forRemoval = true) public void setExecutionStoppedListeners( List>> executionStoppedListeners) { this.executionStoppedListeners = Collections.synchronizedList(executionStoppedListeners); @@ -2535,6 +2567,7 @@ public void setExecutionStoppedListeners( * * @return the list of stop-event listeners */ + @Deprecated(since = "2.9", forRemoval = true) public List>> getExecutionStoppedListeners() { return executionStoppedListeners; @@ -3543,6 +3576,7 @@ public void setFeedbackSize(int feedbackSize) { * * @return value of executionStartedListeners */ + @Deprecated(since = "2.9", forRemoval = true) public List>> getExecutionStartedListeners() { return executionStartedListeners; @@ -3551,6 +3585,7 @@ public void setFeedbackSize(int feedbackSize) { /** * @param executionStartedListeners The executionStartedListeners to set */ + @Deprecated(since = "2.9", forRemoval = true) public void setExecutionStartedListeners( List>> executionStartedListeners) { this.executionStartedListeners = executionStartedListeners; @@ -3561,6 +3596,7 @@ public void setExecutionStartedListeners( * * @return value of executionFinishedListeners */ + @Deprecated(since = "2.9", forRemoval = true) public List>> getExecutionFinishedListeners() { return executionFinishedListeners; @@ -3569,6 +3605,7 @@ public void setExecutionStartedListeners( /** * @param executionFinishedListeners The executionFinishedListeners to set */ + @Deprecated(since = "2.9", forRemoval = true) public void setExecutionFinishedListeners( List>> executionFinishedListeners) { this.executionFinishedListeners = executionFinishedListeners; diff --git a/engine/src/main/java/org/apache/hop/pipeline/engine/IFinishedListener.java b/engine/src/main/java/org/apache/hop/pipeline/engine/IFinishedListener.java deleted file mode 100644 index bdc710ce170..00000000000 --- a/engine/src/main/java/org/apache/hop/pipeline/engine/IFinishedListener.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hop.pipeline.engine; - -import org.apache.hop.core.exception.HopException; -import org.apache.hop.pipeline.PipelineMeta; - -public interface IFinishedListener { - - /** - * When all processing has completed for the engine. - * - * @param pipelineEngine - * @throws HopException - */ - void finished(IPipelineEngine pipelineEngine) throws HopException; -} diff --git a/engine/src/main/java/org/apache/hop/pipeline/engine/IPipelineEngine.java b/engine/src/main/java/org/apache/hop/pipeline/engine/IPipelineEngine.java index 57ffdf86605..dd288610318 100644 --- a/engine/src/main/java/org/apache/hop/pipeline/engine/IPipelineEngine.java +++ b/engine/src/main/java/org/apache/hop/pipeline/engine/IPipelineEngine.java @@ -195,38 +195,92 @@ public interface IPipelineEngine boolean isPaused(); /** - * Call the given listener lambda when this pipeline engine has started execution. + * Attach a listener to notify when the pipeline has started execution. * - * @param listener - * @throws HopException + * @param listener the pipeline started listener */ - void addExecutionStartedListener(IExecutionStartedListener> listener) - throws HopException; + void addExecutionStartedListener(IExecutionStartedListener> listener); - void firePipelineExecutionStartedListeners() throws HopException; + /** + * Detach a listener to notify when the pipeline has started execution. + * + * @param listener the pipeline started listener + */ + void removeExecutionStartedListener(IExecutionStartedListener> listener); /** - * Call the given listener lambda when this pipeline engine has completed execution. + * Use the {@link #fireExecutionStartedListeners} method. * - * @param listener * @throws HopException */ - void addExecutionFinishedListener(IExecutionFinishedListener> listener) - throws HopException; + @Deprecated(since = "2.9", forRemoval = true) + void firePipelineExecutionStartedListeners() throws HopException; - void firePipelineExecutionFinishedListeners() throws HopException; + /** + * Make attempt to fire all registered started execution listeners if possible. + * + * @throws HopException if any errors occur during notification + */ + void fireExecutionStartedListeners() throws HopException; + + /** + * Attach a listener to notify when the pipeline has completed execution. + * + * @param listener the pipeline finished listener + */ + void addExecutionFinishedListener(IExecutionFinishedListener> listener); /** - * Call the given listener lambda when this pipeline engine has stopped execution. + * Detach a listener to notify when the pipeline has completed execution. + * + * @param listener the pipeline finished listener + */ + void removeExecutionFinishedListener(IExecutionFinishedListener> listener); + + /** + * Use the {@link #fireExecutionFinishedListeners} method. * - * @param listener * @throws HopException */ - void addExecutionStoppedListener(IExecutionStoppedListener> listener) - throws HopException; + @Deprecated(since = "2.9", forRemoval = true) + void firePipelineExecutionFinishedListeners() throws HopException; + + /** + * Make attempt to fire all registered finished execution listeners if possible. + * + * @throws HopException if any errors occur during notification + */ + void fireExecutionFinishedListeners() throws HopException; + /** + * Attach a listener to notify when the pipeline has stopped execution. + * + * @param listener the pipeline stopped listener + */ + void addExecutionStoppedListener(IExecutionStoppedListener> listener); + + /** + * Detach a listener to notify when the pipeline has stopped execution. + * + * @param listener the pipeline stopped listener + */ + void removeExecutionStoppedListener(IExecutionStoppedListener> listener); + + /** + * Use the {@link #fireExecutionStoppedListeners} method. + * + * @throws HopException + */ + @Deprecated(since = "2.9", forRemoval = true) void firePipelineExecutionStoppedListeners() throws HopException; + /** + * Make attempt to fire all registered stopped execution listeners if possible. + * + * @throws HopException if any errors occur during notification + */ + void fireExecutionStoppedListeners() throws HopException; + /** * Retrieve the logging text of a particular component in the engine * @@ -301,7 +355,7 @@ void addExecutionStoppedListener(IExecutionStoppedListener> l * * @param parentPipeline */ - void setParentPipeline(IPipelineEngine parentPipeline); + void setParentPipeline(IPipelineEngine parentPipeline); /** * Inform the pipeline about a previous execution result in a workflow or pipeline diff --git a/engine/src/main/java/org/apache/hop/pipeline/engines/local/LocalPipelineEngine.java b/engine/src/main/java/org/apache/hop/pipeline/engines/local/LocalPipelineEngine.java index d78916c38be..ef3a8314029 100644 --- a/engine/src/main/java/org/apache/hop/pipeline/engines/local/LocalPipelineEngine.java +++ b/engine/src/main/java/org/apache/hop/pipeline/engines/local/LocalPipelineEngine.java @@ -157,7 +157,7 @@ public void prepareExecution() throws HopException { // We only do this when we created a new group. Never in a child // addExecutionFinishedListener( - (IExecutionFinishedListener) + (IExecutionFinishedListener>) pipeline -> { String group = (String) pipeline.getExtensionDataMap().get(Const.CONNECTION_GROUP); List databases = DatabaseConnectionMap.getInstance().getDatabases(group); diff --git a/engine/src/main/java/org/apache/hop/pipeline/engines/remote/RemotePipelineEngine.java b/engine/src/main/java/org/apache/hop/pipeline/engines/remote/RemotePipelineEngine.java index 6688cff0c1d..8e53d807220 100644 --- a/engine/src/main/java/org/apache/hop/pipeline/engines/remote/RemotePipelineEngine.java +++ b/engine/src/main/java/org/apache/hop/pipeline/engines/remote/RemotePipelineEngine.java @@ -129,7 +129,7 @@ public class RemotePipelineEngine extends Variables implements IPipelineEngine

parentPipeline; protected IWorkflowEngine parentWorkflow; protected LogLevel logLevel; protected boolean feedbackShown; // TODO factor out @@ -676,9 +676,18 @@ public void resumeExecution() { * @param executionStartedListener the pipeline started listener */ @Override - public void addExecutionStartedListener(IExecutionStartedListener executionStartedListener) { - synchronized (executionStartedListener) { - executionStartedListeners.add(executionStartedListener); + public void addExecutionStartedListener( + IExecutionStartedListener> listener) { + synchronized (executionStartedListeners) { + executionStartedListeners.add(listener); + } + } + + @Override + public void removeExecutionStartedListener( + IExecutionStartedListener> listener) { + synchronized (executionStartedListeners) { + executionStartedListeners.remove(listener); } } @@ -688,9 +697,18 @@ public void addExecutionStartedListener(IExecutionStartedListener executionStart * @param executionFinishedListener the pipeline finished listener */ @Override - public void addExecutionFinishedListener(IExecutionFinishedListener executionFinishedListener) { - synchronized (executionFinishedListener) { - executionFinishedListeners.add(executionFinishedListener); + public void addExecutionFinishedListener( + IExecutionFinishedListener> listener) { + synchronized (executionFinishedListeners) { + executionFinishedListeners.add(listener); + } + } + + @Override + public void removeExecutionFinishedListener( + IExecutionFinishedListener> listener) { + synchronized (executionFinishedListeners) { + executionFinishedListeners.remove(listener); } } @@ -959,6 +977,7 @@ public void setParentWorkflow(IWorkflowEngine parentWorkflow) { * * @return value of executionStartedListeners */ + @Deprecated(since = "2.9", forRemoval = true) public List>> getExecutionStartedListeners() { return executionStartedListeners; @@ -967,12 +986,14 @@ public void setParentWorkflow(IWorkflowEngine parentWorkflow) { /** * @param executionStartedListeners The executionStartedListeners to set */ + @Deprecated(since = "2.9", forRemoval = true) public void setExecutionStartedListeners( - List>> executionStartedListeners) { - this.executionStartedListeners = executionStartedListeners; + List>> listener) { + this.executionStartedListeners = listener; } - private void fireExecutionStartedListeners() throws HopException { + @Override + public void fireExecutionStartedListeners() throws HopException { synchronized (executionStartedListeners) { for (IExecutionStartedListener> listener : executionStartedListeners) { @@ -986,6 +1007,7 @@ private void fireExecutionStartedListeners() throws HopException { * * @return value of executionFinishedListeners */ + @Deprecated(since = "2.9", forRemoval = true) public List>> getExecutionFinishedListeners() { return executionFinishedListeners; @@ -994,20 +1016,30 @@ private void fireExecutionStartedListeners() throws HopException { /** * @param executionFinishedListeners The executionFinishedListeners to set */ + @Deprecated(since = "2.9", forRemoval = true) public void setExecutionFinishedListeners( - List>> executionFinishedListeners) { - this.executionFinishedListeners = executionFinishedListeners; + List>> listener) { + this.executionFinishedListeners = listener; } @Override public void addExecutionStoppedListener( - IExecutionStoppedListener> listener) throws HopException { + IExecutionStoppedListener> listener) { synchronized (executionStoppedListeners) { executionStoppedListeners.add(listener); } } @Override + public void removeExecutionStoppedListener( + IExecutionStoppedListener> listener) { + synchronized (executionStoppedListeners) { + executionStoppedListeners.remove(listener); + } + } + + @Override + @Deprecated(since = "2.9", forRemoval = true) public void firePipelineExecutionStartedListeners() throws HopException { synchronized (executionStartedListeners) { for (IExecutionStartedListener> listener : @@ -1018,7 +1050,13 @@ public void firePipelineExecutionStartedListeners() throws HopException { } @Override + @Deprecated(since = "2.9", forRemoval = true) public void firePipelineExecutionFinishedListeners() throws HopException { + fireExecutionFinishedListeners(); + } + + @Override + public void fireExecutionFinishedListeners() throws HopException { synchronized (executionFinishedListeners) { for (IExecutionFinishedListener> listener : executionFinishedListeners) { @@ -1037,7 +1075,13 @@ public void firePipelineExecutionFinishedListeners() throws HopException { } @Override + @Deprecated(since = "2.9", forRemoval = true) public void firePipelineExecutionStoppedListeners() throws HopException { + fireExecutionStoppedListeners(); + } + + @Override + public void fireExecutionStoppedListeners() throws HopException { synchronized (executionStoppedListeners) { for (IExecutionStoppedListener> listener : executionStoppedListeners) { @@ -1051,6 +1095,7 @@ public void firePipelineExecutionStoppedListeners() throws HopException { * * @return value of executionStoppedListeners */ + @Deprecated(since = "2.9", forRemoval = true) public List>> getExecutionStoppedListeners() { return executionStoppedListeners; @@ -1059,6 +1104,7 @@ public void firePipelineExecutionStoppedListeners() throws HopException { /** * @param executionStoppedListeners The executionStoppedListeners to set */ + @Deprecated(since = "2.9", forRemoval = true) public void setExecutionStoppedListeners( List>> executionStoppedListeners) { this.executionStoppedListeners = executionStoppedListeners; diff --git a/engine/src/main/java/org/apache/hop/workflow/Workflow.java b/engine/src/main/java/org/apache/hop/workflow/Workflow.java index 2e41d1cfb44..ea6a0c82742 100644 --- a/engine/src/main/java/org/apache/hop/workflow/Workflow.java +++ b/engine/src/main/java/org/apache/hop/workflow/Workflow.java @@ -67,7 +67,9 @@ import org.apache.hop.metadata.api.IHopMetadataProvider; import org.apache.hop.pipeline.IExecutionFinishedListener; import org.apache.hop.pipeline.IExecutionStartedListener; +import org.apache.hop.pipeline.IExecutionStoppedListener; import org.apache.hop.pipeline.Pipeline; +import org.apache.hop.pipeline.PipelineMeta; import org.apache.hop.pipeline.engine.IPipelineEngine; import org.apache.hop.workflow.action.ActionMeta; import org.apache.hop.workflow.action.IAction; @@ -115,7 +117,7 @@ public abstract class Workflow extends Variables protected IWorkflowEngine parentWorkflow; /** The parent pipeline */ - protected IPipelineEngine parentPipeline; + protected IPipelineEngine parentPipeline; /** The parent logging interface to reference */ protected ILoggingObject parentLoggingObject; @@ -145,8 +147,11 @@ public abstract class Workflow extends Variables protected boolean interactive; protected List>> - workflowFinishedListeners; - protected List>> workflowStartedListeners; + executionFinishedListeners; + protected List>> + executionStartedListeners; + protected List>> + executionStoppedListeners; protected List actionListeners; @@ -193,8 +198,9 @@ enum BitMaskStatus { private void init() { status = new AtomicInteger(); - workflowStartedListeners = Collections.synchronizedList(new ArrayList<>()); - workflowFinishedListeners = Collections.synchronizedList(new ArrayList<>()); + executionStartedListeners = Collections.synchronizedList(new ArrayList<>()); + executionFinishedListeners = Collections.synchronizedList(new ArrayList<>()); + executionStoppedListeners = Collections.synchronizedList(new ArrayList<>()); actionListeners = new ArrayList<>(); // this map is being modified concurrently and must be thread-safe @@ -293,7 +299,7 @@ public Result startExecution() { // Run the workflow // - fireWorkflowStartedListeners(); + fireExecutionStartedListeners(); result = executeFromStart(); } catch (Throwable je) { @@ -314,12 +320,13 @@ public Result startExecution() { setStopped(false); } finally { try { + executionEndDate = new Date(); + ExtensionPointHandler.callExtensionPoint( log, this, HopExtensionPoint.WorkflowFinish.id, this); - executionEndDate = new Date(); - - fireWorkflowFinishListeners(); + log.logBasic(BaseMessages.getString(PKG, "Workflow.Comment.WorkflowFinished")); + fireExecutionFinishedListeners(); // release unused vfs connections HopVfs.freeUnusedResources(); @@ -464,12 +471,12 @@ private Result executeFromStart() throws HopException { } // Save this result... workflowTracker.addWorkflowTracker(new WorkflowTracker(workflowMeta, jerEnd)); - log.logBasic(BaseMessages.getString(PKG, "Workflow.Comment.WorkflowFinished")); setActive(false); if (!isStopped()) { setFinished(true); } + return res; } finally { log.snap(Metrics.METRIC_WORKFLOW_STOP); @@ -522,35 +529,105 @@ public Result executeFromStart(int nr, Result result) throws HopException { } @Override + @Deprecated(since = "2.9", forRemoval = true) + public void addWorkflowStartedListener( + IExecutionStartedListener> listener) { + addExecutionStartedListener(listener); + } + + @Override + public void addExecutionStartedListener( + IExecutionStartedListener> listener) { + synchronized (executionStartedListeners) { + executionStartedListeners.add(listener); + } + } + + @Override + public void removeExecutionStartedListener( + IExecutionStartedListener> listener) { + synchronized (executionStartedListeners) { + executionStartedListeners.remove(listener); + } + } + + @Override + @Deprecated(since = "2.9", forRemoval = true) + public void fireWorkflowStartedListeners() throws HopException { + fireExecutionStartedListeners(); + } + + @Override + public void fireExecutionStartedListeners() throws HopException { + synchronized (executionStartedListeners) { + for (IExecutionStartedListener> listener : + executionStartedListeners) { + listener.started(this); + } + } + } + + @Override + @Deprecated(since = "2.9", forRemoval = true) public void addWorkflowFinishedListener( - IExecutionFinishedListener> finishedListener) { - synchronized (workflowFinishedListeners) { - workflowFinishedListeners.add(finishedListener); + IExecutionFinishedListener> listener) { + addExecutionFinishedListener(listener); + } + + @Override + public void addExecutionFinishedListener( + IExecutionFinishedListener> listener) { + synchronized (executionFinishedListeners) { + executionFinishedListeners.add(listener); + } + } + + @Override + public void removeExecutionFinishedListener( + IExecutionFinishedListener> listener) { + synchronized (executionFinishedListeners) { + executionFinishedListeners.remove(listener); } } @Override + @Deprecated(since = "2.9", forRemoval = true) public void fireWorkflowFinishListeners() throws HopException { - synchronized (workflowFinishedListeners) { - for (IExecutionFinishedListener listener : workflowFinishedListeners) { + fireExecutionFinishedListeners(); + } + + @Override + public void fireExecutionFinishedListeners() throws HopException { + synchronized (executionFinishedListeners) { + for (IExecutionFinishedListener> listener : + executionFinishedListeners) { listener.finished(this); } } } @Override - public void addWorkflowStartedListener( - IExecutionStartedListener> finishedListener) { - synchronized (workflowStartedListeners) { - workflowStartedListeners.add(finishedListener); + public void addExecutionStoppedListener( + IExecutionStoppedListener> listener) { + synchronized (executionStoppedListeners) { + executionStoppedListeners.add(listener); } } @Override - public void fireWorkflowStartedListeners() throws HopException { - synchronized (workflowStartedListeners) { - for (IExecutionStartedListener listener : workflowStartedListeners) { - listener.started(this); + public void removeExecutionStoppedListener( + IExecutionStoppedListener> listener) { + synchronized (executionStoppedListeners) { + executionStoppedListeners.remove(listener); + } + } + + @Override + public void fireExecutionStoppedListeners() { + synchronized (executionStoppedListeners) { + for (IExecutionStoppedListener> listener : + executionStoppedListeners) { + listener.stopped(this); } } } @@ -968,6 +1045,9 @@ public boolean isStopped() { @Override public void stopExecution() { setStopped(true); + + log.logBasic(BaseMessages.getString(PKG, "Workflow.Log.StopWorkflowExecution")); + fireExecutionStoppedListeners(); } /** Sets the stopped. */ @@ -1543,12 +1623,12 @@ public void setForcingSeparateLogging(boolean forcingSeparateLogging) { } @Override - public IPipelineEngine getParentPipeline() { + public IPipelineEngine getParentPipeline() { return parentPipeline; } @Override - public void setParentPipeline(IPipelineEngine parentPipeline) { + public void setParentPipeline(IPipelineEngine parentPipeline) { this.parentPipeline = parentPipeline; } @@ -1605,17 +1685,19 @@ public void setExecutionEndDate(Date executionEndDate) { * @return value of workflowFinishedListeners */ @Override + @Deprecated(since = "2.9", forRemoval = true) public List>> getWorkflowFinishedListeners() { - return workflowFinishedListeners; + return executionFinishedListeners; } /** * @param workflowFinishedListeners The workflowFinishedListeners to set */ + @Deprecated(since = "2.9", forRemoval = true) public void setWorkflowFinishedListeners( List>> workflowFinishedListeners) { - this.workflowFinishedListeners = workflowFinishedListeners; + this.executionFinishedListeners = workflowFinishedListeners; } /** @@ -1624,17 +1706,19 @@ public void setWorkflowFinishedListeners( * @return value of workflowStartedListeners */ @Override + @Deprecated(since = "2.9", forRemoval = true) public List>> getWorkflowStartedListeners() { - return workflowStartedListeners; + return executionStartedListeners; } /** * @param workflowStartedListeners The workflowStartedListeners to set */ + @Deprecated(since = "2.9", forRemoval = true) public void setWorkflowStartedListeners( List>> workflowStartedListeners) { - this.workflowStartedListeners = workflowStartedListeners; + this.executionStartedListeners = workflowStartedListeners; } /** diff --git a/engine/src/main/java/org/apache/hop/workflow/engine/IWorkflowEngine.java b/engine/src/main/java/org/apache/hop/workflow/engine/IWorkflowEngine.java index 32858b17d67..99b71d620eb 100644 --- a/engine/src/main/java/org/apache/hop/workflow/engine/IWorkflowEngine.java +++ b/engine/src/main/java/org/apache/hop/workflow/engine/IWorkflowEngine.java @@ -34,6 +34,7 @@ import org.apache.hop.metadata.api.IHopMetadataProvider; import org.apache.hop.pipeline.IExecutionFinishedListener; import org.apache.hop.pipeline.IExecutionStartedListener; +import org.apache.hop.pipeline.IExecutionStoppedListener; import org.apache.hop.pipeline.PipelineMeta; import org.apache.hop.pipeline.engine.IPipelineEngine; import org.apache.hop.workflow.ActionResult; @@ -74,14 +75,62 @@ public interface IWorkflowEngine Date getExecutionEndDate(); - void addWorkflowStartedListener(IExecutionStartedListener> finishedListener); + /** Use the {@link #addExecutionStartedListener} method. */ + @Deprecated(since = "2.9", forRemoval = true) + void addWorkflowStartedListener(IExecutionStartedListener> listener); + @Deprecated(since = "2.9", forRemoval = true) List>> getWorkflowFinishedListeners(); - void addWorkflowFinishedListener(IExecutionFinishedListener> finishedListener); + /** Use the {@link #addExecutionFinishedListener} method. */ + @Deprecated(since = "2.9", forRemoval = true) + void addWorkflowFinishedListener(IExecutionFinishedListener> listener); + @Deprecated(since = "2.9", forRemoval = true) List>> getWorkflowStartedListeners(); + /** + * Attach a listener to notify when the workflow has started execution. + * + * @param listener the workflow started listener + */ + void addExecutionStartedListener(IExecutionStartedListener> listener); + + /** + * Detach a listener to notify when the workflow has started execution. + * + * @param listener the workflow started listener + */ + void removeExecutionStartedListener(IExecutionStartedListener> listener); + + /** + * Attach a listener to notify when the workflow has stopped execution. + * + * @param listener the workflow stopped listener + */ + void addExecutionStoppedListener(IExecutionStoppedListener> listener); + + /** + * Detach a listener to notify when the workflow has stopped execution. + * + * @param listener the workflow stopped listener + */ + void removeExecutionStoppedListener(IExecutionStoppedListener> listener); + + /** + * Attach a listener to notify when the workflow has completed execution. + * + * @param listener the workflow finished listener + */ + void addExecutionFinishedListener(IExecutionFinishedListener> listener); + + /** + * Detach a listener to notify when the workflow has completed execution. + * + * @param listener the workflow finished listener + */ + void removeExecutionFinishedListener(IExecutionFinishedListener> listener); + boolean isInteractive(); void setInteractive(boolean interactive); @@ -125,10 +174,43 @@ public interface IWorkflowEngine void setSourceRows(List sourceRows); + /** + * Use the {@link #fireExecutionFinishedListeners} method. + * + * @throws HopException + */ + @Deprecated(since = "2.9", forRemoval = true) void fireWorkflowFinishListeners() throws HopException; + /** + * Use the {@link #fireExecutionStartedListeners} method. + * + * @throws HopException + */ + @Deprecated(since = "2.9", forRemoval = true) void fireWorkflowStartedListeners() throws HopException; + /** + * Make attempt to fire all registered started execution listeners if possible. + * + * @throws HopException if any errors occur during notification + */ + void fireExecutionStartedListeners() throws HopException; + + /** + * Make attempt to fire all registered stopped execution listeners if possible. + * + * @throws HopException if any errors occur during notification + */ + void fireExecutionStoppedListeners() throws HopException; + + /** + * Make attempt to fire all registered finished execution listeners if possible. + * + * @throws HopException if any errors occur during notification + */ + void fireExecutionFinishedListeners() throws HopException; + void setContainerId(String toString); @Override diff --git a/engine/src/main/java/org/apache/hop/workflow/engines/local/LocalWorkflowEngine.java b/engine/src/main/java/org/apache/hop/workflow/engines/local/LocalWorkflowEngine.java index da3bc9353f2..10732ca2f58 100644 --- a/engine/src/main/java/org/apache/hop/workflow/engines/local/LocalWorkflowEngine.java +++ b/engine/src/main/java/org/apache/hop/workflow/engines/local/LocalWorkflowEngine.java @@ -126,7 +126,7 @@ public Result startExecution() { // We also need to commit/rollback at the end of this workflow... // - addWorkflowFinishedListener( + addExecutionFinishedListener( workflow -> { String group = (String) workflow.getExtensionDataMap().get(Const.CONNECTION_GROUP); List databases = DatabaseConnectionMap.getInstance().getDatabases(group); @@ -234,7 +234,7 @@ public void afterExecution( // Do the lookup of the execution information only once lookupExecutionInformationLocation(); - addWorkflowStartedListener( + addExecutionStartedListener( l -> { // Register the pipeline after start // @@ -351,7 +351,7 @@ public void run() { // When the workflow is done, register one more time and stop the timer // - addWorkflowFinishedListener( + addExecutionFinishedListener( listener -> { stopExecutionInfoTimer(); }); diff --git a/engine/src/main/java/org/apache/hop/workflow/engines/remote/RemoteWorkflowEngine.java b/engine/src/main/java/org/apache/hop/workflow/engines/remote/RemoteWorkflowEngine.java index 8448aaacff7..b6147a380a9 100644 --- a/engine/src/main/java/org/apache/hop/workflow/engines/remote/RemoteWorkflowEngine.java +++ b/engine/src/main/java/org/apache/hop/workflow/engines/remote/RemoteWorkflowEngine.java @@ -52,6 +52,8 @@ import org.apache.hop.metadata.api.IHopMetadataProvider; import org.apache.hop.pipeline.IExecutionFinishedListener; import org.apache.hop.pipeline.IExecutionStartedListener; +import org.apache.hop.pipeline.IExecutionStoppedListener; +import org.apache.hop.pipeline.PipelineMeta; import org.apache.hop.pipeline.engine.IPipelineEngine; import org.apache.hop.pipeline.engines.remote.RemotePipelineEngine; import org.apache.hop.resource.ResourceUtil; @@ -121,8 +123,11 @@ public class RemoteWorkflowEngine extends Variables implements IWorkflowEngine>> - workflowFinishedListeners; - protected List>> workflowStartedListeners; + executionFinishedListeners; + protected List>> + executionStartedListeners; + protected List>> + executionStoppedListeners; protected List actionListeners; @@ -150,7 +155,7 @@ public class RemoteWorkflowEngine extends Variables implements IWorkflowEngine parentWorkflow; /** The parent pipeline */ - protected IPipelineEngine parentPipeline; + protected IPipelineEngine parentPipeline; /** The parent logging interface to reference */ private ILoggingObject parentLoggingObject; @@ -165,8 +170,9 @@ public class RemoteWorkflowEngine extends Variables implements IWorkflowEngine actionResults = new LinkedList<>(); public RemoteWorkflowEngine() { - workflowStartedListeners = Collections.synchronizedList(new ArrayList<>()); - workflowFinishedListeners = Collections.synchronizedList(new ArrayList<>()); + executionStartedListeners = Collections.synchronizedList(new ArrayList<>()); + executionFinishedListeners = Collections.synchronizedList(new ArrayList<>()); + executionStoppedListeners = Collections.synchronizedList(new ArrayList<>()); actionListeners = new ArrayList<>(); activeActions = Collections.synchronizedSet(new HashSet<>()); extensionDataMap = new HashMap<>(); @@ -267,19 +273,19 @@ public Result startExecution() { workflowExecutionConfiguration.setGatheringMetrics(gatheringMetrics); sendToHopServer(this, workflowMeta, workflowExecutionConfiguration, metadataProvider); - fireWorkflowStartedListeners(); + fireExecutionStartedListeners(); initialized = true; monitorRemoteWorkflowUntilFinished(); - fireWorkflowFinishListeners(); + fireExecutionFinishedListeners(); executionEndDate = new Date(); } catch (Exception e) { logChannel.logError("Error starting workflow", e); result.setNrErrors(result.getNrErrors() + 1); try { - fireWorkflowFinishListeners(); + fireExecutionFinishedListeners(); } catch (Exception ex) { logChannel.logError("Error executing workflow finished listeners", ex); result.setNrErrors(result.getNrErrors() + 1); @@ -336,6 +342,8 @@ public void stopExecution() { try { hopServer.stopWorkflow(this, workflowMeta.getName(), containerId); getWorkflowStatus(); + + fireExecutionStoppedListeners(); } catch (Exception e) { throw new RuntimeException( "Stopping of pipeline '" @@ -455,39 +463,123 @@ public void sendToHopServer( } @Override + @Deprecated(since = "2.9", forRemoval = true) public void addWorkflowFinishedListener( IExecutionFinishedListener> finishedListener) { - synchronized (workflowFinishedListeners) { - workflowFinishedListeners.add(finishedListener); + synchronized (executionFinishedListeners) { + executionFinishedListeners.add(finishedListener); } } @Override + @Deprecated(since = "2.9", forRemoval = true) public void fireWorkflowFinishListeners() throws HopException { - synchronized (workflowFinishedListeners) { - for (IExecutionFinishedListener listener : workflowFinishedListeners) { + synchronized (executionFinishedListeners) { + for (IExecutionFinishedListener> listener : + executionFinishedListeners) { listener.finished(this); } } } @Override + @Deprecated(since = "2.9", forRemoval = true) public void addWorkflowStartedListener( IExecutionStartedListener> finishedListener) { - synchronized (workflowStartedListeners) { - workflowStartedListeners.add(finishedListener); + synchronized (executionStartedListeners) { + executionStartedListeners.add(finishedListener); } } @Override + @Deprecated(since = "2.9", forRemoval = true) public void fireWorkflowStartedListeners() throws HopException { - synchronized (workflowStartedListeners) { - for (IExecutionStartedListener listener : workflowStartedListeners) { + synchronized (executionStartedListeners) { + for (IExecutionStartedListener> listener : + executionStartedListeners) { listener.started(this); } } } + @Override + public void addExecutionStartedListener( + IExecutionStartedListener> listener) { + synchronized (executionStartedListeners) { + executionStartedListeners.add(listener); + } + } + + @Override + public void removeExecutionStartedListener( + IExecutionStartedListener> listener) { + synchronized (executionStartedListeners) { + executionStartedListeners.remove(listener); + } + } + + @Override + public void fireExecutionStartedListeners() throws HopException { + synchronized (executionStartedListeners) { + for (IExecutionStartedListener> listener : + executionStartedListeners) { + listener.started(this); + } + } + } + + @Override + public void addExecutionFinishedListener( + IExecutionFinishedListener> listener) { + synchronized (executionFinishedListeners) { + executionFinishedListeners.add(listener); + } + } + + @Override + public void removeExecutionFinishedListener( + IExecutionFinishedListener> listener) { + synchronized (executionFinishedListeners) { + executionFinishedListeners.remove(listener); + } + } + + @Override + public void fireExecutionFinishedListeners() throws HopException { + synchronized (executionFinishedListeners) { + for (IExecutionFinishedListener> listener : + executionFinishedListeners) { + listener.finished(this); + } + } + } + + @Override + public void addExecutionStoppedListener( + IExecutionStoppedListener> listener) { + synchronized (executionStoppedListeners) { + executionStoppedListeners.add(listener); + } + } + + @Override + public void removeExecutionStoppedListener( + IExecutionStoppedListener> listener) { + synchronized (executionStoppedListeners) { + executionStoppedListeners.remove(listener); + } + } + + @Override + public void fireExecutionStoppedListeners() { + synchronized (executionStoppedListeners) { + for (IExecutionStoppedListener> listener : + executionStoppedListeners) { + listener.stopped(this); + } + } + } + @Override public void addActionListener(IActionListener actionListener) { actionListeners.add(actionListener); @@ -1071,17 +1163,19 @@ public void setExecutionEndDate(Date executionEndDate) { * @return value of workflowFinishedListeners */ @Override + @Deprecated(since = "2.9", forRemoval = true) public List>> getWorkflowFinishedListeners() { - return workflowFinishedListeners; + return executionFinishedListeners; } /** * @param workflowFinishedListeners The workflowFinishedListeners to set */ + @Deprecated(since = "2.9", forRemoval = true) public void setWorkflowFinishedListeners( List>> workflowFinishedListeners) { - this.workflowFinishedListeners = workflowFinishedListeners; + this.executionFinishedListeners = workflowFinishedListeners; } /** @@ -1090,17 +1184,19 @@ public void setWorkflowFinishedListeners( * @return value of workflowStartedListeners */ @Override + @Deprecated(since = "2.9", forRemoval = true) public List>> getWorkflowStartedListeners() { - return workflowStartedListeners; + return executionStartedListeners; } /** * @param workflowStartedListeners The workflowStartedListeners to set */ + @Deprecated(since = "2.9", forRemoval = true) public void setWorkflowStartedListeners( List>> workflowStartedListeners) { - this.workflowStartedListeners = workflowStartedListeners; + this.executionStartedListeners = workflowStartedListeners; } /** @@ -1261,7 +1357,7 @@ public void setParentWorkflow(IWorkflowEngine parentWorkflow) { * @return value of parentPipeline */ @Override - public IPipelineEngine getParentPipeline() { + public IPipelineEngine getParentPipeline() { return parentPipeline; } diff --git a/engine/src/main/resources/org/apache/hop/workflow/messages/messages_en_US.properties b/engine/src/main/resources/org/apache/hop/workflow/messages/messages_en_US.properties index a25a8caefeb..4b7bf5d057b 100644 --- a/engine/src/main/resources/org/apache/hop/workflow/messages/messages_en_US.properties +++ b/engine/src/main/resources/org/apache/hop/workflow/messages/messages_en_US.properties @@ -42,6 +42,7 @@ Workflow.Log.FinishedAction=Finished action [{0}] (result\=[{1}]) Workflow.Log.LaunchedActionInParallel=Launched action [{0}] in parallel. Workflow.Log.NoHopServerSpecified=No hop server specified Workflow.Log.StartingAction=Starting action [{0}] +Workflow.Log.StopWorkflowExecution=Stop workflow execution Workflow.Log.UnexpectedError=Unexpected error occurred while launching entry [{0}] Workflow.Log.UnexpectedErrorWhileWaitingForAction=Unexpected error while waiting for action [{0}] to finish. Workflow.Log.UniqueWorkflowName=The workflow needs a name to uniquely identify it by on the remote server. diff --git a/engine/src/test/java/org/apache/hop/pipeline/PipelineTest.java b/engine/src/test/java/org/apache/hop/pipeline/PipelineTest.java index d97a4131baf..57f958a5131 100644 --- a/engine/src/test/java/org/apache/hop/pipeline/PipelineTest.java +++ b/engine/src/test/java/org/apache/hop/pipeline/PipelineTest.java @@ -26,7 +26,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import java.util.Collections; import java.util.concurrent.CountDownLatch; import org.apache.hop.core.Const; import org.apache.hop.core.HopEnvironment; @@ -82,8 +81,8 @@ public void pipelineWithNoTransformsIsNotEndless() throws Exception { pipelineWithNoTransforms.startThreads(); // check pipeline lifecycle is not corrupted - verify(pipelineWithNoTransforms).firePipelineExecutionStartedListeners(); - verify(pipelineWithNoTransforms).firePipelineExecutionFinishedListeners(); + verify(pipelineWithNoTransforms).fireExecutionStartedListeners(); + verify(pipelineWithNoTransforms).fireExecutionFinishedListeners(); } /** @@ -144,21 +143,21 @@ public void testPipelineStoppedListenersConcurrentModification() throws Interrup public void testFirePipelineFinishedListeners() throws Exception { Pipeline pipeline = new LocalPipelineEngine(); IExecutionFinishedListener mockListener = mock(IExecutionFinishedListener.class); - pipeline.setExecutionFinishedListeners(Collections.singletonList(mockListener)); + pipeline.addExecutionFinishedListener(mockListener); - pipeline.firePipelineExecutionFinishedListeners(); + pipeline.fireExecutionFinishedListeners(); verify(mockListener).finished(pipeline); } @Test(expected = HopException.class) - public void testFirePipelineFinishedListenersExceptionOnPipelineFinished() throws Exception { + public void testFireExecutionFinishedListenersExceptionOnPipelineFinished() throws Exception { Pipeline pipeline = new LocalPipelineEngine(); IExecutionFinishedListener mockListener = mock(IExecutionFinishedListener.class); doThrow(HopException.class).when(mockListener).finished(pipeline); - pipeline.setExecutionFinishedListeners(Collections.singletonList(mockListener)); + pipeline.addExecutionFinishedListener(mockListener); - pipeline.firePipelineExecutionFinishedListeners(); + pipeline.fireExecutionFinishedListeners(); } @Test @@ -244,11 +243,7 @@ public void run() { throw new RuntimeException(); } while (!isStopped()) { - try { - pipeline.addExecutionStoppedListener(pipelineStoppedListener); - } catch (HopException e) { - throw new RuntimeException(e); - } + pipeline.addExecutionStoppedListener(pipelineStoppedListener); } } } @@ -267,11 +262,7 @@ public void run() { } // run while (!isStopped()) { - try { - pipeline.addExecutionFinishedListener(listener); - } catch (HopException e) { - throw new RuntimeException(e); - } + pipeline.addExecutionFinishedListener(listener); } } } @@ -291,7 +282,7 @@ public void run() { // run while (!isStopped()) { try { - pipeline.firePipelineExecutionFinishedListeners(); + pipeline.fireExecutionFinishedListeners(); // clean array blocking queue pipeline.waitUntilFinished(); } catch (HopException e) { @@ -316,7 +307,7 @@ public void run() { // run while (!isStopped()) { try { - pipeline.firePipelineExecutionStartedListeners(); + pipeline.fireExecutionStartedListeners(); } catch (HopException e) { throw new RuntimeException(); } diff --git a/engine/src/test/java/org/apache/hop/workflow/WorkflowTest.java b/engine/src/test/java/org/apache/hop/workflow/WorkflowTest.java index 157ebffdc59..7d2bf92fdb7 100644 --- a/engine/src/test/java/org/apache/hop/workflow/WorkflowTest.java +++ b/engine/src/test/java/org/apache/hop/workflow/WorkflowTest.java @@ -20,7 +20,10 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; +import java.util.concurrent.CountDownLatch; +import org.apache.hop.core.HopEnvironment; import org.apache.hop.core.database.Database; +import org.apache.hop.core.exception.HopException; import org.apache.hop.core.logging.LogChannel; import org.apache.hop.core.variables.IVariables; import org.apache.hop.metadata.api.IHopMetadataProvider; @@ -29,6 +32,7 @@ import org.apache.hop.workflow.engine.IWorkflowEngine; import org.apache.hop.workflow.engines.local.LocalWorkflowEngine; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; public class WorkflowTest { @@ -41,9 +45,82 @@ public class WorkflowTest { private ActionMeta mockedActionMeta; private ActionStart mockedActionStart; private LogChannel mockedLogChannel; + int count = 10000; + + private abstract class WorkflowKicker implements Runnable { + protected IWorkflowEngine workflow; + protected int c = 0; + protected CountDownLatch start; + protected int max = count; + + WorkflowKicker(IWorkflowEngine workflow, CountDownLatch start) { + this.workflow = workflow; + this.start = start; + } + + public void await() { + try { + start.await(); + } catch (InterruptedException e) { + throw new RuntimeException(); + } + } + + public boolean isStopped() { + c++; + return c >= max; + } + } + + private class WorkflowFinishedListenerAdder extends WorkflowKicker { + WorkflowFinishedListenerAdder(IWorkflowEngine workflow, CountDownLatch start) { + super(workflow, start); + } + + @Override + public void run() { + await(); + while (!isStopped()) { + workflow.addExecutionFinishedListener(w -> {}); + } + } + } + + private class WorkflowStoppedListenerAdder extends WorkflowKicker { + WorkflowStoppedListenerAdder(IWorkflowEngine workflow, CountDownLatch start) { + super(workflow, start); + } + + @Override + public void run() { + await(); + while (!isStopped()) { + workflow.addExecutionStoppedListener(w -> {}); + } + } + } + + private class WorkflowStopExecutionCaller extends WorkflowKicker { + WorkflowStopExecutionCaller(IWorkflowEngine workflow, CountDownLatch start) { + super(workflow, start); + } + + @Override + public void run() { + await(); + while (!isStopped()) { + workflow.stopExecution(); + } + } + } + + @BeforeClass + public static void beforeClass() throws HopException { + HopEnvironment.init(); + } @Before - public void init() { + public void init() throws HopException { mockedDataBase = mock(Database.class); mockedWorkflow = mock(Workflow.class); mockedVariableSpace = mock(IVariables.class); @@ -66,4 +143,31 @@ public void testTwoWorkflowsGetSameLogChannelId() { assertEquals(workflow1.getLogChannelId(), workflow2.getLogChannelId()); } + + /** + * Test that workflow stop listeners can be accessed concurrently + * + * @throws InterruptedException + */ + @Test + public void testExecutionStoppedListenersConcurrentModification() throws InterruptedException { + CountDownLatch start = new CountDownLatch(1); + IWorkflowEngine workflow = new LocalWorkflowEngine(); + WorkflowStopExecutionCaller stopper = new WorkflowStopExecutionCaller(workflow, start); + WorkflowStoppedListenerAdder adder = new WorkflowStoppedListenerAdder(workflow, start); + startThreads(stopper, adder, start); + assertEquals("All workflow stop listeners is added", count, adder.c); + assertEquals("All stop call success", count, stopper.c); + } + + private void startThreads(Runnable run1, Runnable run2, CountDownLatch start) + throws InterruptedException { + Thread thread1 = new Thread(run1); + Thread thread2 = new Thread(run2); + thread1.start(); + thread2.start(); + start.countDown(); + thread1.join(); + thread2.join(); + } } diff --git a/plugins/engines/beam/src/main/java/org/apache/hop/beam/engines/BeamPipelineEngine.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/engines/BeamPipelineEngine.java index 7798f7f71e4..d991c48b8a5 100644 --- a/plugins/engines/beam/src/main/java/org/apache/hop/beam/engines/BeamPipelineEngine.java +++ b/plugins/engines/beam/src/main/java/org/apache/hop/beam/engines/BeamPipelineEngine.java @@ -125,7 +125,7 @@ public abstract class BeamPipelineEngine extends Variables protected Result previousResult; protected ILoggingObject parent; - protected IPipelineEngine parentPipeline; + protected IPipelineEngine parentPipeline; protected IWorkflowEngine parentWorkflow; protected LogLevel logLevel; @@ -393,7 +393,7 @@ public void startThreads() throws HopException { // In any case, fire the finished listeners... // This basically sets the finished flag in this pipeline // - firePipelineExecutionFinishedListeners(); + fireExecutionFinishedListeners(); populateEngineMetrics(); // get the final state if (refreshTimer != null) { refreshTimer.cancel(); // no more needed @@ -574,7 +574,7 @@ protected synchronized void evaluatePipelineStatus() throws HopException { setRunning(false); executionEndDate = new Date(); if (beamEngineRunConfiguration.isRunningAsynchronous()) { - firePipelineExecutionFinishedListeners(); + fireExecutionFinishedListeners(); } logChannel.logBasic("Beam pipeline execution has finished."); } @@ -583,7 +583,7 @@ protected synchronized void evaluatePipelineStatus() throws HopException { case STOPPED: case CANCELLED: if (!isStopped()) { - firePipelineExecutionStoppedListeners(); + fireExecutionStoppedListeners(); cancelRefreshTimer = true; } setStopped(true); @@ -701,11 +701,6 @@ public void resumeExecution() { // Not supported } - /** - * Adds a pipeline started listener. - * - * @param executionStartedListener the pipeline started listener - */ @Override public void addExecutionStartedListener(IExecutionStartedListener executionStartedListener) { synchronized (executionStartedListener) { @@ -713,11 +708,14 @@ public void addExecutionStartedListener(IExecutionStartedListener executionStart } } - /** - * Adds a pipeline finished listener. - * - * @param executionFinishedListener the pipeline finished listener - */ + @Override + public void removeExecutionStartedListener( + IExecutionStartedListener> listener) { + synchronized (executionStartedListeners) { + executionStartedListeners.remove(listener); + } + } + @Override public void addExecutionFinishedListener(IExecutionFinishedListener executionFinishedListener) { synchronized (executionFinishedListener) { @@ -725,6 +723,14 @@ public void addExecutionFinishedListener(IExecutionFinishedListener executionFin } } + @Override + public void removeExecutionFinishedListener( + IExecutionFinishedListener> listener) { + synchronized (executionFinishedListeners) { + executionFinishedListeners.remove(listener); + } + } + @Override public String getComponentLogText(String componentName, int copyNr) { return ""; // TODO implement this @@ -888,7 +894,7 @@ public void addActiveSubPipeline(final String subPipelineName, IPipelineEngine s } @Override - public IPipelineEngine getActiveSubPipeline(final String subPipelineName) { + public IPipelineEngine getActiveSubPipeline(final String subPipelineName) { return activeSubPipelines.get(subPipelineName); } @@ -921,7 +927,7 @@ public void setInternalHopVariables(IVariables variables) { * @return value of parentPipeline */ @Override - public IPipelineEngine getParentPipeline() { + public IPipelineEngine getParentPipeline() { return parentPipeline; } @@ -929,7 +935,7 @@ public IPipelineEngine getParentPipeline() { * @param parentPipeline The parentPipeline to set */ @Override - public void setParentPipeline(IPipelineEngine parentPipeline) { + public void setParentPipeline(IPipelineEngine parentPipeline) { this.parentPipeline = parentPipeline; } @@ -956,6 +962,7 @@ public void setParentWorkflow(IWorkflowEngine parentWorkflow) { * * @return value of executionStartedListeners */ + @Deprecated(since = "2.9", forRemoval = true) public List>> getExecutionStartedListeners() { return executionStartedListeners; @@ -964,25 +971,18 @@ public void setParentWorkflow(IWorkflowEngine parentWorkflow) { /** * @param executionStartedListeners The executionStartedListeners to set */ + @Deprecated(since = "2.9", forRemoval = true) public void setExecutionStartedListeners( List>> executionStartedListeners) { this.executionStartedListeners = executionStartedListeners; } - private void fireExecutionStartedListeners() throws HopException { - synchronized (executionStartedListeners) { - for (IExecutionStartedListener> listener : - executionStartedListeners) { - listener.started(this); - } - } - } - /** * Gets executionFinishedListeners * * @return value of executionFinishedListeners */ + @Deprecated(since = "2.9", forRemoval = true) public List>> getExecutionFinishedListeners() { return executionFinishedListeners; @@ -991,6 +991,7 @@ private void fireExecutionStartedListeners() throws HopException { /** * @param executionFinishedListeners The executionFinishedListeners to set */ + @Deprecated(since = "2.9", forRemoval = true) public void setExecutionFinishedListeners( List>> executionFinishedListeners) { this.executionFinishedListeners = executionFinishedListeners; @@ -998,14 +999,28 @@ public void setExecutionFinishedListeners( @Override public void addExecutionStoppedListener( - IExecutionStoppedListener> listener) throws HopException { + IExecutionStoppedListener> listener) { synchronized (executionStoppedListeners) { executionStoppedListeners.add(listener); } } @Override + public void removeExecutionStoppedListener( + IExecutionStoppedListener> listener) { + synchronized (executionStoppedListeners) { + executionStoppedListeners.remove(listener); + } + } + + @Override + @Deprecated(since = "2.9", forRemoval = true) public void firePipelineExecutionStartedListeners() throws HopException { + fireExecutionStartedListeners(); + } + + @Override + public void fireExecutionStartedListeners() throws HopException { synchronized (executionStartedListeners) { for (IExecutionStartedListener> listener : executionStartedListeners) { @@ -1015,7 +1030,13 @@ public void firePipelineExecutionStartedListeners() throws HopException { } @Override + @Deprecated(since = "2.9", forRemoval = true) public void firePipelineExecutionFinishedListeners() throws HopException { + fireExecutionFinishedListeners(); + } + + @Override + public void fireExecutionFinishedListeners() throws HopException { synchronized (executionFinishedListeners) { for (IExecutionFinishedListener> listener : executionFinishedListeners) { @@ -1045,7 +1066,13 @@ public void firePipelineExecutionFinishedListeners() throws HopException { } @Override + @Deprecated(since = "2.9", forRemoval = true) public void firePipelineExecutionStoppedListeners() throws HopException { + fireExecutionStoppedListeners(); + } + + @Override + public void fireExecutionStoppedListeners() throws HopException { synchronized (executionStoppedListeners) { for (IExecutionStoppedListener> listener : executionStoppedListeners) { @@ -1181,6 +1208,7 @@ public void stopExecutionInfoTimer() throws HopException { * * @return value of executionStoppedListeners */ + @Deprecated(since = "2.9", forRemoval = true) public List>> getExecutionStoppedListeners() { return executionStoppedListeners; @@ -1189,6 +1217,7 @@ public void stopExecutionInfoTimer() throws HopException { /** * @param executionStoppedListeners The executionStoppedListeners to set */ + @Deprecated(since = "2.9", forRemoval = true) public void setExecutionStoppedListeners( List>> executionStoppedListeners) { this.executionStoppedListeners = executionStoppedListeners; diff --git a/plugins/misc/debug/src/main/java/org/apache/hop/debug/workflow/LogWorkflowExecutionTimeExtensionPoint.java b/plugins/misc/debug/src/main/java/org/apache/hop/debug/workflow/LogWorkflowExecutionTimeExtensionPoint.java index d73ac2b2139..560a555e886 100644 --- a/plugins/misc/debug/src/main/java/org/apache/hop/debug/workflow/LogWorkflowExecutionTimeExtensionPoint.java +++ b/plugins/misc/debug/src/main/java/org/apache/hop/debug/workflow/LogWorkflowExecutionTimeExtensionPoint.java @@ -51,7 +51,7 @@ public void callExtensionPoint( final long startTime = System.currentTimeMillis(); - workflow.addWorkflowFinishedListener( + workflow.addExecutionFinishedListener( workflow1 -> { Date startDate = workflow1.getExecutionStartDate(); Date endDate = workflow1.getExecutionEndDate(); diff --git a/plugins/misc/reflection/src/main/java/org/apache/hop/reflection/workflow/xp/WorkflowStartLoggingXp.java b/plugins/misc/reflection/src/main/java/org/apache/hop/reflection/workflow/xp/WorkflowStartLoggingXp.java index 7d4721d6f84..a213722fc8b 100644 --- a/plugins/misc/reflection/src/main/java/org/apache/hop/reflection/workflow/xp/WorkflowStartLoggingXp.java +++ b/plugins/misc/reflection/src/main/java/org/apache/hop/reflection/workflow/xp/WorkflowStartLoggingXp.java @@ -133,7 +133,7 @@ private void logWorkflow( } if (workflowLog.isExecutingAtEnd()) { - workflow.addWorkflowFinishedListener( + workflow.addExecutionFinishedListener( engine -> { executeLoggingPipeline( workflowLog, "end", loggingPipelineFilename, workflow, variables); diff --git a/ui/src/main/java/org/apache/hop/ui/hopgui/file/pipeline/HopGuiPipelineGraph.java b/ui/src/main/java/org/apache/hop/ui/hopgui/file/pipeline/HopGuiPipelineGraph.java index 9083de1bbdb..7a90b68b5da 100644 --- a/ui/src/main/java/org/apache/hop/ui/hopgui/file/pipeline/HopGuiPipelineGraph.java +++ b/ui/src/main/java/org/apache/hop/ui/hopgui/file/pipeline/HopGuiPipelineGraph.java @@ -4651,7 +4651,7 @@ private synchronized void startThreads() { startRedrawTimer(); updateGui(); - } catch (HopException e) { + } catch (Exception e) { log.logError("Error starting transform threads", e); checkErrorVisuals(); stopRedrawTimer(); diff --git a/ui/src/main/java/org/apache/hop/ui/hopgui/file/workflow/HopGuiWorkflowGraph.java b/ui/src/main/java/org/apache/hop/ui/hopgui/file/workflow/HopGuiWorkflowGraph.java index 36346e1b3ae..e665fbe9ade 100644 --- a/ui/src/main/java/org/apache/hop/ui/hopgui/file/workflow/HopGuiWorkflowGraph.java +++ b/ui/src/main/java/org/apache/hop/ui/hopgui/file/workflow/HopGuiWorkflowGraph.java @@ -3717,9 +3717,9 @@ public synchronized void start(WorkflowExecutionConfiguration executionConfigura updateGui(); - // Attach a listener to notify us that the pipeline has finished. + // Attach a listener to notify us that the workflow has finished. // - workflow.addWorkflowFinishedListener( + workflow.addExecutionFinishedListener( workflow -> HopGuiWorkflowGraph.this.workflowFinished()); // Show the execution results views