From d8a60446b3bf6e07ab381ffa46c9f972f8d5c2ef Mon Sep 17 00:00:00 2001 From: Qingsheng Ren Date: Sun, 3 Dec 2023 21:59:04 +0800 Subject: [PATCH] [cdc-runtime] Setup waitFlushSuccess before responding operator to flush. There might be a case that sink flushes faster than schema operator requesting release upstream. In that case waitFlushSuccess has not been set when sink has flushed successfully, which will lead to a NullPointerException. --- .../schema/coordinator/SchemaRegistryRequestHandler.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index f91c2a3c60..c1a907e9bf 100644 --- a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -104,7 +104,8 @@ public CompletableFuture handleSchemaChangeRequest( CompletableFuture response = CompletableFuture.completedFuture(wrap(new SchemaChangeResponse(true))); schemaManager.applySchemaChange(request.getSchemaChangeEvent()); - pendingSchemaChanges.add(new PendingSchemaChange(request, response)); + this.waitFlushSuccess = + new PendingSchemaChange(request, response).startToWaitForFlushSuccess(); return response; } else { LOG.info("There are already processing requests. Wait for processing."); @@ -116,7 +117,6 @@ public CompletableFuture handleSchemaChangeRequest( /** Handle the {@link ReleaseUpstreamRequest} and wait for all sink subtasks flushing. */ public CompletableFuture handleReleaseUpstreamRequest() { - this.waitFlushSuccess = pendingSchemaChanges.remove(0).startToWaitForFlushSuccess(); return waitFlushSuccess.getResponseFuture(); } @@ -152,19 +152,19 @@ private void startNextSchemaChangeRequest() { flushedSinkWriters.clear(); waitFlushSuccess = null; while (!pendingSchemaChanges.isEmpty()) { - PendingSchemaChange pendingSchemaChange = pendingSchemaChanges.get(0); + PendingSchemaChange pendingSchemaChange = pendingSchemaChanges.remove(0); SchemaChangeRequest request = pendingSchemaChange.changeRequest; if (request.getSchemaChangeEvent() instanceof CreateTableEvent && schemaManager.schemaExists(request.getTableId())) { pendingSchemaChange .getResponseFuture() .complete(wrap(new SchemaChangeResponse(false))); - pendingSchemaChanges.remove(0); } else { schemaManager.applySchemaChange(request.getSchemaChangeEvent()); pendingSchemaChange .getResponseFuture() .complete(wrap(new SchemaChangeResponse(true))); + this.waitFlushSuccess = pendingSchemaChange.startToWaitForFlushSuccess(); break; } }