Skip to content

Commit

Permalink
fix: npe on transient query close (#7530)
Browse files Browse the repository at this point in the history
Sometimes when queries close the closing thread times out and leaves behind
the cleanup thread in streams. Then, this thread calls the state change callback
which causes our metrics listener to throw an NPE. This patch changes the listener
to deal with this case by checking for null values
  • Loading branch information
rodesai authored May 17, 2021
1 parent d83a8be commit bc64edd
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,22 @@ public void onCreate(

@Override
public void onStateChange(final QueryMetadata query, final State before, final State after) {
perQuery.get(query.getQueryId()).onChange(before, after);
// this may be called after the query is deregistered, because shutdown is ansynchronous and
// may time out. when ths happens, the shutdown thread in streams may call this method.
final PerQueryListener listener = perQuery.get(query.getQueryId());
if (listener != null) {
listener.onChange(before, after);
}
}

@Override
public void onError(final QueryMetadata query, final QueryError error) {
perQuery.get(query.getQueryId()).onError(error);
// this may be called after the query is deregistered, because shutdown is ansynchronous and
// may time out. when ths happens, the shutdown thread in streams may call this method.
final PerQueryListener listener = perQuery.get(query.getQueryId());
if (listener != null) {
listener.onError(error);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,26 @@ public void shouldAddMetricOnCreation() {
verify(metrics).addMetric(eq(METRIC_NAME_2), isA(Gauge.class));
}

@Test
public void shouldGracefullyHandleStateCallbackAfterDeregister() {
// Given:
listener.onCreate(serviceContext, metaStore, query);
listener.onDeregister(query);

// When/Then(don't throw)
listener.onStateChange(query, State.RUNNING, State.NOT_RUNNING);
}

@Test
public void shouldGracefullyHandleErrorCallbackAfterDeregister() {
// Given:
listener.onCreate(serviceContext, metaStore, query);
listener.onDeregister(query);

// When/Then(don't throw)
listener.onError(query, new QueryError(123, "foo", Type.USER));
}

@Test
public void shouldAddMetricWithSuppliedPrefix() {
// Given:
Expand Down

0 comments on commit bc64edd

Please sign in to comment.