Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NR-308791: Apache Pekko Instrumentation support #329

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
33 changes: 33 additions & 0 deletions instrumentation-security/apache-pekko-http-core-2.13_1/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Pekko HTTP core instrumentation

This instrumentation is a lift of `akka-http-core-2.13_10.2.0`.

As of `Pekko Http Core 1.0.0`, `bindAndHandleAsync` and `bindAndHandleSync` have both been deprecated and replaced by
`Http().newServerAt().bind()`. However, these methods still exist and are instrumented where used,
so the documentation below (also taken from Akka) is maintained for historical purposes.


## HttpExt Instrumentation
Instrumentation for Pekko HTTP Core is carried out in the `pekko.http.scaladsl.HttpExt` class that serves as the
main entry point for a server. 2 convenience methods from `HttpExt` that can be used to start an HTTP server have
been instrumented, they are :

- ` bindAndHandleAsync`: Convenience method which starts a new HTTP server at the given endpoint and uses handler that is a function receiving an `HttpRequest` and returning a `Future[HttpResponse]`
- ` bindAndHandleSync`: Convenience method which starts a new HTTP server at the given endpoint and uses handler that is a function receiving an `HttpRequest` and returning a `HttpResponse`


It has been decided that instrumentation is not extended for `bindAndHandle` which starts a new HTTP server using a
`pekko.stream.scaladsl.Flow` instance. This is due to a clash in the Akka Http Routing DSL instrumentation.


Users wishing to start an HTTP Server from an `pekko.stream.scaladsl.Flow` can use the following workaround

```scala
val flow: Flow[HttpRequest, HttpResponse, NotUsed] = ???
val asyncHandler: HttpRequest => Future[HttpResponse] = request => Source.single(request).via(flow).runWith(Sink.head)
Http().bindAndHandleAsync(asyncHandler, host, port)
```

This workaround is not needed for users using calling `bindAndHandle` using `akka.http.scaladsl.Route` from the
Pekko Http Routing DSL. Instrumentation should work in the same way being called from the other conveniences methods
to start an HTTP Server.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
apply plugin: 'scala'

isScalaProjectEnabled(project, "scala-2.13")

sourceSets.test.scala.srcDir "src/test/java"
sourceSets.test.java.srcDirs = []

jar {
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.security.apache-pekko-http-core-2.13_1' }
}

dependencies {
implementation(project(":newrelic-security-api"))
implementation("com.newrelic.agent.java:newrelic-api:${nrAPIVersion}")
implementation("com.newrelic.agent.java:newrelic-weaver-api:${nrAPIVersion}")
implementation("com.newrelic.agent.java:agent-bridge:${nrAPIVersion}")
implementation("org.apache.pekko:pekko-http_2.13:1.0.1")
implementation("org.apache.pekko:pekko-http-core_2.13:1.0.1")
implementation("org.apache.pekko:pekko-stream_2.13:1.0.1")
implementation("org.apache.pekko:pekko-actor_2.13:1.0.1")
}

verifyInstrumentation {
passesOnly('org.apache.pekko:pekko-http_2.13:[1.0.0,)') {
implementation("org.apache.pekko:pekko-stream_2.13:1.0.0")
}
excludeRegex 'org.apache.pekko:pekko-http_2.13:.*(RC|M)[0-9]*$'
excludeRegex 'org.apache.pekko:pekko-http_2.13:.*-[0-9a-f]{8}$'
}

