Skip to content

Commit

Permalink
feat: Support for batch scheduling (#596)
Browse files Browse the repository at this point in the history
## Brief, plain english overview of your changes here

This PR adds support for scheduling new tasks using batch inserts.

Known issues:
- Updates are not supported; if any task in a batch already exists, the
batch will fail
- Hard-coded batch size of 100 when using `Stream` as parameter. Inserts
are committed after each batch.

## Fixes
#595 


## Reminders
- [x] Added/ran automated tests
- [x] Update README and/or examples
- [x] Ran `mvn spotless:apply`

---------

Co-authored-by: Gustav Karlsson <[email protected]>
  • Loading branch information
geirsagberg and kagkarlsson authored Jan 29, 2025
1 parent 88ed8c3 commit b041658
Show file tree
Hide file tree
Showing 16 changed files with 397 additions and 109 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ jobs:
cache: 'maven'

- name: Run all tests
run: mvn -B -Pall-tests,publication clean package --file pom.xml
run: mvn -B -Ptests-for-ci,publication clean package --file pom.xml
env:
TZ: UTC
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,17 @@ scheduler.schedule(
.scheduledTo(Instant.now().plusSeconds(5)));
```

... or schedule in batches using:

```java
Stream<TaskInstance<?>> taskInstances = Stream.of(
MY_TASK.instance("my-task-1", 1),
MY_TASK.instance("my-task-2", 2),
MY_TASK.instance("my-task-3", 3));

scheduler.scheduleBatch(taskInstances, Instant.now());
```

### More examples

#### Plain Java
Expand Down
8 changes: 1 addition & 7 deletions db-scheduler/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -369,16 +369,10 @@
</properties>
</profile>
<profile>
<id>compatibility</id>
<id>tests-for-ci</id>
<properties>
<test.excludedTags>compatibility-cluster</test.excludedTags>
</properties>
</profile>
<profile>
<id>compatibility-cluster</id>
<properties>
<test.excludedTags>compatibility</test.excludedTags>
</properties>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,16 @@ public <T> boolean scheduleIfNotExists(SchedulableInstance<T> schedulableInstanc
return this.delegate.scheduleIfNotExists(schedulableInstance);
}

/** Forwards the task-instances and the shared execution-time to {@code delegate} unchanged. */
@Override
public void scheduleBatch(List<TaskInstance<?>> taskInstances, Instant executionTime) {
this.delegate.scheduleBatch(taskInstances, executionTime);
}

/** Forwards the schedulable instances to {@code delegate} unchanged. */
@Override
public void scheduleBatch(List<SchedulableInstance<?>> schedulableInstances) {
this.delegate.scheduleBatch(schedulableInstances);
}

@Override
public <T> void schedule(TaskInstance<T> taskInstance, Instant executionTime) {
this.delegate.schedule(taskInstance, executionTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.github.kagkarlsson.scheduler;

import static java.util.Optional.ofNullable;
import static java.util.stream.Collectors.toList;

import com.github.kagkarlsson.scheduler.SchedulerClient.ScheduleOptions.WhenExists;
import com.github.kagkarlsson.scheduler.event.SchedulerListeners;
Expand All @@ -26,6 +27,7 @@
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.SchedulableInstance;
import com.github.kagkarlsson.scheduler.task.ScheduledTaskInstance;
import com.github.kagkarlsson.scheduler.task.Task;
import com.github.kagkarlsson.scheduler.task.TaskInstance;
import com.github.kagkarlsson.scheduler.task.TaskInstanceId;
Expand All @@ -35,12 +37,15 @@
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Stream;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public interface SchedulerClient {

/** Chunk size used by the {@code Stream}-based {@code scheduleBatch} default methods. */
int DEFAULT_BATCH_SIZE = 100;

/**
* Schedule a new execution for the given task instance.
*
Expand Down Expand Up @@ -116,6 +121,51 @@ <T> boolean schedule(
*/
<T> boolean scheduleIfNotExists(SchedulableInstance<T> schedulableInstance);

/**
 * Schedule a batch of executions that all share the same execution-time. If any of the
 * executions already exists, the scheduling will fail and an exception will be thrown.
 *
 * @param taskInstances Task-instances to schedule, optionally with data
 * @param executionTime Instant they should run
 * @see java.time.Instant
 * @see com.github.kagkarlsson.scheduler.task.TaskInstance
 */
void scheduleBatch(List<TaskInstance<?>> taskInstances, Instant executionTime);

/**
 * Schedule a batch of executions where each instance carries its own execution-time. If any of
 * the executions already exists, the scheduling will fail and an exception will be thrown.
 *
 * @param schedulableInstances Task-instances with individual execution-times
 * @see com.github.kagkarlsson.scheduler.task.SchedulableInstance
 */
void scheduleBatch(List<SchedulableInstance<?>> schedulableInstances);

/**
 * Schedule a stream of executions that all share the same execution-time. The stream is consumed
 * in chunks of at most {@link #DEFAULT_BATCH_SIZE} task-instances, and every chunk is handed to
 * {@link #scheduleBatch(List, Instant)} in turn. If any of the executions already exists, the
 * scheduling will fail and an exception will be thrown.
 *
 * @param taskInstances Task-instances to schedule, optionally with data
 * @param executionTime Instant they should run
 * @see java.time.Instant
 * @see com.github.kagkarlsson.scheduler.task.TaskInstance
 */
default void scheduleBatch(Stream<TaskInstance<?>> taskInstances, Instant executionTime) {
  Stream<List<TaskInstance<?>>> batches =
      StreamUtils.chunkStream(taskInstances, DEFAULT_BATCH_SIZE);
  batches.forEach(batch -> scheduleBatch(batch, executionTime));
}

/**
 * Schedule a stream of executions where each instance carries its own execution-time. The stream
 * is consumed in chunks of at most {@link #DEFAULT_BATCH_SIZE} instances, and every chunk is
 * handed to {@link #scheduleBatch(List)} in turn. If any of the executions already exists, the
 * scheduling will fail and an exception will be thrown.
 *
 * @param schedulableInstances Task-instances with individual execution-times
 * @see com.github.kagkarlsson.scheduler.task.SchedulableInstance
 */
default void scheduleBatch(Stream<SchedulableInstance<?>> schedulableInstances) {
  Stream<List<SchedulableInstance<?>>> batches =
      StreamUtils.chunkStream(schedulableInstances, DEFAULT_BATCH_SIZE);
  batches.forEach(batch -> scheduleBatch(batch));
}

/**
* Update an existing execution to a new execution-time. If the execution does not exist or if it
* is currently running, an exception is thrown.
Expand Down Expand Up @@ -389,6 +439,34 @@ public <T> boolean scheduleIfNotExists(SchedulableInstance<T> schedulableInstanc
schedulableInstance.getNextExecutionTime(clock.now()));
}

/**
 * Pins every task-instance to the given execution-time, stores the resulting batch through the
 * task repository and then notifies the scheduler listeners about each scheduled instance.
 */
@Override
public void scheduleBatch(List<TaskInstance<?>> taskInstances, Instant executionTime) {
  Stream<ScheduledTaskInstance> pinnedToTime =
      taskInstances.stream().map(instance -> new ScheduledTaskInstance(instance, executionTime));
  List<ScheduledTaskInstance> batch = pinnedToTime.collect(toList());

  // Listeners are only notified once the repository call has returned.
  taskRepository.createBatch(batch);
  notifyListenersOfScheduledBatch(batch);
}

/**
 * Resolves each schedulable instance to a fixed execution-time using the client's clock, stores
 * the resulting batch through the task repository and then notifies the scheduler listeners
 * about each scheduled instance.
 */
@Override
public void scheduleBatch(List<SchedulableInstance<?>> schedulableInstances) {
  Stream<ScheduledTaskInstance> withFixedTime =
      schedulableInstances.stream()
          .map(instance -> ScheduledTaskInstance.fixExecutionTime(instance, clock));
  List<ScheduledTaskInstance> batch = withFixedTime.collect(toList());

  // Listeners are only notified once the repository call has returned.
  taskRepository.createBatch(batch);
  notifyListenersOfScheduledBatch(batch);
}

/** Reports every instance of the batch, with its execution-time, to the scheduler listeners. */
private void notifyListenersOfScheduledBatch(List<ScheduledTaskInstance> batchToSchedule) {
  for (ScheduledTaskInstance scheduled : batchToSchedule) {
    schedulerListeners.onExecutionScheduled(scheduled, scheduled.getExecutionTime());
  }
}

@Override
public <T> void schedule(SchedulableInstance<T> schedulableInstance) {
schedule(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (C) Gustav Karlsson
*
* <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.kagkarlsson.scheduler;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

/** Static helpers for working with {@link Stream}s. */
public class StreamUtils {

  /**
   * Lazily splits a stream into consecutive chunks.
   *
   * <p>Elements are pulled from {@code stream} only when the returned stream asks for its next
   * chunk. Every chunk holds {@code chunkSize} elements, except the last one which may hold
   * fewer. An empty source yields an empty stream; no empty chunk is ever emitted.
   *
   * <p>Closing the returned stream also closes {@code stream}, so close handlers registered on
   * the source (for example by an I/O-backed stream) run when the caller uses the result in a
   * try-with-resources block.
   *
   * <p>The returned stream reads from a single shared iterator and is meant for sequential use.
   *
   * @param stream source of the elements; it is consumed by this call and must not be reused
   * @param chunkSize maximum number of elements per chunk, must be greater than 0
   * @param <T> element type
   * @return a stream of non-empty lists holding the source elements in encounter order
   * @throws IllegalArgumentException if {@code chunkSize} is zero or negative
   */
  public static <T> Stream<List<T>> chunkStream(Stream<T> stream, int chunkSize) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("Chunk size must be greater than 0");
    }

    Iterator<T> iterator = stream.iterator();
    return Stream.generate(
            () -> {
              List<T> chunk = new ArrayList<>();
              for (int i = 0; i < chunkSize && iterator.hasNext(); i++) {
                chunk.add(iterator.next());
              }
              return chunk;
            })
        // The first empty chunk means the source is exhausted; stop instead of generating forever.
        .takeWhile(chunk -> !chunk.isEmpty())
        // Propagate close() so resources held by the source stream are released.
        .onClose(stream::close);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.SchedulableInstance;
import com.github.kagkarlsson.scheduler.task.ScheduledTaskInstance;
import com.github.kagkarlsson.scheduler.task.TaskInstanceId;
import java.time.Duration;
import java.time.Instant;
Expand All @@ -24,11 +25,21 @@

public interface TaskRepository {

boolean createIfNotExists(SchedulableInstance execution);
/** Prefer ScheduledTaskInstance which has a fixed execution-time */
@Deprecated
boolean createIfNotExists(SchedulableInstance<?> execution);

boolean createIfNotExists(ScheduledTaskInstance execution);

List<Execution> getDue(Instant now, int limit);

Instant replace(Execution toBeReplaced, SchedulableInstance newInstance);
void createBatch(List<ScheduledTaskInstance> executions);

Instant replace(Execution toBeReplaced, ScheduledTaskInstance newInstance);

/** Prefer ScheduledTaskInstance which has a fixed execution-time */
@Deprecated
Instant replace(Execution toBeReplaced, SchedulableInstance<?> newInstance);

void getScheduledExecutions(ScheduledExecutionsFilter filter, Consumer<Execution> consumer);

Expand Down Expand Up @@ -71,7 +82,6 @@ boolean reschedule(
/**
 * Convenience overload: looks up the execution using the task name and id carried by {@code
 * taskInstance}.
 */
default Optional<Execution> getExecution(TaskInstanceId taskInstance) {
return getExecution(taskInstance.getTaskName(), taskInstance.getId());
}
;

int removeExecutions(String taskName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package com.github.kagkarlsson.scheduler.exceptions;

public abstract class DbSchedulerException extends RuntimeException {
static final long serialVersionUID = -2132850112553296790L;
private static final long serialVersionUID = -2132850112553296790L;

public DbSchedulerException(String message) {
super(message);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (C) Gustav Karlsson
*
* <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.kagkarlsson.scheduler.exceptions;

/**
 * Unchecked exception in the {@link DbSchedulerException} hierarchy. Going by its name it
 * reports that a batch of executions could not be scheduled; the throw sites are not part of
 * this file.
 */
public class FailedToScheduleBatchException extends DbSchedulerException {
  private static final long serialVersionUID = -2132850112553296792L;

  /**
   * @param message description of the failure
   * @param cause underlying exception that made the batch fail
   */
  public FailedToScheduleBatchException(String message, Throwable cause) {
    super(message, cause);
  }

  /**
   * @param message description of the failure
   */
  public FailedToScheduleBatchException(String message) {
    super(message);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package com.github.kagkarlsson.scheduler.exceptions;

public class TaskInstanceException extends DbSchedulerException {
static final long serialVersionUID = -2132850112553296790L;
private static final long serialVersionUID = -2132850112553296791L;
private static final String TASK_NAME_INSTANCE_MESSAGE_PART = " (task name: %s, instance id: %s)";

private final String taskName;
Expand Down
Loading

0 comments on commit b041658

Please sign in to comment.