
Adds instrumentation for search path #8408

Closed
Changes from 6 commits
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -98,6 +98,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Enable Point based optimization for custom comparators ([#8168](https://github.com/opensearch-project/OpenSearch/pull/8168))
- [Extensions] Support extension additional settings with extension REST initialization ([#8414](https://github.com/opensearch-project/OpenSearch/pull/8414))
- Adds mock implementation for TelemetryPlugin ([#7545](https://github.com/opensearch-project/OpenSearch/issues/7545))
- Adds instrumentation for search path ([#8408](https://github.com/opensearch-project/OpenSearch/issues/8408))

### Dependencies
- Bump `com.azure:azure-storage-common` from 12.21.0 to 12.21.1 (#7566, #7814)
@@ -168,4 +169,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.8...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.8...2.x
server/src/main/java/org/opensearch/action/search/TransportSearchAction.java
@@ -87,6 +87,9 @@
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskId;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.TracerFactory;
import org.opensearch.telemetry.tracing.listener.TracingActionListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.RemoteClusterAware;
import org.opensearch.transport.RemoteClusterService;
@@ -156,6 +159,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private final NamedWriteableRegistry namedWriteableRegistry;
private final CircuitBreaker circuitBreaker;
private final SearchPipelineService searchPipelineService;
private final TracerFactory tracerFactory;

@Inject
public TransportSearchAction(
@@ -170,7 +174,8 @@ public TransportSearchAction(
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
NamedWriteableRegistry namedWriteableRegistry,
SearchPipelineService searchPipelineService
SearchPipelineService searchPipelineService,
TracerFactory tracerFactory
) {
super(SearchAction.NAME, transportService, actionFilters, (Writeable.Reader<SearchRequest>) SearchRequest::new);
this.client = client;
@@ -185,6 +190,7 @@ public TransportSearchAction(
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.namedWriteableRegistry = namedWriteableRegistry;
this.searchPipelineService = searchPipelineService;
this.tracerFactory = tracerFactory;
}

private Map<String, AliasFilter> buildPerIndexAliasFilter(
@@ -286,7 +292,9 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<
listener
);
}
executeRequest(task, searchRequest, this::searchAsyncAction, listener);
SpanScope scope = tracerFactory.getTracer().startSpan("SearchTask_" + task.getId());
TracingActionListener tracingActionListener = new TracingActionListener(tracerFactory, listener, scope);
Member:

Naming suggestion: TracingAwareListenerWrapper.
I am not an expert on the search code path, but do you think instrumenting SearchOperationListener would give easy / clean access to the different phases?

Contributor Author:

@shwetathareja I tried implementing SearchOperationListener to instrument the query and fetch phases, but since we now return the SpanScope from startSpan, we need to maintain state (the SpanScope returned from preQueryPhase has to be cached so it can be ended in the onQueryPhase method). We could keep it in a ThreadLocal, since this runs in a single-threaded fashion as far as I understood. Thoughts? @reta
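The ThreadLocal approach described in the comment above could look roughly like the sketch below. All type names here are simplified stand-ins, not the real OpenSearch SearchOperationListener or SpanScope APIs; the point is only the state hand-off between the pre- and post-phase callbacks on the same search thread.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class ThreadLocalSpanListener {

    /** Stand-in for a span handle; the real SpanScope is AutoCloseable. */
    static final class Span {
        final String name;
        final AtomicBoolean ended = new AtomicBoolean(false);
        Span(String name) { this.name = name; }
        void end() { ended.set(true); }
    }

    // preQueryPhase and onQueryPhase run on the same search thread,
    // so the span opened in one callback can be closed in the other.
    private final ThreadLocal<Span> current = new ThreadLocal<>();

    public void preQueryPhase(String shardId) {
        current.set(new Span("QueryPhase_" + shardId));
    }

    public Span onQueryPhase(String shardId) {
        Span span = current.get();
        current.remove(); // avoid leaking state across requests on pooled threads
        if (span != null) {
            span.end();
        }
        return span;
    }
}
```

The `ThreadLocal.remove()` call matters on pooled search threads; without it, a request that fails between the two callbacks could leave a stale span behind for the next request on that thread.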

executeRequest(task, searchRequest, this::searchAsyncAction, tracingActionListener);
Member (@shwetathareja, Jul 5, 2023):

Can you please share a sample tracing span — how will it look and what attributes will be collected? Also, how will other transport actions be instrumented, with context being passed from the client?

Contributor Author:

@shwetathareja, Here are the sample spans.

'QueryPhase_[my-index-0000021][0]' : e79d8aa8d6d81b267283f06c084c05e1 57af1b95e8412a91 INTERNAL [tracer: os-tracer:] AttributesMap{data={shard_id=0, th_name=opensearch[runTask-0][search][T#3], node_id=7ZhAHn8OSx6dVMWVdxNfpw}, capacity=128, totalAddedValues=3}
'FetchPhase_[my-index-000002][0]' : e79d8aa8d6d81b267283f06c084c05e1 f8aa81e7496b3fc6 INTERNAL [tracer: os-tracer:] AttributesMap{data={shard_id=0, th_name=opensearch[runTask-0][search][T#13], node_id=7ZhAHn8OSx6dVMWVdxNfpw}, capacity=128, totalAddedValues=3}

Yes, we will add instrumentation in foundational classes like RestAction, TransportAction, TaskManager, etc.

}

/**
30 changes: 17 additions & 13 deletions server/src/main/java/org/opensearch/node/Node.java
@@ -573,6 +573,17 @@ protected Node(
new ConsistentSettingsService(settings, clusterService, consistentSettings).newHashPublisher()
);
}

if (FeatureFlags.isEnabled(TELEMETRY)) {
final TelemetrySettings telemetrySettings = new TelemetrySettings(settings, clusterService.getClusterSettings());
List<TelemetryPlugin> telemetryPlugins = pluginsService.filterPlugins(TelemetryPlugin.class);
TelemetryModule telemetryModule = new TelemetryModule(telemetryPlugins, telemetrySettings);
tracerFactory = new TracerFactory(telemetrySettings, telemetryModule.getTelemetry(), threadPool.getThreadContext());
} else {
tracerFactory = new NoopTracerFactory();
}
resourcesToClose.add(tracerFactory::close);

final IngestService ingestService = new IngestService(
clusterService,
threadPool,
@@ -1026,19 +1037,10 @@ protected Node(
searchModule.getFetchPhase(),
responseCollectorService,
circuitBreakerService,
searchModule.getIndexSearcherExecutor(threadPool)
searchModule.getIndexSearcherExecutor(threadPool),
tracerFactory
);

if (FeatureFlags.isEnabled(TELEMETRY)) {
final TelemetrySettings telemetrySettings = new TelemetrySettings(settings, clusterService.getClusterSettings());
List<TelemetryPlugin> telemetryPlugins = pluginsService.filterPlugins(TelemetryPlugin.class);
TelemetryModule telemetryModule = new TelemetryModule(telemetryPlugins, telemetrySettings);
tracerFactory = new TracerFactory(telemetrySettings, telemetryModule.getTelemetry(), threadPool.getThreadContext());
} else {
tracerFactory = new NoopTracerFactory();
}
resourcesToClose.add(tracerFactory::close);

final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class)
.stream()
.map(
@@ -1639,7 +1641,8 @@ protected SearchService newSearchService(
FetchPhase fetchPhase,
ResponseCollectorService responseCollectorService,
CircuitBreakerService circuitBreakerService,
Executor indexSearcherExecutor
Executor indexSearcherExecutor,
TracerFactory tracerFactory
) {
return new SearchService(
clusterService,
Expand All @@ -1651,7 +1654,8 @@ protected SearchService newSearchService(
fetchPhase,
responseCollectorService,
circuitBreakerService,
indexSearcherExecutor
indexSearcherExecutor,
tracerFactory
);
}

54 changes: 46 additions & 8 deletions server/src/main/java/org/opensearch/search/SearchService.java
@@ -135,6 +135,8 @@
import org.opensearch.search.sort.SortOrder;
import org.opensearch.search.suggest.Suggest;
import org.opensearch.search.suggest.completion.CompletionSuggestion;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.TracerFactory;
import org.opensearch.threadpool.Scheduler.Cancellable;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
@@ -303,6 +305,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
private final AtomicInteger openPitContexts = new AtomicInteger();
private final String sessionId = UUIDs.randomBase64UUID();
private final Executor indexSearcherExecutor;
private final TracerFactory tracerFactory;

public SearchService(
ClusterService clusterService,
@@ -314,7 +317,8 @@ public SearchService(
FetchPhase fetchPhase,
ResponseCollectorService responseCollectorService,
CircuitBreakerService circuitBreakerService,
Executor indexSearcherExecutor
Executor indexSearcherExecutor,
TracerFactory tracerFactory
) {
Settings settings = clusterService.getSettings();
this.threadPool = threadPool;
@@ -362,6 +366,7 @@ public SearchService(

lowLevelCancellation = LOW_LEVEL_CANCELLATION_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(LOW_LEVEL_CANCELLATION_SETTING, this::setLowLevelCancellation);
this.tracerFactory = tracerFactory;
}

private void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
@@ -590,13 +595,18 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh
Releasable ignored = readerContext.markAsUsed(getKeepAlive(request));
SearchContext context = createContext(readerContext, request, task, true)
) {
final long afterQueryTime;
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
long afterQueryTime;
final SpanScope spanScope = tracerFactory.getTracer().startSpan("QueryPhase_" + context.shardTarget().getShardId());
Collaborator:

@Gaganjuneja we need to change the TracerFactory; the way it is designed (with a dynamic on/off setting) is flawed. The consumer could do:

tracer = tracerFactory.getTracer();

and never ask for the tracer again; we need to wrap around instead.

Contributor Author:

Opened a bug #8561 to address this.

Contributor:

@reta Raised a PR for this.

Contributor:

@reta With this change, shall we use Tracer.java instead of TracerFactory.java in the instrumentation code?
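One possible shape of the "wrap around" idea discussed in this thread, as a hedged sketch: instead of handing out whatever tracer was selected at getTracer() time, return a stable wrapper that re-reads the dynamic on/off setting on every call, so a consumer that caches the tracer still honours toggles. The Tracer interface and the setting lookup below are illustrative stand-ins, not the actual OpenSearch types.

```java
import java.util.function.BooleanSupplier;

// Stand-in tracer interface; the real Tracer returns a SpanScope.
interface Tracer {
    String startSpan(String spanName);
}

final class SwitchableTracer implements Tracer {
    private final Tracer delegate;          // real telemetry-backed tracer
    private final BooleanSupplier enabled;  // backed by the dynamic cluster setting

    SwitchableTracer(Tracer delegate, BooleanSupplier enabled) {
        this.delegate = delegate;
        this.enabled = enabled;
    }

    @Override
    public String startSpan(String spanName) {
        // The setting is consulted per call, so callers may safely cache
        // this wrapper instead of asking the factory for a tracer each time.
        return enabled.getAsBoolean() ? delegate.startSpan(spanName) : "noop-span";
    }
}
```

With this shape, the factory hands out the wrapper once and the on/off decision moves inside each call, which is what makes a cached reference safe.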

try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context); spanScope) {
addtracingAttributes(spanScope, context);
loadOrExecuteQueryPhase(request, context);
if (context.queryResult().hasSearchContext() == false && readerContext.singleSession()) {
freeReaderContext(readerContext.id());
}
afterQueryTime = executor.success();
} catch (Exception e) {
spanScope.setError(e);
throw e;
}
if (request.numberOfShards() == 1) {
return executeFetchPhase(readerContext, context, afterQueryTime);
@@ -621,14 +631,24 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh
}
}

private void addtracingAttributes(SpanScope scope, SearchContext context) {
scope.addSpanAttribute("shard_id", context.shardTarget().getShardId().getId());
scope.addSpanAttribute("node_id", context.shardTarget().getNodeId());
}

private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchContext context, long afterQueryTime) {
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)) {
final SpanScope spanScope = tracerFactory.getTracer().startSpan("FetchPhase_" + context.shardTarget().getShardId());
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime); spanScope) {
addtracingAttributes(spanScope, context);
shortcutDocIdsToLoad(context);
fetchPhase.execute(context);
if (reader.singleSession()) {
freeReaderContext(reader.id());
}
executor.success();
} catch (Exception e) {
spanScope.setError(e);
throw e;
}
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
}
@@ -649,10 +669,13 @@ public void executeQueryPhase(
}
runAsync(getExecutor(readerContext.indexShard()), () -> {
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
final SpanScope spanScope = tracerFactory.getTracer().startSpan("QueryPhase_" + shardSearchRequest.shardId().getId());
try (
SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false);
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext);
spanScope
) {
addtracingAttributes(spanScope, searchContext);
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));
processScroll(request, readerContext, searchContext);
queryPhase.execute(searchContext);
@@ -661,6 +684,7 @@ public void executeQueryPhase(
return new ScrollQuerySearchResult(searchContext.queryResult(), searchContext.shardTarget());
} catch (Exception e) {
logger.trace("Query phase failed", e);
spanScope.setError(e);
// we handle the failure in the failure listener below
throw e;
}
@@ -673,10 +697,13 @@ public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task,
final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest));
runAsync(getExecutor(readerContext.indexShard()), () -> {
readerContext.setAggregatedDfs(request.dfs());
final SpanScope spanScope = tracerFactory.getTracer().startSpan("QueryPhase_" + shardSearchRequest.shardId().getId());
try (
SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, true);
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext);
spanScope
) {
addtracingAttributes(spanScope, searchContext);
searchContext.searcher().setAggregatedDfs(request.dfs());
queryPhase.execute(searchContext);
if (searchContext.queryResult().hasSearchContext() == false && readerContext.singleSession()) {
@@ -692,6 +719,7 @@ public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task,
return searchContext.queryResult();
} catch (Exception e) {
assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e);
spanScope.setError(e);
logger.trace("Query phase failed", e);
// we handle the failure in the failure listener below
throw e;
@@ -728,10 +756,13 @@ public void executeFetchPhase(
}
runAsync(getExecutor(readerContext.indexShard()), () -> {
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
final SpanScope spanScope = tracerFactory.getTracer().startSpan("FetchPhase_" + shardSearchRequest.shardId().getId());
Member:

Append "Scroll" to the fetch & query phase span names here.

try (
SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false);
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext);
spanScope
) {
addtracingAttributes(spanScope, searchContext);
searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(null));
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));
processScroll(request, readerContext, searchContext);
@@ -741,6 +772,7 @@ public void executeFetchPhase(
return new ScrollQueryFetchSearchResult(fetchSearchResult, searchContext.shardTarget());
} catch (Exception e) {
assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e);
spanScope.setError(e);
logger.trace("Fetch phase failed", e);
// we handle the failure in the failure listener below
throw e;
@@ -760,14 +792,20 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A
searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(request.getRescoreDocIds()));
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(request.getAggregatedDfs()));
searchContext.docIdsToLoad(request.docIds(), 0, request.docIdsSize());
final SpanScope spanScope = tracerFactory.getTracer().startSpan("FetchPhase_" + searchContext.shardTarget().getShardId());
try (
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext, true, System.nanoTime())
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext, true, System.nanoTime());
spanScope
) {
addtracingAttributes(spanScope, searchContext);
fetchPhase.execute(searchContext);
if (readerContext.singleSession()) {
freeReaderContext(request.contextId());
}
executor.success();
} catch (Exception e) {
spanScope.setError(e);
throw e;
}
return searchContext.fetchResult();
} catch (Exception e) {
server/src/main/java/org/opensearch/telemetry/tracing/listener/TracingActionListener.java
@@ -0,0 +1,51 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.listener;

import org.opensearch.action.ActionListener;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.TracerFactory;

/**
 * Handles the tracing scope and delegates the request to the action listener.
* @param <Response> response.
*/
public class TracingActionListener<Response> implements ActionListener<Response> {

private final ActionListener<Response> delegate;
private final SpanScope spanScope;
private final TracerFactory tracerFactory;

/**
* Creates instance.
* @param tracerFactory tracer factory
* @param delegate action listener to be delegated
* @param spanScope tracer scope.
*/
public TracingActionListener(TracerFactory tracerFactory, ActionListener<Response> delegate, SpanScope spanScope) {
this.tracerFactory = tracerFactory;
this.delegate = delegate;
this.spanScope = spanScope;
}

@Override
public void onResponse(Response response) {
try (spanScope) {
delegate.onResponse(response);
}
}

@Override
public void onFailure(Exception e) {
try (spanScope) {
spanScope.setError(e);
delegate.onFailure(e);
}
}
}
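The contract of the listener above is that the span scope is closed on both completion paths, and that the error is recorded before delegation on failure. A hedged, self-contained sketch of that pattern follows; the Listener and RecordingScope types are stand-ins for the real ActionListener and SpanScope interfaces, used here only so the closing order is observable.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for ActionListener<Response>.
interface Listener<R> {
    void onResponse(R response);
    void onFailure(Exception e);
}

// Stand-in for SpanScope that records what happens to it, in order.
final class RecordingScope implements AutoCloseable {
    final List<String> events = new ArrayList<>();
    void setError(Exception e) { events.add("error:" + e.getMessage()); }
    @Override public void close() { events.add("closed"); }
}

final class TracingListener<R> implements Listener<R> {
    private final Listener<R> delegate;
    private final RecordingScope scope;

    TracingListener(Listener<R> delegate, RecordingScope scope) {
        this.delegate = delegate;
        this.scope = scope;
    }

    @Override
    public void onResponse(R response) {
        try (scope) {              // scope closes even if the delegate throws
            delegate.onResponse(response);
        }
    }

    @Override
    public void onFailure(Exception e) {
        try (scope) {
            scope.setError(e);     // record the failure before delegating
            delegate.onFailure(e);
        }
    }
}
```

Because the scope is a try-with-resources operand, a delegate that throws still cannot leak an open span, which is the property the wrapper exists to guarantee.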
server/src/main/java/org/opensearch/telemetry/tracing/listener/package-info.java
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* This package contains classes needed for telemetry.
*/
package org.opensearch.telemetry.tracing.listener;
@@ -210,6 +210,7 @@
import org.opensearch.search.query.QueryPhase;
import org.opensearch.snapshots.mockstore.MockEventuallyConsistentRepository;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.telemetry.tracing.NoopTracerFactory;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.disruption.DisruptableMockTransport;
import org.opensearch.threadpool.ThreadPool;
@@ -2034,7 +2035,8 @@ public void onFailure(final Exception e) {
new FetchPhase(Collections.emptyList()),
responseCollectorService,
new NoneCircuitBreakerService(),
null
null,
new NoopTracerFactory()
);
SearchPhaseController searchPhaseController = new SearchPhaseController(
writableRegistry(),
@@ -2065,7 +2067,8 @@ public void onFailure(final Exception e) {
List.of(),
client,
false
)
),
new NoopTracerFactory()
)
);
actions.put(