Skip to content

Commit

Permalink
Add kamon-pekko-http (#1285)
Browse files Browse the repository at this point in the history
* Add kamon-pekko-http module for Apache Pekko HTTP support

* Rework deprecated methods

* Remove pekko-http2-support module

see apache/pekko-http#52

* Remove Akka 10.1 references

* Set pekko-http 1.0.0-RC2

* Use Pekko HTTP release version
  • Loading branch information
DieBauer committed Aug 10, 2023
1 parent ae8e4a6 commit 14cba52
Show file tree
Hide file tree
Showing 29 changed files with 2,727 additions and 0 deletions.
24 changes: 24 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ lazy val `kamon-akka-http` = (project in file("instrumentation/kamon-akka-http")
)).dependsOn(`kamon-akka`, `kamon-testkit` % "test")



lazy val `kamon-pekko` = (project in file("instrumentation/kamon-pekko"))
.enablePlugins(JavaAgent)
.disablePlugins(AssemblyPlugin)
Expand All @@ -496,6 +497,29 @@ lazy val `kamon-pekko` = (project in file("instrumentation/kamon-pekko"))
`kamon-scala-future` % "compile",
`kamon-testkit` % "test"
)

lazy val pekkoHttpVersion = "1.0.0"

lazy val `kamon-pekko-http` = (project in file("instrumentation/kamon-pekko-http"))
.enablePlugins(JavaAgent)
.disablePlugins(AssemblyPlugin)
.settings(instrumentationSettings)
.settings(Seq(
javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.10" % "test",
libraryDependencies ++= Seq(
kanelaAgent % "provided",
"org.apache.pekko" %% "pekko-http" % pekkoHttpVersion % "provided",
"org.apache.pekko" %% "pekko-stream" % "1.0.1" % "provided",
scalatest % "test",
slf4jApi % "test",
slf4jnop % "test",
okHttp % "test",
"org.apache.pekko" %% "pekko-http-testkit" % pekkoHttpVersion % "test",
"com.github.pjfanning" %% "pekko-http-json4s" % "2.0.0" % "test",
"org.json4s" %% "json4s-native" % "4.0.6" % "test",
),
)).dependsOn(`kamon-pekko`, `kamon-testkit` % "test")

lazy val `kamon-akka-grpc` = (project in file("instrumentation/kamon-akka-grpc"))
.enablePlugins(JavaAgent, AkkaGrpcPlugin)
.disablePlugins(AssemblyPlugin)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package kamon.instrumentation.pekko.http;

import org.apache.pekko.NotUsed;
import org.apache.pekko.http.scaladsl.model.HttpRequest;
import org.apache.pekko.http.scaladsl.model.HttpResponse;
import org.apache.pekko.stream.scaladsl.Flow;
import kanela.agent.libs.net.bytebuddy.asm.Advice;

