Skip to content

Commit

Permalink
[cdc-runtime][hotfix] Setup waitFlushSuccess before responding operat…
Browse files Browse the repository at this point in the history
…or to flush. (#2805)

This closes #2805.

There might be a case that sink flushes faster than schema operator requesting release upstream. In that case waitFlushSuccess has not been set when sink has flushed successfully, which will lead to a NullPointerException.
  • Loading branch information
PatrickRen authored Dec 4, 2023
1 parent 95921d5 commit 9ce8450
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
CompletableFuture<CoordinationResponse> response =
CompletableFuture.completedFuture(wrap(new SchemaChangeResponse(true)));
schemaManager.applySchemaChange(request.getSchemaChangeEvent());
pendingSchemaChanges.add(new PendingSchemaChange(request, response));
this.waitFlushSuccess =
new PendingSchemaChange(request, response).startToWaitForFlushSuccess();
return response;
} else {
LOG.info("There are already processing requests. Wait for processing.");
Expand All @@ -116,7 +117,6 @@ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(

/** Handle the {@link ReleaseUpstreamRequest} and wait for all sink subtasks flushing. */
public CompletableFuture<CoordinationResponse> handleReleaseUpstreamRequest() {
this.waitFlushSuccess = pendingSchemaChanges.remove(0).startToWaitForFlushSuccess();
return waitFlushSuccess.getResponseFuture();
}

Expand Down Expand Up @@ -152,19 +152,19 @@ private void startNextSchemaChangeRequest() {
flushedSinkWriters.clear();
waitFlushSuccess = null;
while (!pendingSchemaChanges.isEmpty()) {
PendingSchemaChange pendingSchemaChange = pendingSchemaChanges.get(0);
PendingSchemaChange pendingSchemaChange = pendingSchemaChanges.remove(0);
SchemaChangeRequest request = pendingSchemaChange.changeRequest;
if (request.getSchemaChangeEvent() instanceof CreateTableEvent
&& schemaManager.schemaExists(request.getTableId())) {
pendingSchemaChange
.getResponseFuture()
.complete(wrap(new SchemaChangeResponse(false)));
pendingSchemaChanges.remove(0);
} else {
schemaManager.applySchemaChange(request.getSchemaChangeEvent());
pendingSchemaChange
.getResponseFuture()
.complete(wrap(new SchemaChangeResponse(true)));
this.waitFlushSuccess = pendingSchemaChange.startToWaitForFlushSuccess();
break;
}
}
Expand Down

0 comments on commit 9ce8450

Please sign in to comment.