
Commit

GCToolKit#addDataSourceParser (#288)
* refactor: fix up mapping of aggregators to channels

* add method GCToolKit#addDataSourceParser (see the usage sketch below)

* Add GCToolKit#LOG_DEBUG_MESSAGE

* improve debug messages

* clean up formatting of debug message

* s/producesEvents/eventsProduced/

---------

Co-authored-by: Kirk Pepperdine <[email protected]>
dsgrieve and kcpeppe committed Apr 11, 2023
1 parent 35cee52 commit 27aad2d
Showing 23 changed files with 366 additions and 104 deletions.
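As a quick orientation, a minimal sketch of how the new entry point might be used. MyCustomParser is a hypothetical user-supplied DataSourceParser implementation; SingleGCLogFile is assumed from the existing com.microsoft.gctoolkit.io package; everything else is taken from the API shown in the diff below.

import com.microsoft.gctoolkit.GCToolKit;
import com.microsoft.gctoolkit.io.SingleGCLogFile;
import com.microsoft.gctoolkit.jvm.JavaVirtualMachine;

import java.io.IOException;
import java.nio.file.Path;

public class AddParserExample {
    public static void main(String[] args) throws IOException {
        GCToolKit gcToolKit = new GCToolKit();
        // Aggregations discovered through the service provider interface.
        gcToolKit.loadAggregationsFromServiceLoader();
        // MyCustomParser (hypothetical) is registered in addition to the
        // parsers discovered through the service provider interface.
        gcToolKit.addDataSourceParser(new MyCustomParser());
        JavaVirtualMachine jvm = gcToolKit.analyze(new SingleGCLogFile(Path.of("gc.log")));
        // Aggregators whose events are not produced by any selected parser are now filtered out.
    }
}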
138 changes: 126 additions & 12 deletions api/src/main/java/com/microsoft/gctoolkit/GCToolKit.java
@@ -3,6 +3,8 @@
package com.microsoft.gctoolkit;

import com.microsoft.gctoolkit.aggregator.Aggregation;
import com.microsoft.gctoolkit.aggregator.Aggregator;
import com.microsoft.gctoolkit.aggregator.EventSource;
import com.microsoft.gctoolkit.io.DataSource;
import com.microsoft.gctoolkit.io.GCLogFile;
import com.microsoft.gctoolkit.io.RotatingGCLogFile;
@@ -14,16 +16,21 @@
import com.microsoft.gctoolkit.message.JVMEventChannel;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Parameter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collector;
import java.util.stream.Collectors;

import static java.lang.Class.forName;
@@ -35,6 +42,38 @@ public class GCToolKit {

private static final Logger LOGGER = Logger.getLogger(GCToolKit.class.getName());

private static final String GCTOOLKIT_DEBUG = System.getProperty("gctoolkit.debug");
private static final boolean DEBUGGING = GCTOOLKIT_DEBUG != null;

// returns true if gctoolkit.debug is empty, is set to "all", or contains "className", provided it does not contain "-className"
private static boolean isDebugging(String className) {
return DEBUGGING
&& (GCTOOLKIT_DEBUG.isEmpty()
|| ((GCTOOLKIT_DEBUG.contains("all") || GCTOOLKIT_DEBUG.contains(className))
&& !GCTOOLKIT_DEBUG.contains("-" + className)));
}

/**
* Print a debug message to System.out if gctoolkit.debug is empty, is set to "all",
* or contains "className" but does not contain "-className".
* For example, to enable debug logging for all classes except UnifiedG1GCParser:
* <code>-Dgctoolkit.debug=all,-com.microsoft.gctoolkit.parser.UnifiedG1GCParser</code>
*
* @param message Supplies the message to log. If null, nothing will be logged.
*/
public static void LOG_DEBUG_MESSAGE(Supplier<String> message) {
if (DEBUGGING && message != null) {
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
String methodName = stackTrace[2].getMethodName();
String className = stackTrace[2].getClassName();
String fileName = stackTrace[2].getFileName();
int lineNumber = stackTrace[2].getLineNumber();
if (isDebugging(className)) {
System.out.println(String.format("DEBUG: %s.%s(%s:%d): %s", className, methodName, fileName, lineNumber, message.get()));
}
}
}

private final HashSet<DataSourceParser> registeredDataSourceParsers = new HashSet<>();
private List<Aggregation> registeredAggregations;
private JVMEventChannel jvmEventChannel = null;
@@ -71,10 +110,10 @@ public void loadAggregationsFromServiceLoader() {
ServiceLoader.load(Aggregation.class)
.stream()
.map(ServiceLoader.Provider::get)
.forEach(registeredAggregations::add);
//Useful for debugging
if ( Level.FINER.equals(LOGGER.getLevel()))
registeredAggregations.forEach(a -> LOGGER.log(Level.FINER, "Registered " + a.toString()));
.forEach(aggregation -> {
registeredAggregations.add(aggregation);
LOG_DEBUG_MESSAGE(() -> "ServiceLoader provided: " + aggregation.getClass().getName());
});
}

/**
Expand Down Expand Up @@ -154,19 +193,41 @@ private void loadJVMEventChannel() {
}
}

/**
* This method allows full control over which DataSourceParsers are used to parse the DataSource.
* This method should be called before the {@link #analyze(DataSource)} method.
* DataSourceParsers loaded by this method are used in place of those that are ordinarily loaded via
* the service provider interface.
* Use the {@link #addDataSourceParser(DataSourceParser)} method to load a DataSourceParser in addition
* to those loaded by the service provider interface.
* @param dataSourceParser An implementation of DataSourceParser that will be used to parse the DataSource.
*/
public void loadDataSourceParser(DataSourceParser dataSourceParser) {
registeredDataSourceParsers.add(dataSourceParser);
}

private void loadDataSourceParsers(Diary diary) {
private List<DataSourceParser> additiveParsers = new ArrayList<>();

/**
* Add a DataSourceParser to be used to parse a DataSource. The DataSourceParser will be used in addition
* to those loaded by the service provider interface. This method should be called before the
* {@link #analyze(DataSource)} method.
* @param dataSourceParser An implementation of DataSourceParser that will be used to parse the DataSource.
*/
public void addDataSourceParser(DataSourceParser dataSourceParser) {
additiveParsers.add(dataSourceParser);
}
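
    // Usage illustration (hypothetical caller, not code in this class): the two registration
    // paths differ as documented above.
    //   toolkit.loadDataSourceParser(parser); // replaces the parsers discovered via the SPI
    //   toolkit.addDataSourceParser(parser);  // used in addition to the SPI-discovered parsers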

private Set<EventSource> loadDataSourceParsers(Diary diary) {

loadDataSourceChannel();
loadJVMEventChannel();
List<DataSourceParser> dataSourceParsers;
if (registeredDataSourceParsers.isEmpty()) {
dataSourceParsers = ServiceLoader.load(DataSourceParser.class)
.stream()
.map(ServiceLoader.Provider::get)
.filter(consumer -> consumer.accepts(diary))
.filter(dataSourceParser -> dataSourceParser.accepts(diary))
.collect(Collectors.toList());
} else {
dataSourceParsers = new ArrayList<>();
@@ -202,18 +263,28 @@ private void loadDataSourceParsers(Diary diary) {
})
.filter(Optional::isPresent)
.map(optional -> (DataSourceParser) optional.get())
.filter(consumer -> consumer.accepts(diary))
.filter(dataSourceParser -> dataSourceParser.accepts(diary))
.collect(Collectors.toList());
if (dataSourceParsers.isEmpty()) {
throw new ServiceConfigurationError("Unable to find a suitable provider to create a DataSourceParser");
}

}

// add in any additional parsers not provided by the module SPI.
dataSourceParsers.addAll(additiveParsers);

if (dataSourceParsers.isEmpty()) {
throw new ServiceConfigurationError("Unable to find a suitable provider to create a DataSourceParser");
}

for (DataSourceParser dataSourceParser : dataSourceParsers) {
LOG_DEBUG_MESSAGE(() -> "Registering " + dataSourceParser.getClass().getName() + " with " + dataSourceChannel.getClass().getName());
dataSourceParser.diary(diary);
dataSourceChannel.registerListener(dataSourceParser);
dataSourceParser.publishTo(jvmEventChannel);
}

return dataSourceParsers.stream()
.map(DataSourceParser::eventsProduced)
.collect(HashSet::new, Set::addAll, Set::addAll);
}

/**
@@ -230,14 +301,57 @@ private void loadDataSourceParsers(Diary diary) {
*/
public JavaVirtualMachine analyze(DataSource<?> dataSource) throws IOException {
GCLogFile logFile = (GCLogFile)dataSource;
loadDataSourceParsers(logFile.diary());
Set<EventSource> events = loadDataSourceParsers(logFile.diary());
JavaVirtualMachine javaVirtualMachine = loadJavaVirtualMachine(logFile);
try {
javaVirtualMachine.analyze(registeredAggregations, jvmEventChannel, dataSourceChannel);
List<Aggregator<? extends Aggregation>> filteredAggregators = filterAggregations(events);
javaVirtualMachine.analyze(filteredAggregators, jvmEventChannel, dataSourceChannel);
} catch(Throwable t) {
LOGGER.log(Level.SEVERE, "Internal Error: Cannot invoke analyze method", t);
}
return javaVirtualMachine;
}

private List<Aggregator<? extends Aggregation>> filterAggregations(Set<EventSource> events) {
List<Aggregator<? extends Aggregation>> aggregators = new ArrayList<>();
for (Aggregation aggregation : registeredAggregations) {
LOG_DEBUG_MESSAGE(() -> "Evaluating: " + aggregation.getClass().getName());
Constructor<? extends Aggregator<?>> constructor = constructor(aggregation);
if (constructor == null) {
LOGGER.log(Level.WARNING, "Cannot find one of: default constructor or @Collates annotation for " + aggregation.getClass().getName());
continue;
}

Aggregator<? extends Aggregation> aggregator = null;
try {
aggregator = constructor.newInstance(aggregation);
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
LOGGER.log(Level.SEVERE, e.getMessage(), e);
continue;
}
if (events.stream().anyMatch(aggregator::aggregates)) {
LOG_DEBUG_MESSAGE(() -> "Including : " + aggregation.getClass().getName());
aggregators.add(aggregator);
} else {
LOG_DEBUG_MESSAGE(() -> "Excluding : " + aggregation.getClass().getName());
}
}
return aggregators;

}

@SuppressWarnings("unchecked")
private Constructor<? extends Aggregator<?>> constructor(Aggregation aggregation) {
Class<? extends Aggregator<?>> targetClazz = aggregation.collates();
if ( targetClazz != null) {
Constructor<?>[] constructors = targetClazz.getConstructors();
for (Constructor<?> constructor : constructors) {
Parameter[] parameters = constructor.getParameters();
if (parameters.length == 1 && Aggregation.class.isAssignableFrom(parameters[0].getType()))
return (Constructor<? extends Aggregator<?>>) constructor;
}
}
return null;
}

}
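
A minimal sketch of how the new LOG_DEBUG_MESSAGE hook might be invoked from parser code; DebugLoggingExample and its message are hypothetical, and output appears only when the gctoolkit.debug system property enables the calling class, as described in the Javadoc above.

import com.microsoft.gctoolkit.GCToolKit;

class DebugLoggingExample {
    void process(String line) {
        // The Supplier keeps message construction lazy: the string below is only built when
        // gctoolkit.debug enables debugging for this class (e.g. -Dgctoolkit.debug=all).
        GCToolKit.LOG_DEBUG_MESSAGE(() -> "processing: " + line);
    }
}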
@@ -121,10 +121,8 @@ public void onCompletion(Runnable task) {
* Call a callback when aggregation is completed.
*/
private void complete() {
Runnable t = completionTask;
this.completionTask = null;
if (t != null)
Executors.newSingleThreadExecutor().execute(t);
if (completionTask != null)
Executors.newSingleThreadExecutor().execute(completionTask);

}

@@ -2,6 +2,7 @@
// Licensed under the MIT License.
package com.microsoft.gctoolkit.jvm;

import com.microsoft.gctoolkit.GCToolKit;
import com.microsoft.gctoolkit.aggregator.Aggregation;
import com.microsoft.gctoolkit.aggregator.Aggregator;
import com.microsoft.gctoolkit.aggregator.EventSource;
@@ -14,9 +15,6 @@
import com.microsoft.gctoolkit.time.DateTimeStamp;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Parameter;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -32,8 +30,6 @@
*/
public abstract class AbstractJavaVirtualMachine implements JavaVirtualMachine {

private boolean debugging = Boolean.getBoolean("microsoft.debug.aggregation");

private static final Logger LOGGER = Logger.getLogger(AbstractJavaVirtualMachine.class.getName());
private static final double LOG_FRAGMENT_THRESHOLD_SECONDS = 60.0d; //todo: replace magic threshold with a heuristic

@@ -152,20 +148,6 @@ public <T extends Aggregation> Optional<T> getAggregation(Class<T> aggregationCl
return Optional.ofNullable((T) aggregatedData.get(aggregationClass));
}

@SuppressWarnings("unchecked")
private Constructor<? extends Aggregator<?>> constructor(Aggregation aggregation) {
Class<? extends Aggregator<?>> targetClazz = aggregation.collates();
if ( targetClazz != null) {
Constructor<?>[] constructors = targetClazz.getConstructors();
for (Constructor<?> constructor : constructors) {
Parameter[] parameters = constructor.getParameters();
if (parameters.length == 1 && Aggregation.class.isAssignableFrom(parameters[0].getType()))
return (Constructor<? extends Aggregator<?>>) constructor;
}
}
return null;
}

/**
* Orchestrate the analysis of a GC log. Step wise
* 1. find the aggregators that aggregate events generated by the gc log
@@ -174,40 +156,24 @@ private Constructor<? extends Aggregator<?>> constructor(Aggregation aggregation
* 4. Wait until all the aggregators have completed
* 5. Set the start and end times
* 6. Return to the caller
* @param registeredAggregations all of the aggregations loaded by the module SPI
* @param registeredAggregators the aggregators wrapping the aggregations loaded by the module SPI
* @param eventBus the bus to publish events on
* @param dataSourceBus the bus that raw log lines are published on
*/
@Override
public void analyze(List<Aggregation> registeredAggregations, JVMEventChannel eventBus, DataSourceChannel dataSourceBus) {
public void analyze(List<Aggregator<? extends Aggregation>> registeredAggregators, JVMEventChannel eventBus, DataSourceChannel dataSourceBus) {
Phaser finishLine = new Phaser();
Set<EventSource> generatedEvents = diary.generatesEvents();
for (Aggregation aggregation : registeredAggregations) {
if (debugging)
LOGGER.log(Level.INFO, "Evaluating: " + aggregation.toString());
Constructor<? extends Aggregator<?>> constructor = constructor(aggregation);
if (constructor == null) {
LOGGER.log(Level.WARNING, "Cannot find one of: default constructor or @Collates annotation for " + aggregation.getClass().getName());
continue;
}
if (debugging)
LOGGER.log(Level.INFO, "Loading : " + aggregation.toString());
Aggregator<? extends Aggregation> aggregator = null;
try {
aggregator = constructor.newInstance(aggregation);
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
LOGGER.log(Level.SEVERE, e.getMessage(), e);
continue;
}
for (Aggregator aggregator : registeredAggregators) {
Aggregation aggregation = aggregator.aggregation();
aggregatedData.put(aggregation.getClass(), aggregation);
Optional<EventSource> source = generatedEvents.stream().filter(aggregator::aggregates).findFirst();
if (source.isPresent()) {
LOGGER.log(Level.FINE, "Registering: " + aggregation.getClass().getName());
generatedEvents.stream().filter(aggregator::aggregates).forEach(eventSource -> {
GCToolKit.LOG_DEBUG_MESSAGE(() -> "Registering " + aggregator.getClass().getName() + " with " + eventSource.toChannel());
finishLine.register();
aggregator.onCompletion(finishLine::arriveAndDeregister);
JVMEventChannelAggregator eventChannelAggregator = new JVMEventChannelAggregator(source.get().toChannel(), aggregator);
JVMEventChannelAggregator eventChannelAggregator = new JVMEventChannelAggregator(eventSource.toChannel(), aggregator);
eventBus.registerListener(eventChannelAggregator);
}
});
}

try {
@@ -5,6 +5,7 @@

import com.microsoft.gctoolkit.GCToolKit;
import com.microsoft.gctoolkit.aggregator.Aggregation;
import com.microsoft.gctoolkit.aggregator.Aggregator;
import com.microsoft.gctoolkit.io.DataSource;
import com.microsoft.gctoolkit.message.DataSourceChannel;
import com.microsoft.gctoolkit.message.JVMEventChannel;
@@ -122,5 +123,5 @@ public interface JavaVirtualMachine {
* @param eventChannel JVMEvent message channel
* @param dataSourceChannel GC logging data channel
*/
void analyze(List<Aggregation> registeredAggregations, JVMEventChannel eventChannel, DataSourceChannel dataSourceChannel);
void analyze(List<Aggregator<? extends Aggregation>> registeredAggregations, JVMEventChannel eventChannel, DataSourceChannel dataSourceChannel);
}
@@ -1,9 +1,13 @@
package com.microsoft.gctoolkit.message;

import com.microsoft.gctoolkit.aggregator.EventSource;
import com.microsoft.gctoolkit.jvm.Diary;

import java.util.Set;

public interface DataSourceParser extends DataSourceChannelListener {
void publishTo(JVMEventChannel channel);
void diary(Diary diary);
boolean accepts(Diary diary);
Set<EventSource> eventsProduced();
}
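
The new eventsProduced() method is what lets GCToolKit compute, up front, which event channels a run will actually publish on. A minimal sketch of that computation, mirroring loadDataSourceParsers in GCToolKit above (the helper class and method name are hypothetical):

import com.microsoft.gctoolkit.aggregator.EventSource;
import com.microsoft.gctoolkit.message.DataSourceParser;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

class EventsProducedExample {
    // Union of every EventSource the selected parsers will publish; GCToolKit uses this
    // set to exclude aggregators that would never receive an event from the log.
    static Set<EventSource> eventsProducedBy(List<DataSourceParser> parsers) {
        return parsers.stream()
                .map(DataSourceParser::eventsProduced)
                .collect(HashSet::new, Set::addAll, Set::addAll);
    }
}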
@@ -0,0 +1,26 @@
package com.microsoft.gctoolkit.io;

import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

public class RotatingGCLogTest {

@Test
void orderRotatingLogsTest() {
Path path = new TestLogFile("G1-80-16gbps2.log").getFile().toPath();
try {
RotatingGCLogFile file = new RotatingGCLogFile(path);
assertEquals(2, file.getMetaData().getNumberOfFiles());
assertEquals(2, file.getMetaData().logFiles().map(LogFileSegment::getPath).map(Path::toFile).map(File::getName).filter(s -> s.startsWith("G1-80-16gbps2")).count());
file.getMetaData().logFiles().map(LogFileSegment::getEndTime).forEach(System.out::println);
} catch (IOException ioe) {
fail(ioe);
}
}
}