public class FlowOpsMapAsyncAdvice {

public static class EndpointInfo {
public final String listenInterface;
public final int listenPort;

public EndpointInfo(String listenInterface, int listenPort) {
this.listenInterface = listenInterface;
this.listenPort = listenPort;
}
}

public static ThreadLocal<EndpointInfo> currentEndpoint = new ThreadLocal<>();

@Advice.OnMethodExit
public static void onExit(@Advice.Return(readOnly = false) org.apache.pekko.stream.scaladsl.FlowOps returnedFlow) {
EndpointInfo bindAndHandlerEndpoint = currentEndpoint.get();

if(bindAndHandlerEndpoint != null) {
returnedFlow = ServerFlowWrapper.apply(
(Flow<HttpRequest, HttpResponse, NotUsed>) returnedFlow,
bindAndHandlerEndpoint.listenInterface,
bindAndHandlerEndpoint.listenPort
);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2013-2021 The Kamon Project <https://kamon.io>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kamon.instrumentation.pekko.http;

import org.apache.pekko.http.scaladsl.model.HttpRequest;
import org.apache.pekko.http.scaladsl.model.HttpResponse;
import kanela.agent.libs.net.bytebuddy.asm.Advice;
import scala.Function1;
import scala.concurrent.Future;

public class Http2ExtBindAndHandleAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(value = 0, readOnly = false) Function1<HttpRequest, Future<HttpResponse>> handler,
@Advice.Argument(1) String iface,
@Advice.Argument(2) Integer port) {

FlowOpsMapAsyncAdvice.currentEndpoint.set(new FlowOpsMapAsyncAdvice.EndpointInfo(iface, port));
handler = new Http2BlueprintInterceptor.HandlerWithEndpoint(iface, port, handler);
}

@Advice.OnMethodExit
public static void onExit() {
FlowOpsMapAsyncAdvice.currentEndpoint.remove();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2013-2021 The Kamon Project <https://kamon.io>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kamon.instrumentation.pekko.http;

import org.apache.pekko.NotUsed;
import org.apache.pekko.http.scaladsl.model.HttpRequest;
import org.apache.pekko.http.scaladsl.model.HttpResponse;
import org.apache.pekko.stream.scaladsl.Flow;
import kanela.agent.libs.net.bytebuddy.asm.Advice;

public class HttpExtBindAndHandleAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(value = 0, readOnly = false) Flow<HttpRequest, HttpResponse, NotUsed> handler,
@Advice.Argument(1) String iface,
@Advice.Argument(2) Integer port) {

handler = ServerFlowWrapper.apply(handler, iface, port);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2013-2021 The Kamon Project <https://kamon.io>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kamon.instrumentation.pekko.http;

import org.apache.pekko.http.scaladsl.model.HttpRequest;
import org.apache.pekko.http.scaladsl.model.HttpResponse;
import kamon.Kamon;
import kamon.context.Storage;
import kamon.instrumentation.http.HttpClientInstrumentation;
import kamon.instrumentation.http.HttpMessage;
import kamon.trace.Span;
import kanela.agent.libs.net.bytebuddy.asm.Advice;
import scala.concurrent.Future;
import static kamon.instrumentation.pekko.http.PekkoHttpInstrumentation.toRequestBuilder;

public class HttpExtSingleRequestAdvice {

@Advice.OnMethodEnter
public static void onEnter(@Advice.Argument(value = 0, readOnly = false) HttpRequest request,
@Advice.Local("handler") HttpClientInstrumentation.RequestHandler<HttpRequest> handler,
@Advice.Local("scope")Storage.Scope scope) {

final HttpMessage.RequestBuilder<HttpRequest> requestBuilder = toRequestBuilder(request);

handler = PekkoHttpClientInstrumentation.httpClientInstrumentation()
.createHandler(requestBuilder, Kamon.currentContext());

request = handler.request();
scope = Kamon.storeContext(Kamon.currentContext().withEntry(Span.Key(), handler.span()));
}

@Advice.OnMethodExit
public static void onExit(@Advice.Return Future<HttpResponse> response,
@Advice.Local("handler") HttpClientInstrumentation.RequestHandler<HttpRequest> handler,
@Advice.Local("scope")Storage.Scope scope) {

PekkoHttpClientInstrumentation.handleResponse(response, handler);
scope.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2013-2021 The Kamon Project <https://kamon.io>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kamon.instrumentation.pekko.http;

import org.apache.pekko.http.scaladsl.model.HttpRequest;
import org.apache.pekko.http.scaladsl.model.HttpResponse;
import kamon.Kamon;
import kamon.context.Storage;
import kamon.instrumentation.http.HttpClientInstrumentation;
import kamon.instrumentation.http.HttpMessage;
import kamon.trace.Span;
import kanela.agent.libs.net.bytebuddy.asm.Advice;
import scala.concurrent.Future;

import static kamon.instrumentation.pekko.http.PekkoHttpInstrumentation.toRequestBuilder;

public class PoolMasterDispatchRequestAdvice {

@Advice.OnMethodEnter
public static void onEnter(
@Advice.Argument(value = 1, readOnly = false) HttpRequest request,
@Advice.Local("handler") HttpClientInstrumentation.RequestHandler<HttpRequest> handler,
@Advice.Local("scope")Storage.Scope scope) {

final HttpMessage.RequestBuilder<HttpRequest> requestBuilder = toRequestBuilder(request);

handler = PekkoHttpClientInstrumentation.httpClientInstrumentation()
.createHandler(requestBuilder, Kamon.currentContext());

request = handler.request();
scope = Kamon.storeContext(Kamon.currentContext().withEntry(Span.Key(), handler.span()));
}

@Advice.OnMethodExit
public static void onExit(
@Advice.Return Future<HttpResponse> response,
@Advice.Local("handler") HttpClientInstrumentation.RequestHandler<HttpRequest> handler,
@Advice.Local("scope")Storage.Scope scope) {

PekkoHttpClientInstrumentation.handleResponse(response, handler);
scope.close();
}
}
Loading

0 comments on commit 14cba52

Please sign in to comment.