Skip to content

Commit

Permalink
Fix concurrency limiter issue
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzhiguo committed Feb 5, 2025
1 parent 064d386 commit 83ef8ef
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,9 @@ public boolean acquire() {
* @return true if the acquisition is successful, false otherwise.
*/
protected abstract boolean doAcquire();

@Override
public void complete() {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@
*/
public interface ConcurrencyLimiter extends Licensee<ConcurrencyLimitPolicy> {


void complete();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.jd.live.agent.governance.policy.service.limit.ConcurrencyLimitPolicy;
import com.jd.live.agent.governance.request.ServiceRequest.InboundRequest;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -65,6 +66,7 @@ public void initialize() {
public <T extends InboundRequest> CompletionStage<Object> filter(InboundInvocation<T> invocation, InboundFilterChain chain) {
ServicePolicy servicePolicy = invocation.getServiceMetadata().getServicePolicy();
List<ConcurrencyLimitPolicy> concurrencyLimitPolicies = servicePolicy == null ? null : servicePolicy.getConcurrencyLimitPolicies();
List<ConcurrencyLimiter> limiters = new ArrayList<>();
if (null != concurrencyLimitPolicies && !concurrencyLimitPolicies.isEmpty()) {
for (ConcurrencyLimitPolicy policy : concurrencyLimitPolicies) {
// match logic
Expand All @@ -75,10 +77,17 @@ public <T extends InboundRequest> CompletionStage<Object> filter(InboundInvocati
"The request is rejected by concurrency limiter. maxConcurrency=" +
policy.getMaxConcurrency());
}
if (limiter != null) {
limiters.add(limiter);
}
}
}
}
return chain.filter(invocation);
return chain.filter(invocation).whenComplete((o, throwable) -> {
for (ConcurrencyLimiter limiter : limiters) {
limiter.complete();
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,9 @@ public Resilience4jConcurrencyLimiter(ConcurrencyLimitPolicy policy) {
public boolean doAcquire() {
return bulkhead.tryAcquirePermission();
}

@Override
public void complete() {
bulkhead.onComplete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* @since 1.6.0
*/
@Injectable
@Extension(value = "BodyInserterRequestDefinition", order = PluginDefinition.ORDER_TRANSMISSION)
@Extension(value = "BodyInserterRequestDefinition_v5", order = PluginDefinition.ORDER_TRANSMISSION)
@ConditionalOnTransmissionEnabled
@ConditionalOnClass(BodyInserterRequestDefinition.TYPE_BODY_INSERTER_QUEST)
public class BodyInserterRequestDefinition extends PluginDefinitionAdapter {
Expand Down

0 comments on commit 83ef8ef

Please sign in to comment.