Skip to content

Commit

Permalink
feat: clean up processing log metric (#9105)
Browse files Browse the repository at this point in the history
* feat: clean up processing log metrics

* checkstyle

* change function to with prefix
  • Loading branch information
stevenpyzhang authored May 12, 2022
1 parent f01cbd1 commit 926e440
Show file tree
Hide file tree
Showing 17 changed files with 152 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
Expand Down Expand Up @@ -115,6 +116,14 @@ public synchronized Collection<ProcessingLogger> getLoggers() {
return processingLoggers.values();
}

@Override
public synchronized Collection<ProcessingLogger> getLoggersWithPrefix(final String prefix) {
return processingLoggers.keySet().stream()
.filter(loggerName -> loggerName.startsWith(prefix))
.map(processingLoggers::get)
.collect(Collectors.toList());
}

private static Sensor configureProcessingErrorSensor(
final Metrics metrics,
final Map<String, String> metricsTags,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public ProcessingLogger getLogger(
public Collection<ProcessingLogger> getLoggers() {
return ImmutableList.of();
}

@Override
public Collection<ProcessingLogger> getLoggersWithPrefix(final String substr) {
return ImmutableList.of();
}
};

public static final ProcessingLogContext INSTANCE = new NoopProcessingLogContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,9 @@ public interface ProcessingLoggerFactory {
* @return A collection of all loggers that have been created by the factory
*/
Collection<ProcessingLogger> getLoggers();

/**
* @return A collection of loggers which have the substring in their names
*/
Collection<ProcessingLogger> getLoggersWithPrefix(String substr);
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,17 @@ public void shouldHandleNullMetrics() {
verify(loggerWithMetricsFactoryHelper).apply(logger, null);
}

@Test
public void shouldReturnLoggersWithPrefix() {
// Given:
factory.getLogger("boo.far.deserializer");
factory.getLogger("boo.far.serializer", Collections.singletonMap("tag1", "some-id-2"));
factory.getLogger("far.boo", Collections.singletonMap("tag3", "some-id-2"));

// Then:
assertThat(factory.getLoggersWithPrefix("boo.far").size(), is(2));
}

private double getMetricValue( final Map<String, String> metricsTags) {
final Metrics metrics = metricCollectors.getMetrics();
return Double.parseDouble(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ TransientQueryMetadata buildTransientQuery(
resultType,
ksqlConfig.getLong(KsqlConfig.KSQL_QUERY_RETRY_BACKOFF_INITIAL_MS),
ksqlConfig.getLong(KsqlConfig.KSQL_QUERY_RETRY_BACKOFF_MAX_MS),
listener
listener,
processingLogContext.getLoggerFactory()
);
}

Expand Down Expand Up @@ -373,7 +374,8 @@ PersistentQueryMetadata buildPersistentQueryInDedicatedRuntime(
ksqlConfig.getLong(KsqlConfig.KSQL_QUERY_RETRY_BACKOFF_INITIAL_MS),
ksqlConfig.getLong(KsqlConfig.KSQL_QUERY_RETRY_BACKOFF_MAX_MS),
listener,
scalablePushRegistry
scalablePushRegistry,
processingLogContext.getLoggerFactory()
);

}
Expand Down Expand Up @@ -478,15 +480,16 @@ PersistentQueryMetadata buildPersistentQueryInSharedRuntime(
getUncaughtExceptionProcessingLogger(queryId),
sinkDataSource,
listener,
scalablePushRegistry,
scalablePushRegistry,
(streamsRuntime) -> getNamedTopology(
streamsRuntime,
queryId,
applicationId,
queryOverrides,
physicalPlan
),
keyFormat
keyFormat,
processingLogContext.getLoggerFactory()
);
if (real) {
return binPackedPersistentQueryMetadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.confluent.ksql.execution.scalablepush.ScalablePushRegistry;
import io.confluent.ksql.execution.streams.materialization.Materialization;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.logging.processing.ProcessingLoggerFactory;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.query.MaterializationProviderBuilderFactory;
Expand Down Expand Up @@ -83,6 +84,7 @@ public class BinPackedPersistentQueryMetadataImpl implements PersistentQueryMeta
private final MaterializationProviderBuilderFactory
materializationProviderBuilderFactory;
private final Optional<ScalablePushRegistry> scalablePushRegistry;
private final ProcessingLoggerFactory loggerFactory;
public boolean everStarted = false;
private boolean corruptionCommandTopic = false;

Expand All @@ -109,7 +111,8 @@ public BinPackedPersistentQueryMetadataImpl(
final Listener listener,
final Optional<ScalablePushRegistry> scalablePushRegistry,
final Function<SharedKafkaStreamsRuntime, NamedTopology> namedTopologyBuilder,
final KeyFormat keyFormat) {
final KeyFormat keyFormat,
final ProcessingLoggerFactory loggerFactory) {
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
this.persistentQueryType = Objects.requireNonNull(persistentQueryType, "persistentQueryType");
this.statementString = Objects.requireNonNull(statementString, "statementString");
Expand All @@ -136,6 +139,7 @@ public BinPackedPersistentQueryMetadataImpl(
this.queryErrors = sharedKafkaStreamsRuntime.getNewQueryErrorQueue();
this.scalablePushRegistry = requireNonNull(scalablePushRegistry, "scalablePushRegistry");
this.keyFormat = requireNonNull(keyFormat, "keyFormat");
this.loggerFactory = requireNonNull(loggerFactory, "loggerFactory");
}

// for creating sandbox instances
Expand Down Expand Up @@ -165,6 +169,7 @@ public BinPackedPersistentQueryMetadataImpl(
this.scalablePushRegistry = original.scalablePushRegistry;
this.namedTopologyBuilder = original.namedTopologyBuilder;
this.keyFormat = original.keyFormat;
this.loggerFactory = original.loggerFactory;
}

@Override
Expand Down Expand Up @@ -397,6 +402,7 @@ public void onStateChange(final State newState, final State oldState) {

@Override
public void close() {
loggerFactory.getLoggersWithPrefix(queryId.toString()).forEach(ProcessingLogger::close);
sharedKafkaStreamsRuntime.stop(queryId, false);
scalablePushRegistry.ifPresent(ScalablePushRegistry::close);
listener.onClose(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.confluent.ksql.execution.streams.materialization.Materialization;
import io.confluent.ksql.execution.streams.materialization.MaterializationProvider;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.logging.processing.ProcessingLoggerFactory;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import io.confluent.ksql.name.SourceName;
Expand Down Expand Up @@ -86,7 +87,8 @@ public PersistentQueryMetadataImpl(
final long retryBackoffInitialMs,
final long retryBackoffMaxMs,
final QueryMetadata.Listener listener,
final Optional<ScalablePushRegistry> scalablePushRegistry
final Optional<ScalablePushRegistry> scalablePushRegistry,
final ProcessingLoggerFactory loggerFactory
) {
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
super(
Expand All @@ -105,7 +107,8 @@ public PersistentQueryMetadataImpl(
maxQueryErrorsQueueSize,
retryBackoffInitialMs,
retryBackoffMaxMs,
new QueryListenerWrapper(listener, scalablePushRegistry)
new QueryListenerWrapper(listener, scalablePushRegistry),
loggerFactory
);
this.sinkDataSource = requireNonNull(sinkDataSource, "sinkDataSource");
this.schemas = requireNonNull(schemas, "schemas");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.logging.processing.ProcessingLoggerFactory;
import io.confluent.ksql.logging.query.QueryLogger;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.query.KafkaStreamsBuilder;
Expand Down Expand Up @@ -71,6 +73,7 @@ public class QueryMetadataImpl implements QueryMetadata {
private final TimeBoundedQueue queryErrors;
private final RetryEvent retryEvent;
private final Listener listener;
private final ProcessingLoggerFactory loggerFactory;
private volatile boolean everStarted = false;
private volatile KafkaStreams kafkaStreams;
// These fields don't need synchronization because they are initialized in initialize() before
Expand Down Expand Up @@ -103,7 +106,8 @@ public long read() {
final int maxQueryErrorsQueueSize,
final long baseWaitingTimeMs,
final long retryBackoffMaxMs,
final Listener listener
final Listener listener,
final ProcessingLoggerFactory loggerFactory
) {
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
this.statementString = Objects.requireNonNull(statementString, "statementString");
Expand Down Expand Up @@ -132,6 +136,7 @@ public long read() {
retryBackoffMaxMs,
CURRENT_TIME_MILLIS_TICKER
);
this.loggerFactory = Objects.requireNonNull(loggerFactory, "loggerFactory");
}

// Used for sandboxing
Expand Down Expand Up @@ -165,6 +170,7 @@ public long read() {
);
this.listener
= Objects.requireNonNull(listener, "stopListeners");
this.loggerFactory = other.loggerFactory;
}

public void initialize() {
Expand Down Expand Up @@ -352,6 +358,7 @@ protected boolean closeKafkaStreams() {
* schemas, etc...).
*/
public void close() {
loggerFactory.getLoggersWithPrefix(queryId.toString()).forEach(ProcessingLogger::close);
doClose(true);
listener.onClose(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.util;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.logging.processing.ProcessingLoggerFactory;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.query.CompletionHandler;
Expand Down Expand Up @@ -58,7 +59,8 @@ public TransientQueryMetadata(
final ResultType resultType,
final long retryBackoffInitialMs,
final long retryBackoffMaxMs,
final Listener listener
final Listener listener,
final ProcessingLoggerFactory loggerFactory
) {
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
super(
Expand All @@ -77,7 +79,8 @@ public TransientQueryMetadata(
maxQueryErrorsQueueSize,
retryBackoffInitialMs,
retryBackoffMaxMs,
listener
listener,
loggerFactory
);
this.rowQueue = Objects.requireNonNull(rowQueue, "rowQueue");
this.resultType = Objects.requireNonNull(resultType, "resultType");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.materialization.MaterializationInfo;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.logging.processing.MeteredProcessingLoggerFactory;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.execution.scalablepush.ScalablePushRegistry;
Expand All @@ -37,6 +38,8 @@
import io.confluent.ksql.schema.query.QuerySchemas;
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.util.QueryMetadata.Listener;

import java.util.Arrays;
import java.util.Map;
import java.util.Optional;

Expand Down Expand Up @@ -94,6 +97,8 @@ public class BinPackedPersistentQueryMetadataImplTest {
private MaterializationInfo materializationInfo;
@Mock
private MaterializationProviderBuilderFactory.MaterializationProviderBuilder materializationProviderBuilder;
@Mock
private MeteredProcessingLoggerFactory loggerFactory;

private PersistentQueryMetadata query;

Expand All @@ -119,7 +124,8 @@ public void setUp() {
listener,
scalablePushRegistry,
(runtime) -> topology,
keyFormat);
keyFormat,
loggerFactory);

query.initialize();
when(materializationProviderBuilderFactory.materializationProviderBuilder(
Expand Down Expand Up @@ -169,4 +175,19 @@ public void shouldCallKafkaStreamsCloseOnStop() {
// Then:
verify(sharedKafkaStreamsRuntimeImpl).stop(QUERY_ID, false);
}

@Test
public void shouldCloseProcessingLoggers() {
// Given:
final ProcessingLogger processingLogger1 = mock(ProcessingLogger.class);
final ProcessingLogger processingLogger2 = mock(ProcessingLogger.class);
when(loggerFactory.getLoggersWithPrefix(QUERY_ID.toString())).thenReturn(Arrays.asList(processingLogger1, processingLogger2));

// When:
query.close();

// Then:
verify(processingLogger1).close();
verify(processingLogger2).close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.streams.materialization.MaterializationProvider;
import io.confluent.ksql.logging.processing.MeteredProcessingLoggerFactory;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.execution.scalablepush.ScalablePushRegistry;
Expand Down Expand Up @@ -94,6 +95,8 @@ public class PersistentQueryMetadataTest {
private Listener listener;
@Mock
private ScalablePushRegistry scalablePushRegistry;
@Mock
private MeteredProcessingLoggerFactory processingLoggerFactory;

private PersistentQueryMetadata query;

Expand Down Expand Up @@ -128,7 +131,8 @@ public void setUp() {
0L,
0L,
listener,
Optional.of(scalablePushRegistry)
Optional.of(scalablePushRegistry),
processingLoggerFactory
);

query.initialize();
Expand Down Expand Up @@ -160,7 +164,8 @@ public void shouldReturnInsertQueryType() {
0L,
0L,
listener,
Optional.empty()
Optional.empty(),
processingLoggerFactory
);

// When/Then
Expand Down Expand Up @@ -193,7 +198,8 @@ public void shouldReturnCreateAsQueryType() {
0L,
0L,
listener,
Optional.empty()
Optional.empty(),
processingLoggerFactory
);

// When/Then
Expand Down
Loading

0 comments on commit 926e440

Please sign in to comment.