Skip to content

Commit

Permalink
Unmark rerun slots after they have succeeded or failed #590
Browse files Browse the repository at this point in the history
  • Loading branch information
ollie64 committed Sep 10, 2015
1 parent c8e218a commit 595651a
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class RerunState extends ValueObject {
public static final int EXPIRATION_DAYS = 14;

private static final ObjectMapper MAPPER = new ObjectMapper();
private static final String RERUN_TIME_PROP = "rerunTime";
private static final String RERUN_TIME_PROP = "scheduledTime";

// The wallclock time at which the slot was marked for rerun
private final ScheduledTime rerunTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,14 @@ void updateSlotState(Workflow wf, SlotState slotState, ScheduledTime current) th
if (!xStatus.isRunning()) {
if (xStatus.isSuccess()) {
LOGGER.info("Slot successful: " + slotID + " external ID: " + externalID);
database.putSlotState(slotState.transitionToSuccess());
database.updateSlotToSuccess(slotState);
} else {
if (slotState.getRetryCount() < wf.getMaxRetryCount()) {
LOGGER.info("Slot failed, preparing for retry: " + slotID + " external ID: " + externalID);
database.putSlotState(slotState.transitionToRetry());
} else {
LOGGER.info("Slot failed permanently: " + slotID + " external ID: " + externalID);
database.putSlotState(slotState.transitionToFailure());
database.updateSlotToFailure(slotState);
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ protected void doPost(HttpServletRequest req, HttpServletResponse res) throws Se

StateDatabase db = scheduler.getStateDatabase();
LOGGER.info("Scheduling Slot for rerun: " + slot);
db.updateSlotForRerun(slot, ScheduledTime.now());
db.updateSlotForRerun(slot);

} catch(Exception e) {
throw new ServletException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.SortedSet;
import java.util.TreeSet;

Expand Down Expand Up @@ -135,12 +136,18 @@ private String getFileName(SlotID slotID) {
}

@Override
public void markSlotForRerun(SlotID slotID, ScheduledTime now) throws Exception {
RerunState st = new RerunState(now);
protected void markSlotForRerun(SlotID slotID) throws Exception {
RerunState st = new RerunState(slotID.getScheduledTime());
File file = getSlotRerunFile(slotID);
writeJson(st.toJSONNode(), file);
}

@Override
protected void unMarkSlotForRerun(SlotID slot) throws Exception {
File file = getSlotRerunFile(slot);
Files.deleteIfExists(file.toPath());
}

@Override
public SortedSet<ScheduledTime> getTimesMarkedForRerun(WorkflowID workflowID) throws Exception {
SortedSet<ScheduledTime> res = new TreeSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,37 @@ public abstract class StateDatabase {
/**
* Marks the slot for rerun at the current wallclock time.
*/
protected abstract void markSlotForRerun(SlotID slot, ScheduledTime now) throws Exception;

protected abstract void markSlotForRerun(SlotID slot) throws Exception;

/**
* Unmarks the slot for rerun at the current wallclock time.
*/
protected abstract void unMarkSlotForRerun(SlotID slot) throws Exception;

/**
* Returns the list of scheduled times of the given workflow that have been marked for rerun.
*/
protected abstract SortedSet<ScheduledTime> getTimesMarkedForRerun(WorkflowID workflowID) throws Exception;

public void updateSlotToSuccess(SlotState slot) throws Exception {
SlotState newState = slot.transitionToSuccess();
putSlotState(newState);
unMarkSlotForRerun(newState.getSlotID());
}

public void updateSlotToFailure(SlotState slot) throws Exception {
SlotState newState = slot.transitionToFailure();
putSlotState(newState);
unMarkSlotForRerun(newState.getSlotID());
}

public void updateSlotForRerun(SlotID slotID, ScheduledTime now) throws Exception {
public void updateSlotForRerun(SlotID slotID) throws Exception {
SlotState state = getSlotState(slotID);
if (state != null) {
SlotState newState = state.transitionToRerun();
putSlotState(newState);
}
markSlotForRerun(slotID, now);
markSlotForRerun(slotID);
}

public static StateDatabase makeFSDatabase(File dir) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,21 @@
*/
package com.collective.celos;

import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.junit.Assert;
import org.junit.Test;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;

public class RerunStateTest {

@Test
public void testExpiration() {
RerunState st = new RerunState(new ScheduledTime("2015-09-07T00:00Z"));
Assert.assertTrue(st.isExpired(new ScheduledTime("2015-10-07T00:00Z")));
Assert.assertFalse(st.isExpired(new ScheduledTime("2015-09-14T00:00Z")));
Assert.assertFalse(st.isExpired(new ScheduledTime("2014-09-14T00:00Z")));
}

@Test
public void testJSON() throws IOException {
ObjectMapper mapper = new ObjectMapper();
RerunState st = new RerunState(new ScheduledTime("2015-09-07T00:00Z"));
String json = mapper.writeValueAsString(st.toJSONNode());
Assert.assertEquals("{\"rerunTime\":\"2015-09-07T00:00:00.000Z\"}", json);
Assert.assertEquals("{\"scheduledTime\":\"2015-09-07T00:00:00.000Z\"}", json);
Assert.assertEquals(st, RerunState.fromJSONNode((ObjectNode) mapper.readTree(json)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,31 +254,29 @@ public void updateSlotStateRunningExternalIsRunning() throws Exception {
public void updateSlotStateRunningExternalIsSuccess() throws Exception {

SlotState slotState = new SlotState(slotId, SlotState.Status.RUNNING);
SlotState nextSlotState = new SlotState(slotId, SlotState.Status.SUCCESS);

// The external service should report the status as success
ExternalStatus success = new MockExternalService.MockExternalStatusSuccess();
when(externalService.getStatus(slotId, slotState.getExternalID())).thenReturn(success);

scheduler.updateSlotState(wf, slotState, ScheduledTime.now());

verify(stateDatabase).putSlotState(nextSlotState);
verify(stateDatabase).updateSlotToSuccess(slotState);
verifyNoMoreInteractions(stateDatabase);
}

@Test
public void updateSlotStateRunningExternalIsFailure() throws Exception {

SlotState slotState = new SlotState(slotId, SlotState.Status.RUNNING);
SlotState nextSlotState = new SlotState(slotId, SlotState.Status.FAILURE);

// The external service should report the status as failure
ExternalStatus failure = new MockExternalService.MockExternalStatusFailure();
when(externalService.getStatus(slotId, slotState.getExternalID())).thenReturn(failure);

scheduler.updateSlotState(wf, slotState, ScheduledTime.now());

verify(stateDatabase).putSlotState(nextSlotState);
verify(stateDatabase).updateSlotToFailure(slotState);
verifyNoMoreInteractions(stateDatabase);
}

Expand Down Expand Up @@ -399,7 +397,7 @@ public void rerunTest() throws Exception {
SlotID id = new SlotID(wfID1, new ScheduledTime("2000-11-27T15:01Z"));
Assert.assertEquals(null, db.getSlotState(id));
// mark the slot for rerun and verify scheduler cares about it
db.markSlotForRerun(id, now);
db.updateSlotForRerun(id);
sched.step(now);
Assert.assertEquals(SlotState.Status.READY, db.getSlotState(id).getStatus());
sched.step(now);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ private void failsOnWrongStatusTest(SlotState.Status status) throws Exception {
SlotID id = new SlotID(new WorkflowID("foo"), new ScheduledTime("2014-02-08T20:00Z"));
SlotState state = new SlotState(id, status);
db.putSlotState(state);
db.updateSlotForRerun(state.getSlotID(), ScheduledTime.now());
db.updateSlotForRerun(state.getSlotID());
}

@Test
Expand All @@ -197,7 +197,7 @@ private void succeedsOnRightStatusTest(SlotState.Status status) throws Exception
SlotID id = new SlotID(new WorkflowID("foo"), new ScheduledTime("2014-02-08T20:00Z"));
SlotState state = new SlotState(id, status);
db.putSlotState(state);
db.updateSlotForRerun(state.getSlotID(), ScheduledTime.now());
db.updateSlotForRerun(state.getSlotID());
SlotState dbState = db.getSlotState(id);
org.junit.Assert.assertEquals(state.transitionToRerun(), dbState);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,21 @@ public SlotState getSlotState(SlotID id) throws Exception {
public void putSlotState(SlotState state) throws Exception {
map.put(state.getSlotID(), state);
}

public int size() {
return map.size();
}

@Override
public void markSlotForRerun(SlotID slot, ScheduledTime now) throws Exception {
// Doesn't implement GC for rerun
protected void markSlotForRerun(SlotID slot) throws Exception {
rerun.add(slot);
}

@Override
protected void unMarkSlotForRerun(SlotID slot) throws Exception {
rerun.remove(slot);
}

public int size() {
return map.size();
}

@Override
protected SortedSet<ScheduledTime> getTimesMarkedForRerun(WorkflowID workflowID) throws Exception {
SortedSet<ScheduledTime> res = new TreeSet<>();
Expand Down

0 comments on commit 595651a

Please sign in to comment.