Skip to content
This repository has been archived by the owner on Jun 28, 2024. It is now read-only.

Fixes threading issue #202

Merged
merged 3 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 30 additions & 18 deletions fork-runner/src/main/java/com/shazam/fork/ForkRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import static com.google.common.util.concurrent.Uninterruptibles.awaitTerminationUninterruptibly;
import static com.shazam.fork.Utils.namedExecutor;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
Expand Down Expand Up @@ -66,15 +68,14 @@ public ForkRunner(PoolLoader poolLoader,
}

public boolean run() {
ExecutorService poolExecutor = null;
try {
Collection<Pool> pools = poolLoader.loadPools();
poolExecutor = namedExecutor(pools.size(), "PoolExecutor-%d");

Collection<TestCaseEvent> testCases = testClassLoader.loadTestSuite();
summaryGeneratorHook.registerHook(pools, testCases);

executeTests(poolExecutor, pools, testCases);
executeTests(pools, testCases);


AggregatedTestResult aggregatedTestResult = aggregator.aggregateTestResults(pools, testCases);
if (!aggregatedTestResult.getFatalCrashedTests().isEmpty()) {
Expand All @@ -83,7 +84,7 @@ public boolean run() {

Collection<TestCaseEvent> fatalCrashedTestCases =
findFatalCrashedTestCases(testCases, aggregatedTestResult.getFatalCrashedTests());
executeTests(poolExecutor, pools, fatalCrashedTestCases);
executeTests(pools, fatalCrashedTestCases);

aggregatedTestResult = aggregator.aggregateTestResults(pools, testCases);

Expand All @@ -107,29 +108,40 @@ public boolean run() {
} catch (Exception e) {
logger.error("Error while Fork was executing", e);
return false;
} finally {
if (poolExecutor != null) {
poolExecutor.shutdown();
}
}
}

private void executeTests(ExecutorService poolExecutor,
Collection<Pool> pools,
Collection<TestCaseEvent> testCases) throws InterruptedException {
private void executeTests(
Collection<Pool> pools,
Collection<TestCaseEvent> testCases
) {
ProgressReporter progressReporter = progressReporterFactory.createProgressReporter();
progressReporter.start();

CountDownLatch poolCountDownLatch = new CountDownLatch(pools.size());
ExecutorService poolExecutor = null;
try {
poolExecutor = namedExecutor(pools.size(), "PoolExecutor-%d");

for (Pool pool : pools) {
Runnable poolTestRunner = poolTestRunnerFactory.createPoolTestRunner(
pool,
testCases,
progressReporter
);
poolExecutor.submit(poolTestRunner);
}

for (Pool pool : pools) {
Runnable poolTestRunner =
poolTestRunnerFactory.createPoolTestRunner(pool, testCases, poolCountDownLatch, progressReporter);
poolExecutor.execute(poolTestRunner);
poolExecutor.shutdown();
awaitTerminationUninterruptibly(poolExecutor);
} finally {
progressReporter.stop();

if (poolExecutor != null && !poolExecutor.isTerminated()) {
poolExecutor.shutdownNow();
awaitTerminationUninterruptibly(poolExecutor);
}
}
poolCountDownLatch.await();

progressReporter.stop();
}

private static void reportMissingTests(AggregatedTestResult aggregatedTestResult) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.Deque;

import static com.shazam.fork.system.io.RemoteFileManager.*;

Expand All @@ -33,25 +32,22 @@ public class DeviceTestRunner implements Runnable {
private final Installer installer;
private final Pool pool;
private final Device device;
private final Queue<TestCaseEvent> queueOfTestsInPool;
private final CountDownLatch deviceCountDownLatch;
private final Deque<TestCaseEvent> queueOfTestsInPool;
private final ProgressReporter progressReporter;
private final ScreenRecorder screenRecorder;
private final TestRunFactory testRunFactory;

public DeviceTestRunner(Installer installer,
Pool pool,
Device device,
Queue<TestCaseEvent> queueOfTestsInPool,
CountDownLatch deviceCountDownLatch,
Deque<TestCaseEvent> queueOfTestsInPool,
ProgressReporter progressReporter,
ScreenRecorder screenRecorder,
TestRunFactory testRunFactory) {
this.installer = installer;
this.pool = pool;
this.device = device;
this.queueOfTestsInPool = queueOfTestsInPool;
this.deviceCountDownLatch = deviceCountDownLatch;
this.progressReporter = progressReporter;
this.screenRecorder = screenRecorder;
this.testRunFactory = testRunFactory;
Expand Down Expand Up @@ -81,7 +77,6 @@ public void run() {
}
} finally {
logger.info("Device {} from pool {} finished", device.getSerial(), pool.getName());
deviceCountDownLatch.countDown();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
import com.shazam.fork.model.TestCaseEvent;
import com.shazam.fork.system.adb.Installer;

import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.Deque;

public class DeviceTestRunnerFactory {

Expand All @@ -30,8 +29,7 @@ public DeviceTestRunnerFactory(Installer installer, TestRunFactory testRunFactor
}

public Runnable createDeviceTestRunner(Pool pool,
Queue<TestCaseEvent> testClassQueue,
CountDownLatch deviceInPoolCountDownLatch,
Deque<TestCaseEvent> testClassQueue,
Device device,
ProgressReporter progressReporter
) {
Expand All @@ -40,7 +38,6 @@ public Runnable createDeviceTestRunner(Pool pool,
pool,
device,
testClassQueue,
deviceInPoolCountDownLatch,
progressReporter,
new ScreenRecorderImpl(device),
testRunFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,26 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.Deque;
import java.util.concurrent.ExecutorService;

import static com.google.common.util.concurrent.Uninterruptibles.awaitTerminationUninterruptibly;
import static com.shazam.fork.Utils.namedExecutor;

public class PoolTestRunner implements Runnable {
private final Logger logger = LoggerFactory.getLogger(PoolTestRunner.class);
public static final String DROPPED_BY = "DroppedBy-";

private final Pool pool;
private final Queue<TestCaseEvent> testCases;
private final CountDownLatch poolCountDownLatch;
private final Deque<TestCaseEvent> testCases;
private final DeviceTestRunnerFactory deviceTestRunnerFactory;
private final ProgressReporter progressReporter;

public PoolTestRunner(DeviceTestRunnerFactory deviceTestRunnerFactory, Pool pool,
Queue<TestCaseEvent> testCases,
CountDownLatch poolCountDownLatch,
Deque<TestCaseEvent> testCases,
ProgressReporter progressReporter) {
this.pool = pool;
this.testCases = testCases;
this.poolCountDownLatch = poolCountDownLatch;
this.deviceTestRunnerFactory = deviceTestRunnerFactory;
this.progressReporter = progressReporter;
}
Expand All @@ -52,23 +49,25 @@ public void run() {
try {
int devicesInPool = pool.size();
concurrentDeviceExecutor = namedExecutor(devicesInPool, "DeviceExecutor-%d");
CountDownLatch deviceCountDownLatch = new CountDownLatch(devicesInPool);
logger.info("Pool {} started", poolName);
for (Device device : pool.getDevices()) {
Runnable deviceTestRunner = deviceTestRunnerFactory.createDeviceTestRunner(pool, testCases,
deviceCountDownLatch, device, progressReporter);
concurrentDeviceExecutor.execute(deviceTestRunner);
Runnable deviceTestRunner = deviceTestRunnerFactory.createDeviceTestRunner(
pool,
testCases,
device,
progressReporter
);
concurrentDeviceExecutor.submit(deviceTestRunner);
}
deviceCountDownLatch.await();
} catch (InterruptedException e) {
logger.warn("Pool {} was interrupted while running", poolName);

concurrentDeviceExecutor.shutdown();
awaitTerminationUninterruptibly(concurrentDeviceExecutor);
} finally {
if (concurrentDeviceExecutor != null) {
concurrentDeviceExecutor.shutdown();
if (concurrentDeviceExecutor != null && !concurrentDeviceExecutor.isTerminated()) {
concurrentDeviceExecutor.shutdownNow();
awaitTerminationUninterruptibly(concurrentDeviceExecutor);
}
logger.info("Pool {} finished", poolName);
poolCountDownLatch.countDown();
logger.info("Pools remaining: {}", poolCountDownLatch.getCount());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
import com.shazam.fork.model.TestCaseEvent;

import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ConcurrentLinkedDeque;

public class PoolTestRunnerFactory {
private final DeviceTestRunnerFactory deviceTestRunnerFactory;
Expand All @@ -26,7 +25,6 @@ public PoolTestRunnerFactory(DeviceTestRunnerFactory deviceTestRunnerFactory) {

public Runnable createPoolTestRunner(Pool pool,
Collection<TestCaseEvent> testCases,
CountDownLatch poolCountDownLatch,
ProgressReporter progressReporter) {

int totalTests = testCases.size();
Expand All @@ -35,8 +33,7 @@ public Runnable createPoolTestRunner(Pool pool,
return new PoolTestRunner(
deviceTestRunnerFactory,
pool,
new LinkedList<>(testCases),
poolCountDownLatch,
new ConcurrentLinkedDeque<>(testCases),
progressReporter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static java.lang.String.format;

Expand Down Expand Up @@ -64,7 +65,7 @@ public void execute() {
}
runner.setRunName(poolName);
runner.setMethodName(testClassName, testMethodName);
runner.setMaxtimeToOutputResponse(testRunParameters.getTestOutputTimeout());
runner.setMaxTimeToOutputResponse(testRunParameters.getTestOutputTimeout(), TimeUnit.MILLISECONDS);

if (testRunParameters.isCoverageEnabled()) {
runner.setCoverage(true);
Expand Down
Loading