Skip to content

Commit

Permalink
fix: idempotent terminate that can handle hung streams (#4643)
Browse files Browse the repository at this point in the history
Fixes a couple issues with terminate:
- don't throw if the query doesn't get into NOT_RUNNING state. This can
  happen when streams threads are stuck pending shutdown.
- make terminate idempotent
  • Loading branch information
rodesai authored and colinhicks committed Feb 27, 2020
1 parent cb66e05 commit d96db14
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,10 @@ private void unregisterQuery(final ServiceContext serviceContext, final QueryMet
final String applicationId = query.getQueryApplicationId();

if (!query.getState().equalsIgnoreCase("NOT_RUNNING")) {
throw new IllegalStateException("query not stopped."
+ " id " + applicationId + ", state: " + query.getState());
log.warn(
"Unregistering query that has not terminated. "
+ "This may happen when streams threads are hung. State: " + query.getState()
);
}

if (!allLiveQueries.remove(query)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,8 @@ private void terminateQuery(final PreparedStatement<TerminateQuery> terminateQue
return;
}

ksqlEngine.getPersistentQuery(queryId.get())
.orElseThrow(() ->
new KsqlException(String.format("No running query with id %s was found", queryId)))
.close();
final Optional<PersistentQueryMetadata> query = ksqlEngine.getPersistentQuery(queryId.get());
query.ifPresent(PersistentQueryMetadata::close);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.argThat;
Expand Down Expand Up @@ -715,6 +716,37 @@ public void shouldTerminateAll() {
verify(query1).close();
}

@Test
public void shouldDoIdempotentTerminate() {
// Given:
final String queryStatement = "a persistent query";

final TerminateQuery terminate = mock(TerminateQuery.class);
when(terminate.getQueryId()).thenReturn(Optional.of(new QueryId("foo")));

when(mockParser.parseSingleStatement(any()))
.thenReturn(PreparedStatement.of(queryStatement, terminate));

final PersistentQueryMetadata query = mock(PersistentQueryMetadata.class);

when(mockEngine.getPersistentQuery(new QueryId("foo")))
.thenReturn(Optional.of(query))
.thenReturn(Optional.empty());

final QueuedCommand cmd = new QueuedCommand(
new CommandId(Type.TERMINATE, "-", Action.EXECUTE),
new Command("terminate all", emptyMap(), emptyMap(), Optional.empty()),
Optional.empty(),
0L
);

// When:
statementExecutorWithMocks.handleStatement(cmd);
statementExecutorWithMocks.handleStatement(cmd);

// Then should not throw
}

private void createStreamsAndStartTwoPersistentQueries() {
final Command csCommand = new Command(
"CREATE STREAM pageview ("
Expand Down

0 comments on commit d96db14

Please sign in to comment.