Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IExecutionStoppedListener to IWorkflowEngine #3715 #3898

Merged
merged 2 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 74 additions & 37 deletions engine/src/main/java/org/apache/hop/pipeline/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,7 @@ public void prepareExecution() throws HopException {

// Just for safety, fire the pipeline finished listeners...
try {
firePipelineExecutionFinishedListeners();
fireExecutionFinishedListeners();
} catch (HopException e) {
// listeners produces errors
log.logError(BaseMessages.getString(PKG, "Pipeline.FinishListeners.Exception"));
Expand Down Expand Up @@ -1109,7 +1109,7 @@ public void startThreads() throws HopException {
ExtensionPointHandler.callExtensionPoint(
log, this, HopExtensionPoint.PipelineStartThreads.id, this);

firePipelineExecutionStartedListeners();
fireExecutionStartedListeners();

for (int i = 0; i < transforms.size(); i++) {
final TransformMetaDataCombi sid = transforms.get(i);
Expand Down Expand Up @@ -1137,7 +1137,7 @@ public void startThreads() throws HopException {
executionEndDate = new Date();

try {
firePipelineExecutionFinishedListeners();
fireExecutionFinishedListeners();
} catch (Exception e) {
transform.setErrors(transform.getErrors() + 1L);
log.logError(
Expand Down Expand Up @@ -1293,7 +1293,7 @@ public void run() {
// So we fire the execution finished listeners here.
//
if (transforms.isEmpty()) {
firePipelineExecutionFinishedListeners();
fireExecutionFinishedListeners();
}

if (log.isDetailed()) {
Expand All @@ -1312,16 +1312,23 @@ public void run() {
* @throws HopException if any errors occur during notification
*/
@Override
@Deprecated(since = "2.9", forRemoval = true)
public void firePipelineExecutionFinishedListeners() throws HopException {
fireExecutionFinishedListeners();
}

@Override
public void fireExecutionFinishedListeners() throws HopException {
synchronized (executionFinishedListeners) {
if (executionFinishedListeners.size() == 0) {
return;
}
// prevent Exception from one listener to block others execution
List<HopException> badGuys = new ArrayList<>(executionFinishedListeners.size());
for (IExecutionFinishedListener executionListener : executionFinishedListeners) {
for (IExecutionFinishedListener<IPipelineEngine<PipelineMeta>> listener :
executionFinishedListeners) {
try {
executionListener.finished(this);
listener.finished(this);
} catch (HopException e) {
badGuys.add(e);
}
Expand Down Expand Up @@ -1357,10 +1364,17 @@ public void pipelineCompleted() throws HopException {
* @throws HopException if any errors occur during notification
*/
@Override
@Deprecated(since = "2.9", forRemoval = true)
public void firePipelineExecutionStartedListeners() throws HopException {
fireExecutionStartedListeners();
}

@Override
public void fireExecutionStartedListeners() throws HopException {
synchronized (executionStartedListeners) {
for (IExecutionStartedListener executionListener : executionStartedListeners) {
executionListener.started(this);
for (IExecutionStartedListener<IPipelineEngine<PipelineMeta>> listener :
executionStartedListeners) {
listener.started(this);
}
}
}
Expand Down Expand Up @@ -1595,7 +1609,7 @@ public void safeStop() {
}
transforms.stream().filter(this::isInputTransform).forEach(combi -> stopTransform(combi, true));

firePipelineExecutionStoppedListeners();
fireExecutionStoppedListeners();
}

private boolean isInputTransform(TransformMetaDataCombi combi) {
Expand All @@ -1617,7 +1631,7 @@ public void stopAll() {
setStopped(true);
isAlreadyStopped.set(true);

firePipelineExecutionStoppedListeners();
fireExecutionStoppedListeners();
}

public void stopTransform(TransformMetaDataCombi combi, boolean safeStop) {
Expand All @@ -1638,11 +1652,16 @@ public void stopTransform(TransformMetaDataCombi combi, boolean safeStop) {
}

@Override
@Deprecated(since = "2.9", forRemoval = true)
public void firePipelineExecutionStoppedListeners() {
// Fire the stopped listener...
//
fireExecutionStoppedListeners();
}

@Override
public void fireExecutionStoppedListeners() {
synchronized (executionStoppedListeners) {
for (IExecutionStoppedListener listener : executionStoppedListeners) {
for (IExecutionStoppedListener<IPipelineEngine<PipelineMeta>> listener :
executionStoppedListeners) {
listener.stopped(this);
}
}
Expand Down Expand Up @@ -2483,39 +2502,51 @@ public void setTransformPerformanceSnapShots(
this.transformPerformanceSnapShots = transformPerformanceSnapShots;
}

/**
* Adds a pipeline started listener.
*
* @param executionStartedListener the pipeline started listener
*/
@Override
public void addExecutionStartedListener(IExecutionStartedListener executionStartedListener) {
synchronized (executionStartedListener) {
executionStartedListeners.add(executionStartedListener);
public void addExecutionStartedListener(
IExecutionStartedListener<IPipelineEngine<PipelineMeta>> listener) {
synchronized (executionStartedListeners) {
executionStartedListeners.add(listener);
}
}

/**
* Adds a pipeline finished listener.
*
* @param executionFinishedListener the pipeline finished listener
*/
@Override
public void addExecutionFinishedListener(IExecutionFinishedListener executionFinishedListener) {
synchronized (executionFinishedListener) {
executionFinishedListeners.add(executionFinishedListener);
public void removeExecutionStartedListener(
IExecutionStartedListener<IPipelineEngine<PipelineMeta>> listener) {
synchronized (executionStartedListeners) {
executionStartedListeners.remove(listener);
}
}

@Override
public void addExecutionFinishedListener(
IExecutionFinishedListener<IPipelineEngine<PipelineMeta>> listener) {
synchronized (executionFinishedListeners) {
executionFinishedListeners.add(listener);
}
}

@Override
public void removeExecutionFinishedListener(
IExecutionFinishedListener<IPipelineEngine<PipelineMeta>> listener) {
synchronized (executionFinishedListeners) {
executionFinishedListeners.remove(listener);
}
}

/**
* Adds a pipeline stopped listener.
*
* @param executionStoppedListener the pipeline stopped listener
*/
@Override
public void addExecutionStoppedListener(IExecutionStoppedListener executionStoppedListener) {
synchronized (executionStoppedListener) {
executionStoppedListeners.add(executionStoppedListener);
public void addExecutionStoppedListener(
IExecutionStoppedListener<IPipelineEngine<PipelineMeta>> listener) {
synchronized (executionStoppedListeners) {
executionStoppedListeners.add(listener);
}
}

@Override
public void removeExecutionStoppedListener(
IExecutionStoppedListener<IPipelineEngine<PipelineMeta>> listener) {
synchronized (executionStoppedListeners) {
executionStoppedListeners.remove(listener);
}
}

Expand All @@ -2524,6 +2555,7 @@ public void addExecutionStoppedListener(IExecutionStoppedListener executionStopp
*
* @param executionStoppedListeners the list of stop-event listeners to set
*/
@Deprecated(since = "2.9", forRemoval = true)
public void setExecutionStoppedListeners(
List<IExecutionStoppedListener<IPipelineEngine<PipelineMeta>>> executionStoppedListeners) {
this.executionStoppedListeners = Collections.synchronizedList(executionStoppedListeners);
Expand All @@ -2535,6 +2567,7 @@ public void setExecutionStoppedListeners(
*
* @return the list of stop-event listeners
*/
@Deprecated(since = "2.9", forRemoval = true)
public List<IExecutionStoppedListener<IPipelineEngine<PipelineMeta>>>
getExecutionStoppedListeners() {
return executionStoppedListeners;
Expand Down Expand Up @@ -3543,6 +3576,7 @@ public void setFeedbackSize(int feedbackSize) {
*
* @return value of executionStartedListeners
*/
@Deprecated(since = "2.9", forRemoval = true)
public List<IExecutionStartedListener<IPipelineEngine<PipelineMeta>>>
getExecutionStartedListeners() {
return executionStartedListeners;
Expand All @@ -3551,6 +3585,7 @@ public void setFeedbackSize(int feedbackSize) {
/**
* @param executionStartedListeners The executionStartedListeners to set
*/
@Deprecated(since = "2.9", forRemoval = true)
public void setExecutionStartedListeners(
List<IExecutionStartedListener<IPipelineEngine<PipelineMeta>>> executionStartedListeners) {
this.executionStartedListeners = executionStartedListeners;
Expand All @@ -3561,6 +3596,7 @@ public void setExecutionStartedListeners(
*
* @return value of executionFinishedListeners
*/
@Deprecated(since = "2.9", forRemoval = true)
public List<IExecutionFinishedListener<IPipelineEngine<PipelineMeta>>>
getExecutionFinishedListeners() {
return executionFinishedListeners;
Expand All @@ -3569,6 +3605,7 @@ public void setExecutionStartedListeners(
/**
* @param executionFinishedListeners The executionFinishedListeners to set
*/
@Deprecated(since = "2.9", forRemoval = true)
public void setExecutionFinishedListeners(
List<IExecutionFinishedListener<IPipelineEngine<PipelineMeta>>> executionFinishedListeners) {
this.executionFinishedListeners = executionFinishedListeners;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -195,38 +195,92 @@ public interface IPipelineEngine<T extends PipelineMeta>
boolean isPaused();

/**
* Call the given listener lambda when this pipeline engine has started execution.
* Attach a listener to notify when the pipeline has started execution.
*
* @param listener
* @throws HopException
* @param listener the pipeline started listener
*/
void addExecutionStartedListener(IExecutionStartedListener<IPipelineEngine<T>> listener)
throws HopException;
void addExecutionStartedListener(IExecutionStartedListener<IPipelineEngine<T>> listener);

void firePipelineExecutionStartedListeners() throws HopException;
/**
* Detach a listener to notify when the pipeline has started execution.
*
* @param listener the pipeline started listener
*/
void removeExecutionStartedListener(IExecutionStartedListener<IPipelineEngine<T>> listener);

/**
* Call the given listener lambda when this pipeline engine has completed execution.
* Use the {@link #fireExecutionStartedListeners} method.
*
* @param listener
* @throws HopException
*/
void addExecutionFinishedListener(IExecutionFinishedListener<IPipelineEngine<T>> listener)
throws HopException;
@Deprecated(since = "2.9", forRemoval = true)
void firePipelineExecutionStartedListeners() throws HopException;

void firePipelineExecutionFinishedListeners() throws HopException;
/**
* Make attempt to fire all registered started execution listeners if possible.
*
* @throws HopException if any errors occur during notification
*/
void fireExecutionStartedListeners() throws HopException;

/**
* Attach a listener to notify when the pipeline has completed execution.
*
* @param listener the pipeline finished listener
*/
void addExecutionFinishedListener(IExecutionFinishedListener<IPipelineEngine<T>> listener);

/**
* Call the given listener lambda when this pipeline engine has stopped execution.
* Detach a listener to notify when the pipeline has completed execution.
*
* @param listener the pipeline finished listener
*/
void removeExecutionFinishedListener(IExecutionFinishedListener<IPipelineEngine<T>> listener);

/**
* Use the {@link #fireExecutionFinishedListeners} method.
*
* @param listener
* @throws HopException
*/
void addExecutionStoppedListener(IExecutionStoppedListener<IPipelineEngine<T>> listener)
throws HopException;
@Deprecated(since = "2.9", forRemoval = true)
void firePipelineExecutionFinishedListeners() throws HopException;

/**
* Make attempt to fire all registered finished execution listeners if possible.
*
* @throws HopException if any errors occur during notification
*/
void fireExecutionFinishedListeners() throws HopException;

/**
* Attach a listener to notify when the pipeline has stopped execution.
*
* @param listener the pipeline stopped listener
*/
void addExecutionStoppedListener(IExecutionStoppedListener<IPipelineEngine<T>> listener);

/**
* Detach a listener to notify when the pipeline has stopped execution.
*
* @param listener the pipeline stopped listener
*/
void removeExecutionStoppedListener(IExecutionStoppedListener<IPipelineEngine<T>> listener);

/**
* Use the {@link #fireExecutionStoppedListeners} method.
*
* @throws HopException
*/
@Deprecated(since = "2.9", forRemoval = true)
void firePipelineExecutionStoppedListeners() throws HopException;

/**
* Make attempt to fire all registered stopped execution listeners if possible.
*
* @throws HopException if any errors occur during notification
*/
void fireExecutionStoppedListeners() throws HopException;

/**
* Retrieve the logging text of a particular component in the engine
*
Expand Down Expand Up @@ -301,7 +355,7 @@ void addExecutionStoppedListener(IExecutionStoppedListener<IPipelineEngine<T>> l
*
* @param parentPipeline
*/
void setParentPipeline(IPipelineEngine parentPipeline);
void setParentPipeline(IPipelineEngine<PipelineMeta> parentPipeline);

/**
* Inform the pipeline about a previous execution result in a workflow or pipeline
Expand Down
Loading
Loading