Skip to content

Commit 1a34c5e

Browse files
authored
Merge branch 'main' into za20241010
2 parents 17b7423 + f024586 commit 1a34c5e

File tree

21 files changed

+911
-4
lines changed

21 files changed

+911
-4
lines changed

CHANGES.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@ Release Notes.
88
* Upgrade nats plugin to support 2.16.5
99
* Add agent self-observability.
1010
* Fix intermittent ClassCircularityError by preloading ThreadLocalRandom since ByteBuddy 1.12.11
11+
* Add witness class/method for resteasy-server plugin(v3/v4/v6)
12+
* Add async-profiler feature for performance analysis
1113
* Upgrade grpc-protobuf to 1.68.1
1214

13-
1415
All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/222?closed=1)
1516

1617
------------------
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.network.trace.component.command;
20+
21+
import org.apache.skywalking.apm.network.common.v3.Command;
22+
import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
23+
24+
import java.util.List;
25+
import java.util.Objects;
26+
27+
public class AsyncProfilerTaskCommand extends BaseCommand implements Serializable, Deserializable<AsyncProfilerTaskCommand> {
28+
public static final Deserializable<AsyncProfilerTaskCommand> DESERIALIZER = new AsyncProfilerTaskCommand("", "", 0, null, "", 0);
29+
public static final String NAME = "AsyncProfilerTaskQuery";
30+
31+
/**
32+
* async-profiler taskId
33+
*/
34+
private final String taskId;
35+
/**
36+
* run profiling for duration (second)
37+
*/
38+
private final int duration;
39+
/**
40+
* async profiler extended parameters. Here is a table of optional parameters.
41+
*
42+
* <p>lock[=DURATION] - profile contended locks overflowing the DURATION ns bucket (default: 10us)</p>
43+
* <p>alloc[=BYTES] - profile allocations with BYTES interval</p>
44+
* <p>interval=N - sampling interval in ns (default: 10'000'000, i.e. 10 ms)</p>
45+
* <p>jstackdepth=N - maximum Java stack depth (default: 2048)</p>
46+
* <p>chunksize=N - approximate size of JFR chunk in bytes (default: 100 MB) </p>
47+
* <p>chunktime=N - duration of JFR chunk in seconds (default: 1 hour) </p>
48+
* details @see <a href="https://github.com/async-profiler/async-profiler/blob/master/src/arguments.cpp#L44">async-profiler argument</a>
49+
*/
50+
private final String execArgs;
51+
/**
52+
* task create time
53+
*/
54+
private final long createTime;
55+
56+
public AsyncProfilerTaskCommand(String serialNumber, String taskId, int duration,
57+
List<String> events, String execArgs, long createTime) {
58+
super(NAME, serialNumber);
59+
this.taskId = taskId;
60+
this.duration = duration;
61+
this.createTime = createTime;
62+
String comma = ",";
63+
StringBuilder sb = new StringBuilder();
64+
if (Objects.nonNull(events) && !events.isEmpty()) {
65+
sb.append("event=")
66+
.append(String.join(comma, events))
67+
.append(comma);
68+
}
69+
if (execArgs != null && !execArgs.isEmpty()) {
70+
sb.append(execArgs);
71+
}
72+
this.execArgs = sb.toString();
73+
}
74+
75+
public AsyncProfilerTaskCommand(String serialNumber, String taskId, int duration,
76+
String execArgs, long createTime) {
77+
super(NAME, serialNumber);
78+
this.taskId = taskId;
79+
this.duration = duration;
80+
this.execArgs = execArgs;
81+
this.createTime = createTime;
82+
}
83+
84+
@Override
85+
public AsyncProfilerTaskCommand deserialize(Command command) {
86+
final List<KeyStringValuePair> argsList = command.getArgsList();
87+
String taskId = null;
88+
int duration = 0;
89+
String execArgs = null;
90+
long createTime = 0;
91+
String serialNumber = null;
92+
for (final KeyStringValuePair pair : argsList) {
93+
if ("SerialNumber".equals(pair.getKey())) {
94+
serialNumber = pair.getValue();
95+
} else if ("TaskId".equals(pair.getKey())) {
96+
taskId = pair.getValue();
97+
} else if ("Duration".equals(pair.getKey())) {
98+
duration = Integer.parseInt(pair.getValue());
99+
} else if ("ExecArgs".equals(pair.getKey())) {
100+
execArgs = pair.getValue();
101+
} else if ("CreateTime".equals(pair.getKey())) {
102+
createTime = Long.parseLong(pair.getValue());
103+
}
104+
}
105+
return new AsyncProfilerTaskCommand(serialNumber, taskId, duration, execArgs, createTime);
106+
}
107+
108+
@Override
109+
public Command.Builder serialize() {
110+
final Command.Builder builder = commandBuilder();
111+
builder.addArgs(KeyStringValuePair.newBuilder().setKey("TaskId").setValue(taskId))
112+
.addArgs(KeyStringValuePair.newBuilder().setKey("Duration").setValue(String.valueOf(duration)))
113+
.addArgs(KeyStringValuePair.newBuilder().setKey("ExecArgs").setValue(execArgs))
114+
.addArgs(KeyStringValuePair.newBuilder().setKey("CreateTime").setValue(String.valueOf(createTime)));
115+
return builder;
116+
}
117+
118+
public String getTaskId() {
119+
return taskId;
120+
}
121+
122+
public int getDuration() {
123+
return duration;
124+
}
125+
126+
public String getExecArgs() {
127+
return execArgs;
128+
}
129+
130+
public long getCreateTime() {
131+
return createTime;
132+
}
133+
}

apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/CommandDeserializer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ public static BaseCommand deserialize(final Command command) {
2727
return ProfileTaskCommand.DESERIALIZER.deserialize(command);
2828
} else if (ConfigurationDiscoveryCommand.NAME.equals(commandName)) {
2929
return ConfigurationDiscoveryCommand.DESERIALIZER.deserialize(command);
30+
} else if (AsyncProfilerTaskCommand.NAME.equals(commandName)) {
31+
return AsyncProfilerTaskCommand.DESERIALIZER.deserialize(command);
3032
}
33+
3134
throw new UnsupportedCommandException(command);
3235
}
3336

apm-sniffer/apm-agent-core/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@
5353
<shade.org.slf4j.target>${shade.package}.${shade.org.slf4j.source}</shade.org.slf4j.target>
5454
<ststem-rules.version>1.18.0</ststem-rules.version>
5555
<slf4j.version>1.7.25</slf4j.version>
56+
<shade.one.profiler.source>one.profiler</shade.one.profiler.source>
57+
<shade.one.profiler.target>${shade.package}.${shade.one.profiler.source}</shade.one.profiler.target>
5658
</properties>
5759

5860
<dependencies>
@@ -143,6 +145,10 @@
143145
<artifactId>jmh-generator-annprocess</artifactId>
144146
<scope>test</scope>
145147
</dependency>
148+
<dependency>
149+
<groupId>tools.profiler</groupId>
150+
<artifactId>async-profiler</artifactId>
151+
</dependency>
146152
</dependencies>
147153
<dependencyManagement>
148154
<dependencies>
@@ -249,6 +255,10 @@
249255
<pattern>${shade.org.slf4j.source}</pattern>
250256
<shadedPattern>${shade.org.slf4j.target}</shadedPattern>
251257
</relocation>
258+
<relocation>
259+
<pattern>${shade.one.profiler.source}</pattern>
260+
<shadedPattern>${shade.one.profiler.target}</shadedPattern>
261+
</relocation>
252262
</relocations>
253263
<filters>
254264
<filter>
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.agent.core.asyncprofiler;
20+
21+
import com.google.protobuf.ByteString;
22+
import io.grpc.Channel;
23+
import io.grpc.stub.ClientCallStreamObserver;
24+
import io.grpc.stub.ClientResponseObserver;
25+
import io.grpc.stub.StreamObserver;
26+
import org.apache.skywalking.apm.agent.core.boot.BootService;
27+
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
28+
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
29+
import org.apache.skywalking.apm.agent.core.conf.Config;
30+
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
31+
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
32+
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener;
33+
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
34+
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus;
35+
import org.apache.skywalking.apm.agent.core.remote.GRPCStreamServiceStatus;
36+
import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerCollectionResponse;
37+
import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerData;
38+
import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerMetaData;
39+
import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerTaskGrpc;
40+
import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilingStatus;
41+
42+
import java.io.File;
43+
import java.io.FileInputStream;
44+
import java.io.IOException;
45+
import java.nio.file.Files;
46+
import java.util.concurrent.TimeUnit;
47+
48+
import static org.apache.skywalking.apm.agent.core.conf.Config.AsyncProfiler.DATA_CHUNK_SIZE;
49+
import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
50+
51+
@DefaultImplementor
52+
public class AsyncProfilerDataSender implements BootService, GRPCChannelListener {
53+
private static final ILog LOGGER = LogManager.getLogger(AsyncProfilerDataSender.class);
54+
55+
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
56+
57+
private volatile AsyncProfilerTaskGrpc.AsyncProfilerTaskStub asyncProfilerTaskStub;
58+
59+
@Override
60+
public void prepare() throws Throwable {
61+
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
62+
}
63+
64+
@Override
65+
public void boot() throws Throwable {
66+
67+
}
68+
69+
@Override
70+
public void onComplete() throws Throwable {
71+
72+
}
73+
74+
@Override
75+
public void shutdown() throws Throwable {
76+
77+
}
78+
79+
@Override
80+
public void statusChanged(GRPCChannelStatus status) {
81+
if (GRPCChannelStatus.CONNECTED.equals(status)) {
82+
Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
83+
asyncProfilerTaskStub = AsyncProfilerTaskGrpc.newStub(channel);
84+
} else {
85+
asyncProfilerTaskStub = null;
86+
}
87+
this.status = status;
88+
}
89+
90+
public void sendData(AsyncProfilerTask task, File dumpFile) throws IOException, InterruptedException {
91+
if (status != GRPCChannelStatus.CONNECTED) {
92+
return;
93+
}
94+
95+
try (FileInputStream fileInputStream = new FileInputStream(dumpFile)) {
96+
long fileSize = Files.size(dumpFile.toPath());
97+
int size = Math.toIntExact(fileSize);
98+
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
99+
StreamObserver<AsyncProfilerData> dataStreamObserver = asyncProfilerTaskStub.withDeadlineAfter(
100+
GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
101+
).collect(new ClientResponseObserver<AsyncProfilerData, AsyncProfilerCollectionResponse>() {
102+
ClientCallStreamObserver<AsyncProfilerData> requestStream;
103+
104+
@Override
105+
public void beforeStart(ClientCallStreamObserver<AsyncProfilerData> requestStream) {
106+
this.requestStream = requestStream;
107+
}
108+
109+
@Override
110+
public void onNext(AsyncProfilerCollectionResponse value) {
111+
if (AsyncProfilingStatus.TERMINATED_BY_OVERSIZE.equals(value.getType())) {
112+
LOGGER.warn("JFR is too large to be received by the oap server");
113+
} else {
114+
byte[] buf = new byte[DATA_CHUNK_SIZE];
115+
try {
116+
int bytesRead;
117+
while ((bytesRead = fileInputStream.read(buf)) != -1) {
118+
AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder()
119+
.setContent(ByteString.copyFrom(buf, 0, bytesRead))
120+
.build();
121+
requestStream.onNext(asyncProfilerData);
122+
}
123+
} catch (IOException e) {
124+
LOGGER.error("Failed to read JFR file and failed to upload to oap", e);
125+
}
126+
}
127+
128+
requestStream.onCompleted();
129+
}
130+
131+
@Override
132+
public void onError(Throwable t) {
133+
status.finished();
134+
LOGGER.error(t, "Send async profiler task data to collector fail with a grpc internal exception.");
135+
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
136+
}
137+
138+
@Override
139+
public void onCompleted() {
140+
status.finished();
141+
}
142+
});
143+
AsyncProfilerMetaData metaData = AsyncProfilerMetaData.newBuilder()
144+
.setService(Config.Agent.SERVICE_NAME)
145+
.setServiceInstance(Config.Agent.INSTANCE_NAME)
146+
.setType(AsyncProfilingStatus.PROFILING_SUCCESS)
147+
.setContentSize(size)
148+
.setTaskId(task.getTaskId())
149+
.build();
150+
AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder().setMetaData(metaData).build();
151+
dataStreamObserver.onNext(asyncProfilerData);
152+
153+
status.wait4Finish();
154+
}
155+
}
156+
157+
public void sendError(AsyncProfilerTask task, String errorMessage) {
158+
if (status != GRPCChannelStatus.CONNECTED) {
159+
return;
160+
}
161+
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
162+
StreamObserver<AsyncProfilerData> dataStreamObserver = asyncProfilerTaskStub.withDeadlineAfter(
163+
GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
164+
).collect(new StreamObserver<AsyncProfilerCollectionResponse>() {
165+
@Override
166+
public void onNext(AsyncProfilerCollectionResponse value) {
167+
}
168+
169+
@Override
170+
public void onError(Throwable t) {
171+
status.finished();
172+
LOGGER.error(t, "Send async profiler task execute error fail with a grpc internal exception.");
173+
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
174+
}
175+
176+
@Override
177+
public void onCompleted() {
178+
status.finished();
179+
}
180+
});
181+
AsyncProfilerMetaData metaData = AsyncProfilerMetaData.newBuilder()
182+
.setService(Config.Agent.SERVICE_NAME)
183+
.setServiceInstance(Config.Agent.INSTANCE_NAME)
184+
.setTaskId(task.getTaskId())
185+
.setType(AsyncProfilingStatus.EXECUTION_TASK_ERROR)
186+
.setContentSize(-1)
187+
.build();
188+
AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder()
189+
.setMetaData(metaData)
190+
.setErrorMessage(errorMessage)
191+
.build();
192+
dataStreamObserver.onNext(asyncProfilerData);
193+
dataStreamObserver.onCompleted();
194+
status.wait4Finish();
195+
}
196+
}

0 commit comments

Comments
 (0)