Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

iterate on workflows subset #719

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions celos-common/src/main/java/com/collective/celos/CelosClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -69,6 +68,10 @@ public class CelosClient {
public static final String KEY_PARAM = "key";
public static final String BUCKET_PARAM = "bucket";
public static final String PREFIX_PARAM = "prefix";
public static final String CELOSES_NUMBER_PARAM = "celosesNumber";
public static final String CELOS_INDEX_PARAM = "index";
public static final Integer CELOSES_NUMBER_PARAM_DEFAULT = 1;
public static final Integer CELOS_INDEX_PARAM_DEFAULT = 0;

public static final String KEYS_NODE = "keys";
public static final String PAUSE_NODE = "paused";
Expand Down Expand Up @@ -127,19 +130,31 @@ public WorkflowStatus getWorkflowStatus(WorkflowID workflowID) throws Exception
}

public void iterateScheduler() throws Exception {
iterateScheduler(ScheduledTime.now());
iterateScheduler(ScheduledTime.now(), Collections.<WorkflowID>emptySet(), CELOS_INDEX_PARAM_DEFAULT, CELOSES_NUMBER_PARAM_DEFAULT);
}

public void iterateScheduler(ScheduledTime scheduledTime) throws Exception {
iterateScheduler(scheduledTime, Collections.<WorkflowID>emptySet());
iterateScheduler(scheduledTime, Collections.<WorkflowID>emptySet(), CELOS_INDEX_PARAM_DEFAULT, CELOSES_NUMBER_PARAM_DEFAULT);
}

public void iterateScheduler(Integer celosIndex, Integer celosesNumber) throws Exception {
iterateScheduler(ScheduledTime.now(), Collections.<WorkflowID>emptySet(), celosIndex, celosesNumber);
}

public void iterateScheduler(ScheduledTime scheduledTime, Set<WorkflowID> workflowIDs) throws Exception {
iterateScheduler(scheduledTime, workflowIDs, 0, 1);
}

