diff --git a/celos-common/src/main/java/com/collective/celos/CelosClient.java b/celos-common/src/main/java/com/collective/celos/CelosClient.java index b20b1b7c2..7fcf5fe25 100644 --- a/celos-common/src/main/java/com/collective/celos/CelosClient.java +++ b/celos-common/src/main/java/com/collective/celos/CelosClient.java @@ -18,7 +18,6 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; -import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.Iterator; @@ -69,6 +68,10 @@ public class CelosClient { public static final String KEY_PARAM = "key"; public static final String BUCKET_PARAM = "bucket"; public static final String PREFIX_PARAM = "prefix"; + public static final String CELOSES_NUMBER_PARAM = "celosesNumber"; + public static final String CELOS_INDEX_PARAM = "index"; + public static final Integer CELOSES_NUMBER_PARAM_DEFAULT = 1; + public static final Integer CELOS_INDEX_PARAM_DEFAULT = 0; public static final String KEYS_NODE = "keys"; public static final String PAUSE_NODE = "paused"; @@ -127,19 +130,31 @@ public WorkflowStatus getWorkflowStatus(WorkflowID workflowID) throws Exception } public void iterateScheduler() throws Exception { - iterateScheduler(ScheduledTime.now()); + iterateScheduler(ScheduledTime.now(), Collections.emptySet(), CELOS_INDEX_PARAM_DEFAULT, CELOSES_NUMBER_PARAM_DEFAULT); } public void iterateScheduler(ScheduledTime scheduledTime) throws Exception { - iterateScheduler(scheduledTime, Collections.emptySet()); + iterateScheduler(scheduledTime, Collections.emptySet(), CELOS_INDEX_PARAM_DEFAULT, CELOSES_NUMBER_PARAM_DEFAULT); + } + + public void iterateScheduler(Integer celosIndex, Integer celosesNumber) throws Exception { + iterateScheduler(ScheduledTime.now(), Collections.emptySet(), celosIndex, celosesNumber); } public void iterateScheduler(ScheduledTime scheduledTime, Set workflowIDs) throws Exception { + iterateScheduler(scheduledTime, workflowIDs, 0, 1); + } + + public void iterateScheduler(ScheduledTime scheduledTime, Set workflowIDs, Integer celosIndex, Integer celosesNumber) throws Exception { URIBuilder uriBuilder = new URIBuilder(address); uriBuilder.setPath(uriBuilder.getPath() + SCHEDULER_PATH); if (!workflowIDs.isEmpty()) { uriBuilder.addParameter(IDS_PARAM, StringUtils.join(workflowIDs, ",")); } + if (celosIndex != null && celosesNumber != null) { + uriBuilder.addParameter(CELOS_INDEX_PARAM, celosIndex.toString()); + uriBuilder.addParameter(CELOSES_NUMBER_PARAM, celosesNumber.toString()); + } uriBuilder.addParameter(TIME_PARAM, timeFormatter.formatPretty(scheduledTime)); executePost(uriBuilder.build()); } diff --git a/celos-server/src/main/java/com/collective/celos/Scheduler.java b/celos-server/src/main/java/com/collective/celos/Scheduler.java index 0bed58986..60f0dae8b 100644 --- a/celos-server/src/main/java/com/collective/celos/Scheduler.java +++ b/celos-server/src/main/java/com/collective/celos/Scheduler.java @@ -55,7 +55,7 @@ ScheduledTime getSlidingWindowStartTime(ScheduledTime current) { */ public void step(ScheduledTime current, StateDatabaseConnection connection) throws Exception { // by default, schedule all workflows - step(current, Collections.emptySet(), connection); + step(current, Collections.emptySet(), connection, 0, 1); } /** @@ -63,11 +63,12 @@ public void step(ScheduledTime current, StateDatabaseConnection connection) thro *

* Otherwise, schedule only workflows in the set. */ - public void step(ScheduledTime current, Set workflowIDs, StateDatabaseConnection connection) throws Exception { + public void step(ScheduledTime current, Set workflowIDs, StateDatabaseConnection connection, int celosIndex, int numberOfCeloses) throws Exception { LOGGER.info("Starting scheduler step: " + current + " -- " + getSlidingWindowStartTime(current)); for (Workflow wf : configuration.getWorkflows()) { WorkflowID id = wf.getID(); boolean shouldProcess = workflowIDs.isEmpty() || workflowIDs.contains(id); + shouldProcess &= Math.abs(id.toString().hashCode()) % numberOfCeloses == celosIndex; if (!shouldProcess) { LOGGER.info("Ignoring workflow: " + id); } else if (connection.isPaused(id)) { diff --git a/celos-server/src/main/java/com/collective/celos/servlet/SchedulerServlet.java b/celos-server/src/main/java/com/collective/celos/servlet/SchedulerServlet.java index 5af260a72..883157215 100644 --- a/celos-server/src/main/java/com/collective/celos/servlet/SchedulerServlet.java +++ b/celos-server/src/main/java/com/collective/celos/servlet/SchedulerServlet.java @@ -39,14 +39,20 @@ protected void doPost(HttpServletRequest req, HttpServletResponse res) throws Se Scheduler scheduler = createAndCacheScheduler(); ScheduledTime current = getRequestTime(req); Set workflowIDs = getWorkflowIDs(req); + Integer celosIndex = getInteger(req, CelosClient.CELOS_INDEX_PARAM, CelosClient.CELOS_INDEX_PARAM_DEFAULT); + Integer celosesNumber = getInteger(req, CelosClient.CELOSES_NUMBER_PARAM, CelosClient.CELOSES_NUMBER_PARAM_DEFAULT); try(StateDatabaseConnection connection = getStateDatabase().openConnection()) { - scheduler.step(current, workflowIDs, connection); + scheduler.step(current, workflowIDs, connection, celosIndex, celosesNumber); } } catch(Exception e) { throw new RuntimeException(e); } } + private Integer getInteger(HttpServletRequest req, String str, Integer defaultVal) { + return req.getParameter(str) == null ? defaultVal : Integer.valueOf(req.getParameter(str)); + } + Set getWorkflowIDs(HttpServletRequest req) { String idString = req.getParameter(CelosClient.IDS_PARAM); if (idString == null) { @@ -61,4 +67,5 @@ Set getWorkflowIDs(HttpServletRequest req) { } } + } diff --git a/celos-server/src/test/java/com/collective/celos/SchedulerTest.java b/celos-server/src/test/java/com/collective/celos/SchedulerTest.java index af3096eb4..1b44be9ee 100644 --- a/celos-server/src/test/java/com/collective/celos/SchedulerTest.java +++ b/celos-server/src/test/java/com/collective/celos/SchedulerTest.java @@ -902,8 +902,6 @@ public void schedulingOnlySubsetOfWorkflowsWorks() throws Exception { cfg.addWorkflow(wf1); cfg.addWorkflow(wf2); - MemoryStateDatabase db = new MemoryStateDatabase(); - SlotID id1 = new SlotID(wfID1, new ScheduledTime("2013-11-27T20:00Z")); SlotID id2 = new SlotID(wfID2, new ScheduledTime("2013-11-27T20:00Z")); @@ -920,7 +918,7 @@ public void schedulingOnlySubsetOfWorkflowsWorks() throws Exception { DateTime current = DateTime.parse("2013-11-27T22:01Z"); Scheduler sched = new Scheduler(cfg, slidingWindowHours); - sched.step(new ScheduledTime(current), subset, connection); + sched.step(new ScheduledTime(current), subset, connection, 0, 1); SlotState slot1After = connection.getSlotState(id1); Assert.assertEquals(SlotState.Status.READY, slot1After.getStatus()); @@ -929,6 +927,76 @@ public void schedulingOnlySubsetOfWorkflowsWorks() throws Exception { Assert.assertEquals(SlotState.Status.WAITING, slot2After.getStatus()); } + @Test + public void schedulingOnlySubsetOfWorkflowsByHash() throws Exception { + WorkflowID wfID1 = new WorkflowID("some-wf1"); + WorkflowID wfID2 = new WorkflowID("some-wf2"); + WorkflowID wfID3 = new WorkflowID("some-wf3"); + WorkflowID wfID4 = new WorkflowID("some-wf4"); + WorkflowID wfID5 = new WorkflowID("some-wf5"); + + Assert.assertEquals(Math.abs(wfID1.toString().hashCode()) % 3, 0); + Assert.assertEquals(Math.abs(wfID2.toString().hashCode()) % 3, 1); + Assert.assertEquals(Math.abs(wfID3.toString().hashCode()) % 3, 2); + Assert.assertEquals(Math.abs(wfID4.toString().hashCode()) % 3, 0); + Assert.assertEquals(Math.abs(wfID5.toString().hashCode()) % 3, 1); + + Schedule sch1 = makeHourlySchedule(); + SchedulingStrategy str1 = makeSerialSchedulingStrategy(); + Trigger tr1 = makeAlwaysTrigger(); + MockExternalService srv1 = new MockExternalService(new MockExternalService.MockExternalStatusRunning()); + int maxRetryCount = 0; + Workflow wf1 = new Workflow(wfID1, sch1, str1, tr1, srv1, maxRetryCount, Workflow.DEFAULT_START_TIME, Workflow.DEFAULT_WAIT_TIMEOUT_SECONDS, emptyWorkflowInfo); + Workflow wf2 = new Workflow(wfID2, sch1, str1, tr1, srv1, maxRetryCount, Workflow.DEFAULT_START_TIME, Workflow.DEFAULT_WAIT_TIMEOUT_SECONDS, emptyWorkflowInfo); + Workflow wf3 = new Workflow(wfID3, sch1, str1, tr1, srv1, maxRetryCount, Workflow.DEFAULT_START_TIME, Workflow.DEFAULT_WAIT_TIMEOUT_SECONDS, emptyWorkflowInfo); + Workflow wf4 = new Workflow(wfID4, sch1, str1, tr1, srv1, maxRetryCount, Workflow.DEFAULT_START_TIME, Workflow.DEFAULT_WAIT_TIMEOUT_SECONDS, emptyWorkflowInfo); + Workflow wf5 = new Workflow(wfID5, sch1, str1, tr1, srv1, maxRetryCount, Workflow.DEFAULT_START_TIME, Workflow.DEFAULT_WAIT_TIMEOUT_SECONDS, emptyWorkflowInfo); + + WorkflowConfiguration cfg = new WorkflowConfiguration(); + cfg.addWorkflow(wf1); + cfg.addWorkflow(wf2); + cfg.addWorkflow(wf3); + cfg.addWorkflow(wf4); + cfg.addWorkflow(wf5); + + SlotID id1 = new SlotID(wfID1, new ScheduledTime("2013-11-27T20:00Z")); + SlotID id2 = new SlotID(wfID2, new ScheduledTime("2013-11-27T20:00Z")); + SlotID id3 = new SlotID(wfID3, new ScheduledTime("2013-11-27T20:00Z")); + SlotID id4 = new SlotID(wfID4, new ScheduledTime("2013-11-27T20:00Z")); + SlotID id5 = new SlotID(wfID5, new ScheduledTime("2013-11-27T20:00Z")); + + SlotState slot1 = new SlotState(id1, SlotState.Status.WAITING); + SlotState slot2 = new SlotState(id2, SlotState.Status.WAITING); + SlotState slot3 = new SlotState(id3, SlotState.Status.WAITING); + SlotState slot4 = new SlotState(id4, SlotState.Status.WAITING); + SlotState slot5 = new SlotState(id5, SlotState.Status.WAITING); + + connection.putSlotState(slot1); + connection.putSlotState(slot2); + connection.putSlotState(slot3); + connection.putSlotState(slot4); + connection.putSlotState(slot5); + + int slidingWindowHours = 3; + DateTime current = DateTime.parse("2013-11-27T22:01Z"); + + Scheduler sched = new Scheduler(cfg, slidingWindowHours); + sched.step(new ScheduledTime(current), Collections.emptySet(), connection, 1, 3); + + SlotState slot1After = connection.getSlotState(id1); + SlotState slot2After = connection.getSlotState(id2); + SlotState slot3After = connection.getSlotState(id3); + SlotState slot4After = connection.getSlotState(id4); + SlotState slot5After = connection.getSlotState(id5); + + Assert.assertEquals(SlotState.Status.WAITING, slot1After.getStatus()); + Assert.assertEquals(SlotState.Status.READY, slot2After.getStatus()); + Assert.assertEquals(SlotState.Status.WAITING, slot3After.getStatus()); + Assert.assertEquals(SlotState.Status.WAITING, slot4After.getStatus()); + Assert.assertEquals(SlotState.Status.READY, slot5After.getStatus()); + } + + @Test public void testTimeoutCalculation() { Assert.assertTrue(Scheduler.isSlotTimedOut(new ScheduledTime("2015-03-05T00:00:00Z"), new ScheduledTime("2015-03-05T00:00:21Z"), 20));