diff --git a/src/main/java/org/apposed/appose/Service.java b/src/main/java/org/apposed/appose/Service.java index 1d3ea3d..a015650 100644 --- a/src/main/java/org/apposed/appose/Service.java +++ b/src/main/java/org/apposed/appose/Service.java @@ -35,6 +35,7 @@ import java.io.InputStreamReader; import java.io.PrintWriter; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -58,6 +59,20 @@ public class Service implements AutoCloseable { private final Map tasks = new ConcurrentHashMap<>(); private final int serviceID; + /** + * List of unparseable (non-JSON) lines seen since the service started. + * If the worker process crashes, we use these lines as the content + * of an error message to any still pending tasks when reporting the crash. + */ + private final List invalidLines = new ArrayList<>(); + + /** + * List of lines emitted to the standard error stream seen since the service + * started. If the worker process crashes, we use these lines as the content + * of an error message to any still pending tasks when reporting the crash. + */ + private final List errorLines = new ArrayList<>(); + private Process process; private PrintWriter stdin; private Thread stdoutThread; @@ -131,12 +146,84 @@ public Task task(String script, Map inputs) throws IOException { return new Task(script, inputs); } - /** Closes the worker process's input stream, in order to shut it down. */ + /** + * Closes the worker process's input stream, in order to shut it down. + * Pending tasks will run to completion before the worker process terminates. + *

+ * To shut down the service more forcibly, interrupting any pending tasks, + * use {@link #kill()} instead. + *

+ *

+ * To wait until the service's worker process has completely shut down + * and all output has been reported, call {@link #waitFor()} afterward. + *

+ */ @Override public void close() { stdin.close(); } + /** + * Forces the service's worker process to begin shutting down. Any tasks still + * pending completion will be interrupted, reporting {@link TaskStatus#CRASHED}. + *

+ * To shut down the service more gently, allowing any pending tasks to run to + * completion, use {@link #close()} instead. + *

+ *

+ * To wait until the service's worker process has completely shut down + * and all output has been reported, call {@link #waitFor()} afterward. + *

+ */ + public void kill() { + process.destroyForcibly(); + } + + /** + * Waits for the service's worker process to terminate. + * + * @return Exit value of the worker process. + * @throws InterruptedException If any of the worker process's monitoring + * threads are interrupted before shutting down. + */ + public int waitFor() throws InterruptedException { + process.waitFor(); + + // Wait for worker output processing threads to finish up. + stdoutThread.join(); + stderrThread.join(); + monitorThread.join(); + + return process.exitValue(); + } + + /** + * Returns true if the service's worker process is currently running, + * or false if it has not yet started or has already shut down or crashed. + * + * @return Whether the service's worker process is currently running. + */ + public boolean isAlive() { + return process != null && process.isAlive(); + } + + /** + * Unparseable lines emitted by the worker process on its stdout stream, + * collected over the lifetime of the service. + * Can be useful for analyzing why a worker process has crashed. + */ + public List invalidLines() { + return Collections.unmodifiableList(invalidLines); + } + + /** + * Lines emitted by the worker process on its stderr stream, + * collected over the lifetime of the service. + * Can be useful for analyzing why a worker process has crashed. + */ + public List errorLines() { + return Collections.unmodifiableList(errorLines); + } /** Input loop processing lines from the worker stdout stream. */ private void stdoutLoop() { BufferedReader stdout = new BufferedReader(new InputStreamReader(process.getInputStream())); @@ -153,7 +240,7 @@ private void stdoutLoop() { if (line == null) { debugService(""); - return; + break; } try { Map response = Types.decode(line); @@ -174,6 +261,7 @@ private void stdoutLoop() { // Something went wrong decoding the line of JSON. // Skip it and keep going, but log it first. debugService(String.format(" %s", line)); + invalidLines.add(line); } } } @@ -181,40 +269,60 @@ private void stdoutLoop() { /** Input loop processing lines from the worker stderr stream. */ private void stderrLoop() { BufferedReader stderr = new BufferedReader(new InputStreamReader(process.getErrorStream())); - try { - while (true) { - String line = stderr.readLine(); - if (line == null) { - debugService(""); - return; - } - debugWorker(line); + while (true) { + String line; + try { + line = stderr.readLine(); } - } - catch (IOException exc) { - debugWorker(Types.stackTrace(exc)); + catch (IOException exc) { + // Something went wrong reading the line. Panic! + debugService(Types.stackTrace(exc)); + break; + } + if (line == null) { + debugService(""); + break; + } + debugWorker(line); + errorLines.add(line); } } + @SuppressWarnings("BusyWait") private void monitorLoop() { // Wait until the worker process terminates. - while (process.isAlive()) { + while (process.isAlive() || stdoutThread.isAlive() || stderrThread.isAlive()) { try { - Thread.sleep(50); + Thread.sleep(10); } catch (InterruptedException exc) { debugService(Types.stackTrace(exc)); } } + debugService(""); // Do some sanity checks. int exitCode = process.exitValue(); if (exitCode != 0) debugService(""); int taskCount = tasks.size(); - if (taskCount > 0) debugService(""); + if (taskCount > 0) { + debugService(""); + } - // Notify any remaining tasks about the process crash. - tasks.values().forEach(Task::crash); + Collection remainingTasks = tasks.values(); + if (!remainingTasks.isEmpty()) { + // Notify any remaining tasks about the process crash. + StringBuilder sb = new StringBuilder(); + String nl = System.lineSeparator(); + sb.append("Worker crashed with exit code ").append(exitCode).append(".").append(nl); + String stdout = invalidLines.isEmpty() ? "" : String.join(nl, invalidLines); + String stderr = errorLines.isEmpty() ? "" : String.join(nl, errorLines); + sb.append(nl).append("[stdout]").append(nl).append(stdout).append(nl); + sb.append(nl).append("[stderr]").append(nl).append(stderr).append(nl); + String error = sb.toString(); + remainingTasks.forEach(task -> task.crash(error)); + } tasks.clear(); } @@ -325,7 +433,6 @@ private void request(RequestType requestType, Map args) { debugService(encoded); } - @SuppressWarnings("hiding") private void handle(Map response) { String maybeResponseType = (String) response.get("responseType"); if (maybeResponseType == null) { @@ -377,9 +484,10 @@ private void handle(Map response) { } } - private void crash() { + private void crash(String error) { TaskEvent event = new TaskEvent(this, ResponseType.CRASH); status = TaskStatus.CRASHED; + this.error = error; listeners.forEach(l -> l.accept(event)); synchronized (this) { notifyAll(); diff --git a/src/test/java/org/apposed/appose/ApposeTest.java b/src/test/java/org/apposed/appose/ApposeTest.java index e84ec54..14b1d88 100644 --- a/src/test/java/org/apposed/appose/ApposeTest.java +++ b/src/test/java/org/apposed/appose/ApposeTest.java @@ -30,6 +30,8 @@ package org.apposed.appose; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -38,6 +40,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apposed.appose.Service.ResponseType; @@ -152,7 +155,7 @@ public void testServiceStartupFailure() throws IOException, InterruptedException public void testTaskFailurePython() throws InterruptedException, IOException { Environment env = Appose.system(); try (Service service = env.python()) { - service.debug(System.out::println); + maybeDebug(service); String script = "whee\n"; Task task = service.task(script); task.waitFor(); @@ -168,6 +171,96 @@ public void testTaskFailurePython() throws InterruptedException, IOException { } } + @Test + public void testStartupCrash() throws InterruptedException, IOException { + Environment env = Appose.system(); + List pythonExes = Arrays.asList("python", "python3", "python.exe"); + Service service = env.service(pythonExes, "-c", "import nonexistentpackage").start(); + // Wait up to 500ms for the crash. + for (int i = 0; i < 100; i++) { + if (!service.isAlive()) break; + Thread.sleep(5); + } + assertFalse(service.isAlive()); + assertEquals(3, service.errorLines().size()); + // Check that the crash happened and was recorded correctly. + List expectedLines = Arrays.asList( + "Traceback (most recent call last):", + " File \"\", line 1, in ", + "ModuleNotFoundError: No module named 'nonexistentpackage'" + ); + assertEquals(expectedLines, service.errorLines()); + } + + @Test + public void testCrashWithActiveTask() throws InterruptedException, IOException { + Environment env = Appose.system(); + try (Service service = env.python()) { + maybeDebug(service); + // Create a "long-running" task. + String script = + "import sys\n" + + "sys.stderr.write('one\\n')\n" + + "sys.stderr.flush()\n" + + "print('two')\n" + + "sys.stdout.flush()\n" + + "sys.stderr.write('three\\n')\n" + + "sys.stderr.flush()\n" + + "task.update('halfway')\n" + + "print('four')\n" + + "sys.stdout.flush()\n" + + "sys.stderr.write('five\\n')\n" + + "sys.stderr.flush()\n" + + "print('six')\n" + + "sys.stdout.flush()\n" + + "sys.stderr.write('seven\\n')\n" + + "import time; time.sleep(999)\n"; + Task task = service.task(script); + + // Record any crash reported in the task notifications. + String[] reportedError = {null}; + task.listen(event -> { + if (event.responseType == ResponseType.CRASH) { + reportedError[0] = task.error; + } + }); + // Launch the task. + task.start(); + // Simulate a crash after 100ms has gone by. + Thread.sleep(100); + service.kill(); + + // Wait for the service to fully shut down after the crash. + int exitCode = service.waitFor(); + assertTrue(exitCode != 0); + + // Is the tag flagged as crashed? + assertSame(TaskStatus.CRASHED, task.status); + + // Was the crash error successfully and consistently recorded? + assertNotNull(reportedError[0]); + List lines = Arrays.asList(task.error.split("\\n")); + String nl = System.lineSeparator(); + assertEquals(Arrays.asList("two", "four", "six"), service.invalidLines()); + assertEquals(Arrays.asList("one", "three", "five", "seven"), service.errorLines()); + String expected = + "Worker crashed with exit code ###." + nl + + nl + + "[stdout]" + nl + + "two" + nl + + "four" + nl + + "six" + nl + + nl + + "[stderr]" + nl + + "one" + nl + + "three" + nl + + "five" + nl + + "seven" + nl; + String generalizedError = task.error.replaceFirst("exit code [0-9]+", "exit code ###"); + assertEquals(expected, generalizedError); + } + } + public void executeAndAssert(Service service, String script) throws IOException, InterruptedException {