site {
title 'Pekko Http Core'
type 'Framework'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.apache.pekko.http.scaladsl

import com.newrelic.api.agent.{NewRelic, Trace}
import org.apache.pekko.Done
import org.apache.pekko.http.scaladsl.model.{HttpEntity, HttpRequest, HttpResponse}
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.Sink
import org.apache.pekko.stream.javadsl.Source
import org.apache.pekko.util.ByteString

import java.lang
import scala.concurrent.{ExecutionContext, Future}
import scala.runtime.AbstractFunction1

class AsyncRequestHandler(handler: HttpRequest => Future[HttpResponse])(implicit ec: ExecutionContext, materializer: Materializer) extends AbstractFunction1[HttpRequest, Future[HttpResponse]] {

@Trace(dispatcher = true)
override def apply(param: HttpRequest): Future[HttpResponse] = {
val body: lang.StringBuilder = new lang.StringBuilder();
val dataBytes: Source[ByteString, AnyRef] = param.entity.getDataBytes()
val isLockAcquired = PekkoCoreUtils.acquireServletLockIfPossible();

if (!param.entity.isInstanceOf[HttpEntity.Chunked]) {
val sink: Sink[ByteString, Future[Done]] = Sink.foreach[ByteString] { byteString =>
val chunk = byteString.utf8String
body.append(chunk)
}
val processingResult: Future[Done] = dataBytes.runWith(sink, materializer)
}
PekkoCoreUtils.preProcessHttpRequest(isLockAcquired, param, body, NewRelic.getAgent.getTransaction.getToken);
val futureResponse: Future[HttpResponse] = handler.apply(param)
futureResponse.flatMap(ResponseFutureHelper.wrapResponseAsync(NewRelic.getAgent.getTransaction.getToken, materializer))
futureResponse
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package org.apache.pekko.http.scaladsl;

import com.newrelic.api.agent.security.NewRelicSecurity;
import com.newrelic.api.agent.security.instrumentation.helpers.GenericHelper;
import com.newrelic.api.agent.security.instrumentation.helpers.ServletHelper;
import com.newrelic.api.agent.security.schema.AbstractOperation;
import com.newrelic.api.agent.security.schema.SecurityMetaData;
import com.newrelic.api.agent.security.schema.StringUtils;
import com.newrelic.api.agent.security.schema.exceptions.NewRelicSecurityException;
import com.newrelic.api.agent.security.schema.operation.SSRFOperation;
import com.newrelic.api.agent.security.utils.SSRFUtils;
import com.newrelic.api.agent.security.utils.logging.LogLevel;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.http.scaladsl.model.HttpRequest;
import org.apache.pekko.http.scaladsl.model.HttpResponse;
import org.apache.pekko.http.scaladsl.model.headers.RawHeader;
import org.apache.pekko.http.scaladsl.settings.ConnectionPoolSettings;
import com.newrelic.api.agent.weaver.MatchType;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import org.apache.pekko.http.scaladsl.settings.ServerSettings;
import org.apache.pekko.stream.Materializer;
import scala.Function1;
import scala.concurrent.Future;

import java.net.URI;

@Weave(type = MatchType.ExactClass, originalName = "org.apache.pekko.http.scaladsl.HttpExt")
public class HttpExt_Instrumentation {

// These methods are deprecated but still exist in Pekko Http Core 1.0.0.
// They have been replaced by Http().newServerAt().bind().

public Future<Http.ServerBinding> bindAndHandleAsync(
Function1<HttpRequest, Future<HttpResponse>> handler,
String interfaceString, int port,
ConnectionContext connectionContext,
ServerSettings settings, int parallelism,
LoggingAdapter adapter, Materializer mat) {

AsyncRequestHandler wrapperHandler = new AsyncRequestHandler(handler, mat.executionContext(), mat);
handler = wrapperHandler;
return Weaver.callOriginal();
}

public Future<Http.ServerBinding> bindAndHandleSync(
Function1<HttpRequest, HttpResponse> handler,
String interfaceString, int port,
ConnectionContext connectionContext,
ServerSettings settings,
LoggingAdapter adapter, Materializer mat) {

SyncRequestHandler wrapperHandler = new SyncRequestHandler(handler, mat);
handler = wrapperHandler;
return Weaver.callOriginal();
}

public Future<HttpResponse> singleRequest(HttpRequest httpRequest, HttpsConnectionContext connectionContext, ConnectionPoolSettings poolSettings, LoggingAdapter loggingAdapter) {

boolean isLockAcquired = GenericHelper.acquireLockIfPossible(PekkoCoreUtils.NR_SEC_CUSTOM_ATTRIB_OUTBOUND_REQ);
AbstractOperation operation = null;

SecurityMetaData securityMetaData = NewRelicSecurity.getAgent().getSecurityMetaData();
if (isLockAcquired) {
operation = preprocessSecurityHook(httpRequest, PekkoCoreUtils.METHOD_SINGLE_REQUEST);
}

if (operation!=null) {
// Add CSEC Fuzz and parent headers
String iastHeader = NewRelicSecurity.getAgent().getSecurityMetaData().getFuzzRequestIdentifier().getRaw();
if (iastHeader != null && !iastHeader.trim().isEmpty()) {
httpRequest = (HttpRequest)httpRequest.addHeader(RawHeader.apply(ServletHelper.CSEC_IAST_FUZZ_REQUEST_ID, iastHeader));
}

String csecParaentId = securityMetaData.getCustomAttribute(GenericHelper.CSEC_PARENT_ID, String.class);
if(StringUtils.isNotBlank(csecParaentId)){
httpRequest = (HttpRequest)httpRequest.addHeader(RawHeader.apply(GenericHelper.CSEC_PARENT_ID, csecParaentId));
}

try {
NewRelicSecurity.getAgent().registerOperation(operation);
} catch (Exception e) {
NewRelicSecurity.getAgent().log(LogLevel.SEVERE, String.format(GenericHelper.REGISTER_OPERATION_EXCEPTION_MESSAGE, PekkoCoreUtils.PEKKO_HTTP_CORE_2_13_1, e.getMessage()), e, this.getClass().getName());
NewRelicSecurity.getAgent().reportIncident(LogLevel.SEVERE , String.format(GenericHelper.REGISTER_OPERATION_EXCEPTION_MESSAGE, PekkoCoreUtils.PEKKO_HTTP_CORE_2_13_1, e.getMessage()), e, this.getClass().getName());
} finally {
if (operation.getApiID() != null && !operation.getApiID().trim().isEmpty() &&
operation.getExecutionId() != null && !operation.getExecutionId().trim().isEmpty()) {
// Add CSEC distributed tracing header
httpRequest = (HttpRequest)httpRequest.addHeader(RawHeader.apply(ServletHelper.CSEC_DISTRIBUTED_TRACING_HEADER,
SSRFUtils.generateTracingHeaderValue(securityMetaData.getTracingHeaderValue(), operation.getApiID(), operation.getExecutionId(),
NewRelicSecurity.getAgent().getAgentUUID())));
}
}
}

Future<HttpResponse> returnCode;
// Actual Call
try {
returnCode = Weaver.callOriginal();
} finally {
if (isLockAcquired) {
GenericHelper.releaseLock(PekkoCoreUtils.NR_SEC_CUSTOM_ATTRIB_OUTBOUND_REQ);
}
}
registerExitOperation(isLockAcquired, operation);
return returnCode;
}

private void registerExitOperation(boolean isProcessingAllowed, AbstractOperation operation) {
try {
if (operation == null || !isProcessingAllowed || !NewRelicSecurity.isHookProcessingActive() || NewRelicSecurity.getAgent().getSecurityMetaData().getRequest().isEmpty()
) {
return;
}
NewRelicSecurity.getAgent().registerExitEvent(operation);
} catch (Throwable ignored) {
NewRelicSecurity.getAgent().log(LogLevel.FINEST, String.format(GenericHelper.EXIT_OPERATION_EXCEPTION_MESSAGE, PekkoCoreUtils.PEKKO_HTTP_CORE_2_13_1, ignored.getMessage()), ignored, HttpExt_Instrumentation.class.getName());
}
}

private AbstractOperation preprocessSecurityHook(HttpRequest httpRequest, String methodName) {
try {
SecurityMetaData securityMetaData = NewRelicSecurity.getAgent().getSecurityMetaData();
if (!NewRelicSecurity.isHookProcessingActive() || securityMetaData.getRequest().isEmpty()) {
return null;
}

// Generate required URL
String uri = null;
try {
URI methodURI = new URI(httpRequest.getUri().toString());
uri = methodURI.toString();
if (methodURI == null) {
return null;
}
} catch (Exception ignored){
NewRelicSecurity.getAgent().log(LogLevel.WARNING, String.format(GenericHelper.URI_EXCEPTION_MESSAGE, PekkoCoreUtils.PEKKO_HTTP_CORE_2_13_1, ignored.getMessage()), ignored, this.getClass().getName());
return null;
}

return new SSRFOperation(uri, this.getClass().getName(), methodName);
} catch (Throwable e) {
if (e instanceof NewRelicSecurityException) {
NewRelicSecurity.getAgent().log(LogLevel.WARNING, String.format(GenericHelper.SECURITY_EXCEPTION_MESSAGE, PekkoCoreUtils.PEKKO_HTTP_CORE_2_13_1, e.getMessage()), e, this.getClass().getName());
throw e;
}
NewRelicSecurity.getAgent().log(LogLevel.SEVERE, String.format(GenericHelper.REGISTER_OPERATION_EXCEPTION_MESSAGE, PekkoCoreUtils.PEKKO_HTTP_CORE_2_13_1, e.getMessage()), e, this.getClass().getName());
NewRelicSecurity.getAgent().reportIncident(LogLevel.SEVERE , String.format(GenericHelper.REGISTER_OPERATION_EXCEPTION_MESSAGE, PekkoCoreUtils.PEKKO_HTTP_CORE_2_13_1, e.getMessage()), e, this.getClass().getName());
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.apache.pekko.http.scaladsl;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.security.NewRelicSecurity;
import com.newrelic.api.agent.weaver.MatchType;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.WeaveAllConstructors;
import com.newrelic.api.agent.weaver.Weaver;

import java.net.InetSocketAddress;
import java.util.logging.Level;

@Weave(type = MatchType.ExactClass, originalName = "org.apache.pekko.http.scaladsl.Http")
public class Http_Instrumentation {

@Weave(type = MatchType.ExactClass, originalName = "org.apache.pekko.http.scaladsl.Http$ServerBinding")
public static class ServerBinding {

public InetSocketAddress localAddress() {
return Weaver.callOriginal();
}

@WeaveAllConstructors
public ServerBinding() {
NewRelicSecurity.getAgent().setApplicationConnectionConfig(localAddress().getPort(), "http");
try {
AgentBridge.instrumentation.retransformUninstrumentedClass(SyncRequestHandler.class);
AgentBridge.instrumentation.retransformUninstrumentedClass(AsyncRequestHandler.class);
} catch (Throwable e) {
NewRelic.getAgent().getLogger().log(Level.SEVERE, "Unable to instrument com.newrelic.instrumentation.security.apache-pekko-http-core-2.13_1 due to error", e);
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.apache.pekko.http.scaladsl;

import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import org.apache.pekko.http.scaladsl.model.HttpRequest;
import org.apache.pekko.http.scaladsl.model.HttpResponse;
import org.apache.pekko.stream.Materializer;
import scala.Function1;
import scala.concurrent.Future;

@Weave(originalName = "org.apache.pekko.http.scaladsl.Http$IncomingConnection")
public class IncomingConnection_Instrumentation {

public void handleWithSyncHandler(Function1<HttpRequest, HttpResponse> func, Materializer mat) {
SyncRequestHandler wrapperHandler = new SyncRequestHandler(func, mat);
func = wrapperHandler;
Weaver.callOriginal();
}

public void handleWithAsyncHandler(Function1<HttpRequest, Future<HttpResponse>> func, int parallel, Materializer mat) {
AsyncRequestHandler wrapperHandler = new AsyncRequestHandler(func, mat.executionContext(), mat);
func = wrapperHandler;
Weaver.callOriginal();
}
}
Loading
Loading