diff --git a/celos-server/src/main/java/com/collective/celos/RerunState.java b/celos-server/src/main/java/com/collective/celos/RerunState.java index 82725c187..1ff1b1560 100644 --- a/celos-server/src/main/java/com/collective/celos/RerunState.java +++ b/celos-server/src/main/java/com/collective/celos/RerunState.java @@ -23,7 +23,7 @@ public class RerunState extends ValueObject { public static final int EXPIRATION_DAYS = 14; private static final ObjectMapper MAPPER = new ObjectMapper(); - private static final String RERUN_TIME_PROP = "rerunTime"; + private static final String RERUN_TIME_PROP = "scheduledTime"; // The wallclock time at which the slot was marked for rerun private final ScheduledTime rerunTime; diff --git a/celos-server/src/main/java/com/collective/celos/Scheduler.java b/celos-server/src/main/java/com/collective/celos/Scheduler.java index be5905e37..212757125 100644 --- a/celos-server/src/main/java/com/collective/celos/Scheduler.java +++ b/celos-server/src/main/java/com/collective/celos/Scheduler.java @@ -149,14 +149,14 @@ void updateSlotState(Workflow wf, SlotState slotState, ScheduledTime current) th if (!xStatus.isRunning()) { if (xStatus.isSuccess()) { LOGGER.info("Slot successful: " + slotID + " external ID: " + externalID); - database.putSlotState(slotState.transitionToSuccess()); + database.updateSlotToSuccess(slotState); } else { if (slotState.getRetryCount() < wf.getMaxRetryCount()) { LOGGER.info("Slot failed, preparing for retry: " + slotID + " external ID: " + externalID); database.putSlotState(slotState.transitionToRetry()); } else { LOGGER.info("Slot failed permanently: " + slotID + " external ID: " + externalID); - database.putSlotState(slotState.transitionToFailure()); + database.updateSlotToFailure(slotState); } } } else { diff --git a/celos-server/src/main/java/com/collective/celos/servlet/RerunServlet.java b/celos-server/src/main/java/com/collective/celos/servlet/RerunServlet.java index 59fd1ff61..3e76a847a 100644 --- a/celos-server/src/main/java/com/collective/celos/servlet/RerunServlet.java +++ b/celos-server/src/main/java/com/collective/celos/servlet/RerunServlet.java @@ -66,7 +66,7 @@ protected void doPost(HttpServletRequest req, HttpServletResponse res) throws Se StateDatabase db = scheduler.getStateDatabase(); LOGGER.info("Scheduling Slot for rerun: " + slot); - db.updateSlotForRerun(slot, ScheduledTime.now()); + db.updateSlotForRerun(slot); } catch(Exception e) { throw new ServletException(e); diff --git a/celos-server/src/main/java/com/collective/celos/state/FileSystemStateDatabase.java b/celos-server/src/main/java/com/collective/celos/state/FileSystemStateDatabase.java index 76cba7dd0..df770c2f9 100644 --- a/celos-server/src/main/java/com/collective/celos/state/FileSystemStateDatabase.java +++ b/celos-server/src/main/java/com/collective/celos/state/FileSystemStateDatabase.java @@ -25,6 +25,7 @@ import java.io.File; import java.io.IOException; +import java.nio.file.Files; import java.util.SortedSet; import java.util.TreeSet; @@ -135,12 +136,18 @@ private String getFileName(SlotID slotID) { } @Override - public void markSlotForRerun(SlotID slotID, ScheduledTime now) throws Exception { - RerunState st = new RerunState(now); + protected void markSlotForRerun(SlotID slotID) throws Exception { + RerunState st = new RerunState(slotID.getScheduledTime()); File file = getSlotRerunFile(slotID); writeJson(st.toJSONNode(), file); } + @Override + protected void unMarkSlotForRerun(SlotID slot) throws Exception { + File file = getSlotRerunFile(slot); + Files.deleteIfExists(file.toPath()); + } + @Override public SortedSet getTimesMarkedForRerun(WorkflowID workflowID) throws Exception { SortedSet res = new TreeSet<>(); diff --git a/celos-server/src/main/java/com/collective/celos/state/StateDatabase.java b/celos-server/src/main/java/com/collective/celos/state/StateDatabase.java index 037fed160..d5f16d668 100644 --- a/celos-server/src/main/java/com/collective/celos/state/StateDatabase.java +++ b/celos-server/src/main/java/com/collective/celos/state/StateDatabase.java @@ -41,21 +41,37 @@ public abstract class StateDatabase { /** * Marks the slot for rerun at the current wallclock time. */ - protected abstract void markSlotForRerun(SlotID slot, ScheduledTime now) throws Exception; - + protected abstract void markSlotForRerun(SlotID slot) throws Exception; + + /** + * Unmarks the slot for rerun at the current wallclock time. + */ + protected abstract void unMarkSlotForRerun(SlotID slot) throws Exception; + /** * Returns the list of scheduled times of the given workflow that have been marked for rerun. */ protected abstract SortedSet getTimesMarkedForRerun(WorkflowID workflowID) throws Exception; + public void updateSlotToSuccess(SlotState slot) throws Exception { + SlotState newState = slot.transitionToSuccess(); + putSlotState(newState); + unMarkSlotForRerun(newState.getSlotID()); + } + + public void updateSlotToFailure(SlotState slot) throws Exception { + SlotState newState = slot.transitionToFailure(); + putSlotState(newState); + unMarkSlotForRerun(newState.getSlotID()); + } - public void updateSlotForRerun(SlotID slotID, ScheduledTime now) throws Exception { + public void updateSlotForRerun(SlotID slotID) throws Exception { SlotState state = getSlotState(slotID); if (state != null) { SlotState newState = state.transitionToRerun(); putSlotState(newState); } - markSlotForRerun(slotID, now); + markSlotForRerun(slotID); } public static StateDatabase makeFSDatabase(File dir) throws IOException { diff --git a/celos-server/src/test/java/com/collective/celos/RerunStateTest.java b/celos-server/src/test/java/com/collective/celos/RerunStateTest.java index d452772a4..7faa3e45f 100644 --- a/celos-server/src/test/java/com/collective/celos/RerunStateTest.java +++ b/celos-server/src/test/java/com/collective/celos/RerunStateTest.java @@ -15,30 +15,21 @@ */ package com.collective.celos; -import java.io.IOException; - +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.junit.Assert; import org.junit.Test; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; +import java.io.IOException; public class RerunStateTest { - @Test - public void testExpiration() { - RerunState st = new RerunState(new ScheduledTime("2015-09-07T00:00Z")); - Assert.assertTrue(st.isExpired(new ScheduledTime("2015-10-07T00:00Z"))); - Assert.assertFalse(st.isExpired(new ScheduledTime("2015-09-14T00:00Z"))); - Assert.assertFalse(st.isExpired(new ScheduledTime("2014-09-14T00:00Z"))); - } - @Test public void testJSON() throws IOException { ObjectMapper mapper = new ObjectMapper(); RerunState st = new RerunState(new ScheduledTime("2015-09-07T00:00Z")); String json = mapper.writeValueAsString(st.toJSONNode()); - Assert.assertEquals("{\"rerunTime\":\"2015-09-07T00:00:00.000Z\"}", json); + Assert.assertEquals("{\"scheduledTime\":\"2015-09-07T00:00:00.000Z\"}", json); Assert.assertEquals(st, RerunState.fromJSONNode((ObjectNode) mapper.readTree(json))); } diff --git a/celos-server/src/test/java/com/collective/celos/SchedulerTest.java b/celos-server/src/test/java/com/collective/celos/SchedulerTest.java index 8c661e093..127752df2 100644 --- a/celos-server/src/test/java/com/collective/celos/SchedulerTest.java +++ b/celos-server/src/test/java/com/collective/celos/SchedulerTest.java @@ -254,7 +254,6 @@ public void updateSlotStateRunningExternalIsRunning() throws Exception { public void updateSlotStateRunningExternalIsSuccess() throws Exception { SlotState slotState = new SlotState(slotId, SlotState.Status.RUNNING); - SlotState nextSlotState = new SlotState(slotId, SlotState.Status.SUCCESS); // The external service should report the status as success ExternalStatus success = new MockExternalService.MockExternalStatusSuccess(); @@ -262,7 +261,7 @@ public void updateSlotStateRunningExternalIsSuccess() throws Exception { scheduler.updateSlotState(wf, slotState, ScheduledTime.now()); - verify(stateDatabase).putSlotState(nextSlotState); + verify(stateDatabase).updateSlotToSuccess(slotState); verifyNoMoreInteractions(stateDatabase); } @@ -270,7 +269,6 @@ public void updateSlotStateRunningExternalIsSuccess() throws Exception { public void updateSlotStateRunningExternalIsFailure() throws Exception { SlotState slotState = new SlotState(slotId, SlotState.Status.RUNNING); - SlotState nextSlotState = new SlotState(slotId, SlotState.Status.FAILURE); // The external service should report the status as failure ExternalStatus failure = new MockExternalService.MockExternalStatusFailure(); @@ -278,7 +276,7 @@ public void updateSlotStateRunningExternalIsFailure() throws Exception { scheduler.updateSlotState(wf, slotState, ScheduledTime.now()); - verify(stateDatabase).putSlotState(nextSlotState); + verify(stateDatabase).updateSlotToFailure(slotState); verifyNoMoreInteractions(stateDatabase); } @@ -399,7 +397,7 @@ public void rerunTest() throws Exception { SlotID id = new SlotID(wfID1, new ScheduledTime("2000-11-27T15:01Z")); Assert.assertEquals(null, db.getSlotState(id)); // mark the slot for rerun and verify scheduler cares about it - db.markSlotForRerun(id, now); + db.updateSlotForRerun(id); sched.step(now); Assert.assertEquals(SlotState.Status.READY, db.getSlotState(id).getStatus()); sched.step(now); diff --git a/celos-server/src/test/java/com/collective/celos/state/FileSystemStateDatabaseTest.java b/celos-server/src/test/java/com/collective/celos/state/FileSystemStateDatabaseTest.java index bda434af0..fa288c4d3 100644 --- a/celos-server/src/test/java/com/collective/celos/state/FileSystemStateDatabaseTest.java +++ b/celos-server/src/test/java/com/collective/celos/state/FileSystemStateDatabaseTest.java @@ -180,7 +180,7 @@ private void failsOnWrongStatusTest(SlotState.Status status) throws Exception { SlotID id = new SlotID(new WorkflowID("foo"), new ScheduledTime("2014-02-08T20:00Z")); SlotState state = new SlotState(id, status); db.putSlotState(state); - db.updateSlotForRerun(state.getSlotID(), ScheduledTime.now()); + db.updateSlotForRerun(state.getSlotID()); } @Test @@ -197,7 +197,7 @@ private void succeedsOnRightStatusTest(SlotState.Status status) throws Exception SlotID id = new SlotID(new WorkflowID("foo"), new ScheduledTime("2014-02-08T20:00Z")); SlotState state = new SlotState(id, status); db.putSlotState(state); - db.updateSlotForRerun(state.getSlotID(), ScheduledTime.now()); + db.updateSlotForRerun(state.getSlotID()); SlotState dbState = db.getSlotState(id); org.junit.Assert.assertEquals(state.transitionToRerun(), dbState); } diff --git a/celos-server/src/test/java/com/collective/celos/state/MemoryStateDatabase.java b/celos-server/src/test/java/com/collective/celos/state/MemoryStateDatabase.java index a57497a75..6378046a6 100644 --- a/celos-server/src/test/java/com/collective/celos/state/MemoryStateDatabase.java +++ b/celos-server/src/test/java/com/collective/celos/state/MemoryStateDatabase.java @@ -39,17 +39,21 @@ public SlotState getSlotState(SlotID id) throws Exception { public void putSlotState(SlotState state) throws Exception { map.put(state.getSlotID(), state); } - - public int size() { - return map.size(); - } @Override - public void markSlotForRerun(SlotID slot, ScheduledTime now) throws Exception { - // Doesn't implement GC for rerun + protected void markSlotForRerun(SlotID slot) throws Exception { rerun.add(slot); } + @Override + protected void unMarkSlotForRerun(SlotID slot) throws Exception { + rerun.remove(slot); + } + + public int size() { + return map.size(); + } + @Override protected SortedSet getTimesMarkedForRerun(WorkflowID workflowID) throws Exception { SortedSet res = new TreeSet<>();