Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Release Notes.
------------------

* Add the virtual thread executor plugin

* Fix Conflicts apm-jdk-threadpool-plugin conflicts with apm-jdk-forkjoinpool-plugin

All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/236?closed=1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import java.lang.reflect.Method;
import java.util.concurrent.ForkJoinTask;

public abstract class AbstractThreadingPoolInterceptor implements InstanceMethodsAroundInterceptor {
@Override
Expand Down Expand Up @@ -71,6 +72,8 @@ private boolean notToEnhance(Object[] allArguments) {
Object argument = allArguments[0];

// Avoid duplicate enhancement, such as the case where it has already been enhanced by RunnableWrapper or CallableWrapper with toolkit.
return argument instanceof EnhancedInstance && ((EnhancedInstance) argument).getSkyWalkingDynamicField() instanceof ContextSnapshot;
return argument instanceof EnhancedInstance
&& ((EnhancedInstance) argument).getSkyWalkingDynamicField() instanceof ContextSnapshot
&& !(argument instanceof ForkJoinTask);
Copy link
Member

@wu-sheng wu-sheng Apr 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When argument is ForkJoinTask, it will be repeatedly enhanced?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes.
Because forkjoinpool plugin enhanced is not work in threadpool plugin.
Also threadpool plugin enhanced is not work in forkjoinpool plugin.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't remember when the previous two conditions matched, did you verify that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when forkjoinpool plugin enabled, it is matched

image

org.apache.skywalking.apm.plugin.jdk.forkjoinpool.ForkJoinTaskConstructorInterceptor

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this plugin doesn't consider this kind of compatibility AFAIK. But still, these codes exist. I want to understand why to make sure the new added conditions don't impact existing features.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't remember when the previous two conditions matched, did you verify that?我不记得前两个条件何时匹配,你验证过吗?

this is verify code and result. It looks normal

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.*;

@Slf4j
@RestController
@RequestMapping(value = "/test")
public class TestController {

    ExecutorService executorService = Executors.newFixedThreadPool(10);

    @PutMapping("/test1")
    public void test() {
        executorService.submit(() -> {
            log.info("submit");
        });
        executorService.submit(new MyForkJoinTask(() -> {
            log.info("继承ForkJoinTask");
        }));
        CompletableFuture.runAsync(() -> {
            log.info("CompletableFuture底层的AsyncRun继承ForkJoinTask");
        }, executorService);
        ForkJoinPool.commonPool().execute((Runnable) new MyForkJoinTask(() -> {
            log.info("Runnable: 继承ForkJoinTask");
        }));
        ForkJoinPool.commonPool().execute((ForkJoinTask<Void>) new MyForkJoinTask(() -> {
            log.info("ForkJoinTask: 继承ForkJoinTask");
        }));
    }

    static final class MyForkJoinTask extends ForkJoinTask<Void> implements Runnable {
        CompletableFuture<Void> dep;
        Runnable fn;

        MyForkJoinTask(Runnable fn) {
            this.dep = new CompletableFuture<Void>();
            ;
            this.fn = fn;
        }

        public final Void getRawResult() {
            return null;
        }

        public final void setRawResult(Void v) {
        }

        public final boolean exec() {
            run();
            return false;
        }

        public void run() {
            CompletableFuture<Void> d;
            Runnable f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null;
                fn = null;
                if (!d.isDone()) {
                    try {
                        f.run();
                        d.complete(null);
                    } catch (Throwable ex) {
                        d.complete(null);
                    }
                }
            }
        }
    }

}

image

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry. I don't mean your codes are verified. I want to know why the method #notToEnhance existed before.

}
}
Loading