public void iterateScheduler(ScheduledTime scheduledTime, Set<WorkflowID> workflowIDs, Integer celosIndex, Integer celosesNumber) throws Exception {
URIBuilder uriBuilder = new URIBuilder(address);
uriBuilder.setPath(uriBuilder.getPath() + SCHEDULER_PATH);
if (!workflowIDs.isEmpty()) {
uriBuilder.addParameter(IDS_PARAM, StringUtils.join(workflowIDs, ","));
}
if (celosIndex != null && celosesNumber != null) {
uriBuilder.addParameter(CELOS_INDEX_PARAM, celosIndex.toString());
uriBuilder.addParameter(CELOSES_NUMBER_PARAM, celosesNumber.toString());
}
uriBuilder.addParameter(TIME_PARAM, timeFormatter.formatPretty(scheduledTime));
executePost(uriBuilder.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,20 @@ ScheduledTime getSlidingWindowStartTime(ScheduledTime current) {
*/
public void step(ScheduledTime current, StateDatabaseConnection connection) throws Exception {
// by default, schedule all workflows
step(current, Collections.<WorkflowID>emptySet(), connection);
step(current, Collections.<WorkflowID>emptySet(), connection, 0, 1);
}

/**
* If workflowIDs is empty, schedule all workflows.
* <p>
* Otherwise, schedule only workflows in the set.
*/
public void step(ScheduledTime current, Set<WorkflowID> workflowIDs, StateDatabaseConnection connection) throws Exception {
public void step(ScheduledTime current, Set<WorkflowID> workflowIDs, StateDatabaseConnection connection, int celosIndex, int numberOfCeloses) throws Exception {
LOGGER.info("Starting scheduler step: " + current + " -- " + getSlidingWindowStartTime(current));
for (Workflow wf : configuration.getWorkflows()) {
WorkflowID id = wf.getID();
boolean shouldProcess = workflowIDs.isEmpty() || workflowIDs.contains(id);
shouldProcess &= Math.abs(id.toString().hashCode()) % numberOfCeloses == celosIndex;
if (!shouldProcess) {
LOGGER.info("Ignoring workflow: " + id);
} else if (connection.isPaused(id)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,20 @@ protected void doPost(HttpServletRequest req, HttpServletResponse res) throws Se
Scheduler scheduler = createAndCacheScheduler();
ScheduledTime current = getRequestTime(req);
Set<WorkflowID> workflowIDs = getWorkflowIDs(req);
Integer celosIndex = getInteger(req, CelosClient.CELOS_INDEX_PARAM, CelosClient.CELOS_INDEX_PARAM_DEFAULT);
Integer celosesNumber = getInteger(req, CelosClient.CELOSES_NUMBER_PARAM, CelosClient.CELOSES_NUMBER_PARAM_DEFAULT);
try(StateDatabaseConnection connection = getStateDatabase().openConnection()) {
scheduler.step(current, workflowIDs, connection);
scheduler.step(current, workflowIDs, connection, celosIndex, celosesNumber);
}
} catch(Exception e) {
throw new RuntimeException(e);
}
}

private Integer getInteger(HttpServletRequest req, String str, Integer defaultVal) {
return req.getParameter(str) == null ? defaultVal : Integer.valueOf(req.getParameter(str));
}

Set<WorkflowID> getWorkflowIDs(HttpServletRequest req) {
String idString = req.getParameter(CelosClient.IDS_PARAM);
if (idString == null) {
Expand All @@ -61,4 +67,5 @@ Set<WorkflowID> getWorkflowIDs(HttpServletRequest req) {
}
}


}
74 changes: 71 additions & 3 deletions celos-server/src/test/java/com/collective/celos/SchedulerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -902,8 +902,6 @@ public void schedulingOnlySubsetOfWorkflowsWorks() throws Exception {
cfg.addWorkflow(wf1);
cfg.addWorkflow(wf2);

MemoryStateDatabase db = new MemoryStateDatabase();

SlotID id1 = new SlotID(wfID1, new ScheduledTime("2013-11-27T20:00Z"));
SlotID id2 = new SlotID(wfID2, new ScheduledTime("2013-11-27T20:00Z"));

Expand All @@ -920,7 +918,7 @@ public void schedulingOnlySubsetOfWorkflowsWorks() throws Exception {
DateTime current = DateTime.parse("2013-11-27T22:01Z");

Scheduler sched = new Scheduler(cfg, slidingWindowHours);
sched.step(new ScheduledTime(current), subset, connection);
sched.step(new ScheduledTime(current), subset, connection, 0, 1);

SlotState slot1After = connection.getSlotState(id1);
Assert.assertEquals(SlotState.Status.READY, slot1After.getStatus());
Expand All @@ -929,6 +927,76 @@ public void schedulingOnlySubsetOfWorkflowsWorks() throws Exception {
Assert.assertEquals(SlotState.Status.WAITING, slot2After.getStatus());
}

@Test
public void schedulingOnlySubsetOfWorkflowsByHash() throws Exception {
WorkflowID wfID1 = new WorkflowID("some-wf1");
WorkflowID wfID2 = new WorkflowID("some-wf2");
WorkflowID wfID3 = new WorkflowID("some-wf3");
WorkflowID wfID4 = new WorkflowID("some-wf4");
WorkflowID wfID5 = new WorkflowID("some-wf5");

Assert.assertEquals(Math.abs(wfID1.toString().hashCode()) % 3, 0);
Assert.assertEquals(Math.abs(wfID2.toString().hashCode()) % 3, 1);
Assert.assertEquals(Math.abs(wfID3.toString().hashCode()) % 3, 2);
Assert.assertEquals(Math.abs(wfID4.toString().hashCode()) % 3, 0);
Assert.assertEquals(Math.abs(wfID5.toString().hashCode()) % 3, 1);

Schedule sch1 = makeHourlySchedule();
SchedulingStrategy str1 = makeSerialSchedulingStrategy();
Trigger tr1 = makeAlwaysTrigger();
MockExternalService srv1 = new MockExternalService(new MockExternalService.MockExternalStatusRunning());
int maxRetryCount = 0;
Workflow wf1 = new Workflow(wfID1, sch1, str1, tr1, srv1, maxRetryCount, Workflow.DEFAULT_START_TIME, Workflow.DEFAULT_WAIT_TIMEOUT_SECONDS, emptyWorkflowInfo);
Workflow wf2 = new Workflow(wfID2, sch1, str1, tr1, srv1, maxRetryCount, Workflow.DEFAULT_START_TIME, Workflow.DEFAULT_WAIT_TIMEOUT_SECONDS, emptyWorkflowInfo);
Workflow wf3 = new Workflow(wfID3, sch1, str1, tr1, srv1, maxRetryCount, Workflow.DEFAULT_START_TIME, Workflow.DEFAULT_WAIT_TIMEOUT_SECONDS, emptyWorkflowInfo);
Workflow wf4 = new Workflow(wfID4, sch1, str1, tr1, srv1, maxRetryCount, Workflow.DEFAULT_START_TIME, Workflow.DEFAULT_WAIT_TIMEOUT_SECONDS, emptyWorkflowInfo);
Workflow wf5 = new Workflow(wfID5, sch1, str1, tr1, srv1, maxRetryCount, Workflow.DEFAULT_START_TIME, Workflow.DEFAULT_WAIT_TIMEOUT_SECONDS, emptyWorkflowInfo);

WorkflowConfiguration cfg = new WorkflowConfiguration();
cfg.addWorkflow(wf1);
cfg.addWorkflow(wf2);
cfg.addWorkflow(wf3);
cfg.addWorkflow(wf4);
cfg.addWorkflow(wf5);

SlotID id1 = new SlotID(wfID1, new ScheduledTime("2013-11-27T20:00Z"));
SlotID id2 = new SlotID(wfID2, new ScheduledTime("2013-11-27T20:00Z"));
SlotID id3 = new SlotID(wfID3, new ScheduledTime("2013-11-27T20:00Z"));
SlotID id4 = new SlotID(wfID4, new ScheduledTime("2013-11-27T20:00Z"));
SlotID id5 = new SlotID(wfID5, new ScheduledTime("2013-11-27T20:00Z"));

SlotState slot1 = new SlotState(id1, SlotState.Status.WAITING);
SlotState slot2 = new SlotState(id2, SlotState.Status.WAITING);
SlotState slot3 = new SlotState(id3, SlotState.Status.WAITING);
SlotState slot4 = new SlotState(id4, SlotState.Status.WAITING);
SlotState slot5 = new SlotState(id5, SlotState.Status.WAITING);

connection.putSlotState(slot1);
connection.putSlotState(slot2);
connection.putSlotState(slot3);
connection.putSlotState(slot4);
connection.putSlotState(slot5);

int slidingWindowHours = 3;
DateTime current = DateTime.parse("2013-11-27T22:01Z");

Scheduler sched = new Scheduler(cfg, slidingWindowHours);
sched.step(new ScheduledTime(current), Collections.emptySet(), connection, 1, 3);

SlotState slot1After = connection.getSlotState(id1);
SlotState slot2After = connection.getSlotState(id2);
SlotState slot3After = connection.getSlotState(id3);
SlotState slot4After = connection.getSlotState(id4);
SlotState slot5After = connection.getSlotState(id5);

Assert.assertEquals(SlotState.Status.WAITING, slot1After.getStatus());
Assert.assertEquals(SlotState.Status.READY, slot2After.getStatus());
Assert.assertEquals(SlotState.Status.WAITING, slot3After.getStatus());
Assert.assertEquals(SlotState.Status.WAITING, slot4After.getStatus());
Assert.assertEquals(SlotState.Status.READY, slot5After.getStatus());
}


@Test
public void testTimeoutCalculation() {
Assert.assertTrue(Scheduler.isSlotTimedOut(new ScheduledTime("2015-03-05T00:00:00Z"), new ScheduledTime("2015-03-05T00:00:21Z"), 20));
Expand Down