Skip to content

Commit

Permalink
Merge pull request #215
Browse files Browse the repository at this point in the history
214-fix-the-issue-of-slow-consumer-awareness-of-instance-logic-going-offline-in-grpc-calls
  • Loading branch information
chenzhiguo authored Jan 27, 2025
2 parents c6e0b19 + 776027c commit d3085d2
Show file tree
Hide file tree
Showing 75 changed files with 914 additions and 436 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.jd.live.agent.core.bootstrap;

import java.util.Map;
import java.util.concurrent.Callable;

/**
* Defines the lifecycle operations for an agent component within a system.
Expand Down Expand Up @@ -54,13 +53,4 @@ public interface AgentLifecycle {
*/
void execute(String command, Map<String, Object> args);

/**
* Adds a hook that will be executed when the system is ready.
* This can be used to perform initialization tasks or other operations
* that should occur once the system has completed its startup process.
*
* @param callable the hook to be executed when the system is ready
* @param classLoader execute in this classloader
*/
void addReadyHook(Callable<?> callable, ClassLoader classLoader);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.core.bootstrap;

/**
* An interface that supplies an AppListener instance.
*/
public interface AppListenerSupplier {

/**
* Returns an AppListener instance.
*
* @return an AppListener instance
*/
AppListener getAppListener();
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public interface Publisher<E> {
*/
String POLICY_SUBSCRIBER = "policy-subscriber";

/**
* Topic identifier for endpoint events.
*/
String ENDPOINT = "endpoint";

/**
* Retrieves the topic associated with this publisher.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,10 @@ public static <T, V> List<V> toList(Iterable<T> iterable, Function<T, V> functio
}
List<V> result = iterable instanceof Collection ? new ArrayList<>(((Collection<T>) iterable).size()) : new ArrayList<>();
for (T t : iterable) {
result.add(function.apply(t));
V apply = function.apply(t);
if (apply != null) {
result.add(apply);
}
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,8 @@
import java.io.FileReader;
import java.io.Reader;
import java.lang.instrument.Instrumentation;
import java.lang.reflect.InvocationTargetException;
import java.security.CodeSource;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;

import static com.jd.live.agent.core.extension.condition.ConditionMatcher.DEPEND_ON_LOADER;

Expand Down Expand Up @@ -223,8 +220,6 @@ public class Bootstrap implements AgentLifecycle {
*/
private List<InjectSourceSupplier> sourceSuppliers;

private final List<Callable<?>> readies = new CopyOnWriteArrayList<>();

private Shutdown shutdown;

/**
Expand Down Expand Up @@ -365,21 +360,6 @@ public void execute(String command, Map<String, Object> args) {
}
}

@Override
public void addReadyHook(Callable<?> callable, ClassLoader classLoader) {
if (callable != null) {
readies.add(() -> {
ClassLoader old = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);
try {
return callable.call();
} finally {
Thread.currentThread().setContextClassLoader(old);
}
});
}
}

private void printExtensions() {
StringBuilder builder = new StringBuilder();
extensionManager.forEach(extensible -> {
Expand Down Expand Up @@ -576,7 +556,14 @@ private void setupServiceManager() {
}

private List<AppListener> createApplicationListeners() {
return extensionManager.getOrLoadExtensible(AppListener.class, classLoaderManager.getCoreImplLoader()).getExtensions();
List<AppListener> extensions = extensionManager.getOrLoadExtensible(AppListener.class, classLoaderManager.getCoreImplLoader()).getExtensions();
List<AppListener> result = new ArrayList<>(extensions);
serviceManager.service(service -> {
if (service instanceof AppListenerSupplier) {
result.add(((AppListenerSupplier) service).getAppListener());
}
});
return result;
}

private PluginSupervisor createPluginManager() {
Expand Down Expand Up @@ -715,7 +702,6 @@ private void onAgentEvent(AgentEvent event) {
application.setStatus(AppStatus.STARTED);
break;
case APPLICATION_READY:
onReady();
application.setStatus(AppStatus.READY);
break;
case APPLICATION_STOP:
Expand All @@ -724,24 +710,6 @@ private void onAgentEvent(AgentEvent event) {
}
}

/**
* Executes all registered ready hooks.
* Runs each runnable and logs exceptions if any occur during execution.
*/
private void onReady() {
// Some framework does not support multi thread to registration
for (Callable<?> runnable : readies) {
try {
runnable.call();
} catch (Throwable e) {
Throwable cause = e instanceof InvocationTargetException ? e.getCause() : null;
cause = cause != null ? cause : e;
onException(cause.getMessage(), cause);
}
readies.clear();
}
}

/**
* Logs an exception and optionally shuts down the application if not in dynamic mode.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,5 +167,15 @@ public boolean isSystem(String path) {
return path != null && systemPaths != null && !systemPaths.isEmpty()
&& systemPathTrie.match(path, PathMatchType.PREFIX) != null;
}

/**
* Returns the group associated with the specified service.
*
* @param service the name of the service
* @return the group associated with the service, or null if the service is not found or if the service groups map is null
*/
public String getGroup(String service) {
return serviceGroups == null || service == null ? null : serviceGroups.get(service);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

import com.jd.live.agent.bootstrap.bytekit.context.ExecutableContext;
import com.jd.live.agent.bootstrap.bytekit.context.MethodContext;
import com.jd.live.agent.bootstrap.logger.Logger;
import com.jd.live.agent.bootstrap.logger.LoggerFactory;
import com.jd.live.agent.core.bootstrap.AgentLifecycle;
import com.jd.live.agent.core.instance.AppStatus;
import com.jd.live.agent.core.instance.Application;
import com.jd.live.agent.core.plugin.definition.InterceptorAdaptor;
Expand All @@ -31,17 +28,12 @@
*/
public abstract class AbstractRegistryInterceptor extends InterceptorAdaptor {

private final Logger logger = LoggerFactory.getLogger(AbstractRegistryInterceptor.class);

protected final Application application;

protected final AgentLifecycle lifecycle;

protected final Registry registry;

public AbstractRegistryInterceptor(Application application, AgentLifecycle lifecycle, Registry registry) {
public AbstractRegistryInterceptor(Application application, Registry registry) {
this.application = application;
this.lifecycle = lifecycle;
this.registry = registry;
}

Expand All @@ -51,12 +43,10 @@ public void onEnter(ExecutableContext ctx) {
if (application.getStatus() == AppStatus.STARTING) {
ServiceInstance instance = getInstance(mc);
if (instance != null) {
logger.info("Delay registration until application is ready, service=" + instance.getService());
lifecycle.addReadyHook(() -> {
logger.info("Register when application is ready, service=" + instance.getService());
registry.register(instance);
return mc.invokeOrigin();
}, ctx.getType().getClassLoader());
registry.register(instance, () -> {
mc.invokeOrigin();
return null;
});
mc.setSkip(true);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ public class PolicyManager implements PolicySupervisor, InjectSourceSupplier, Ex

private final AtomicReference<GovernancePolicy> policy = new AtomicReference<>();

private final Map<String, PolicySubscriber> subscribers = new ConcurrentHashMap<>();
private final Map<String, PolicySubscription> subscriptions = new ConcurrentHashMap<>();

@Getter
@Inject(Publisher.POLICY_SUBSCRIBER)
private Publisher<PolicySubscriber> policyPublisher;
private Publisher<PolicySubscription> policyPublisher;

@Getter
@Inject(Publisher.SYSTEM)
Expand Down Expand Up @@ -252,27 +252,21 @@ public CompletableFuture<Void> subscribe(String namespace, String service) {
return CompletableFuture.completedFuture(null);
}
namespace = namespace == null || namespace.isEmpty() ? application.getService().getNamespace() : namespace;
PolicySubscriber subscriber = new PolicySubscriber(service, namespace, TYPE_SERVICE_SPACE, serviceSyncers);
PolicySubscription subscriber = new PolicySubscription(service, namespace, TYPE_SERVICE_SPACE, serviceSyncers);
subscribe(subscriber);
return subscriber.getFuture();
}

@Override
public boolean isDone(String name) {
PolicySubscriber subscriber = name == null ? null : subscribers.get(name);
return subscriber != null && subscriber.isDone();
}

@Override
public List<PolicySubscriber> getSubscribers() {
return Collections.unmodifiableList(new ArrayList<>(subscribers.values()));
public List<PolicySubscription> getSubscriptions() {
return Collections.unmodifiableList(new ArrayList<>(subscriptions.values()));
}

@Override
public void waitReady() {
if (!subscribers.isEmpty()) {
List<CompletableFuture<Void>> futures = new ArrayList<>(subscribers.size());
subscribers.forEach((k, v) -> futures.add(v.getFuture()));
if (!subscriptions.isEmpty()) {
List<CompletableFuture<Void>> futures = new ArrayList<>(subscriptions.size());
subscriptions.forEach((k, v) -> futures.add(v.getFuture()));
try {
Futures.allOf(futures).get(governanceConfig.getInitializeTimeout(), TimeUnit.MILLISECONDS);
systemPublisher.offer(AgentEvent.onServicePolicyReady("Application service policies are ready."));
Expand Down Expand Up @@ -398,18 +392,18 @@ private void warmup() {
warmups.add(name);
}
if (!warmups.isEmpty()) {
warmups.forEach(o -> subscribe(new PolicySubscriber(o, namespace, TYPE_SERVICE_SPACE, serviceSyncers)));
warmups.forEach(o -> subscribe(new PolicySubscription(o, namespace, TYPE_SERVICE_SPACE, serviceSyncers)));
}
}
}

/**
* Subscribes a {@link PolicySubscriber} to the policy publisher.
* Subscribes a {@link PolicySubscription} to the policy publisher.
*
* @param subscriber The {@link PolicySubscriber} to be subscribed.
* @param subscriber The {@link PolicySubscription} to be subscribed.
*/
protected void subscribe(PolicySubscriber subscriber) {
PolicySubscriber exist = subscribers.putIfAbsent(subscriber.getName(), subscriber);
protected void subscribe(PolicySubscription subscriber) {
PolicySubscription exist = subscriptions.putIfAbsent(subscriber.getName(), subscriber);
if (exist == null) {
policyPublisher.offer(subscriber);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* of the policy subscription process.
*/
@Getter
public class PolicySubscriber implements ServiceName {
public class PolicySubscription implements ServiceName {

private final String name;

Expand All @@ -54,7 +54,7 @@ public class PolicySubscriber implements ServiceName {
* @param type The type of the subscriber.
* @param owners The owner of the subscriber.
*/
public PolicySubscriber(String name, String namespace, String type, List<String> owners) {
public PolicySubscription(String name, String namespace, String type, List<String> owners) {
this.name = name;
this.namespace = namespace;
this.type = type;
Expand Down Expand Up @@ -103,16 +103,6 @@ public boolean complete() {
return false;
}

/**
* Checks if the associated future has completed successfully.
*
* @return {@code true} if the future is done and has not completed exceptionally,
* otherwise {@code false}
*/
public boolean isDone() {
return future.isDone() && !future.isCompletedExceptionally();
}

/**
* Marks the subscription process as complete with an exception. This method completes the associated future
* exceptionally, indicating that the subscription process has finished due to an error.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ default boolean update(Function<GovernancePolicy, GovernancePolicy> updater) {
}

/**
* Retrieves a list of policy subscribers.
* Retrieves a list of policy subscription.
*
* @return A list of {@link PolicySubscriber} who are subscribed to policy updates.
* @return A list of {@link PolicySubscription} who are subscribed to policy updates.
*/
List<PolicySubscriber> getSubscribers();
List<PolicySubscription> getSubscriptions();

/**
* Waits until the application is ready.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ default CompletableFuture<Void> subscribe(String service) {
return subscribe(null, service);
}


/**
* Subscribes a specific service policy based on its name.
*
Expand All @@ -60,14 +59,5 @@ default CompletableFuture<Void> subscribe(String service) {
* @return A {@link CompletableFuture} that completes when the subscription is successful.
*/
CompletableFuture<Void> subscribe(String namespace, String service);

/**
* Checks if the task associated with the given name has completed successfully.
*
* @param name the name of the task to check
* @return {@code true} if the task with the specified name is done and has not completed exceptionally,
* otherwise {@code false}
*/
boolean isDone(String name);
}

Loading

0 comments on commit d3085d2

Please sign in to comment.