diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b3858955..7fd1d0d1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,6 +18,6 @@ jobs: cache: 'maven' - name: Run all tests - run: mvn -B -Pall-tests,publication clean package --file pom.xml + run: mvn -B -Ptests-for-ci,publication clean package --file pom.xml env: TZ: UTC diff --git a/README.md b/README.md index 1f33bacc..3e73eab7 100644 --- a/README.md +++ b/README.md @@ -153,6 +153,17 @@ scheduler.schedule( .scheduledTo(Instant.now().plusSeconds(5))); ``` +... or schedule in batches using: + +```java +Stream> taskInstances = Stream.of( + MY_TASK.instance("my-task-1", 1), + MY_TASK.instance("my-task-2", 2), + MY_TASK.instance("my-task-3", 3)); + +scheduler.scheduleBatch(taskInstances, Instant.now()); +``` + ### More examples #### Plain Java diff --git a/db-scheduler/pom.xml b/db-scheduler/pom.xml index e91f33be..69ee1816 100644 --- a/db-scheduler/pom.xml +++ b/db-scheduler/pom.xml @@ -369,16 +369,10 @@ - compatibility + tests-for-ci compatibility-cluster - - compatibility-cluster - - compatibility - - diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java index 678ee2f7..44b86b78 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java @@ -264,6 +264,16 @@ public boolean scheduleIfNotExists(SchedulableInstance schedulableInstanc return this.delegate.scheduleIfNotExists(schedulableInstance); } + @Override + public void scheduleBatch(List> taskInstances, Instant executionTime) { + this.delegate.scheduleBatch(taskInstances, executionTime); + } + + @Override + public void scheduleBatch(List> schedulableInstances) { + this.delegate.scheduleBatch(schedulableInstances); + } + @Override public void schedule(TaskInstance taskInstance, Instant executionTime) { this.delegate.schedule(taskInstance, executionTime); diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerClient.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerClient.java index 17c8e491..531c4e2c 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerClient.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerClient.java @@ -14,6 +14,7 @@ package com.github.kagkarlsson.scheduler; import static java.util.Optional.ofNullable; +import static java.util.stream.Collectors.toList; import com.github.kagkarlsson.scheduler.SchedulerClient.ScheduleOptions.WhenExists; import com.github.kagkarlsson.scheduler.event.SchedulerListeners; @@ -26,6 +27,7 @@ import com.github.kagkarlsson.scheduler.stats.StatsRegistry; import com.github.kagkarlsson.scheduler.task.Execution; import com.github.kagkarlsson.scheduler.task.SchedulableInstance; +import com.github.kagkarlsson.scheduler.task.ScheduledTaskInstance; import com.github.kagkarlsson.scheduler.task.Task; import com.github.kagkarlsson.scheduler.task.TaskInstance; import com.github.kagkarlsson.scheduler.task.TaskInstanceId; @@ -35,12 +37,15 @@ import java.util.List; import java.util.Optional; import java.util.function.Consumer; +import java.util.stream.Stream; import javax.sql.DataSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public interface SchedulerClient { + int DEFAULT_BATCH_SIZE = 100; + /** * Schedule a new execution for the given task instance. * @@ -116,6 +121,51 @@ boolean schedule( */ boolean scheduleIfNotExists(SchedulableInstance schedulableInstance); + /** + * Schedule a batch of executions. If any of the executions already exists, the scheduling will + * fail and an exception will be thrown. + * + * @param taskInstances Task-instance to schedule, optionally with data + * @param executionTime Instant it should run + * @see java.time.Instant + * @see com.github.kagkarlsson.scheduler.task.TaskInstance + */ + void scheduleBatch(List> taskInstances, Instant executionTime); + + /** + * Schedule a batch of executions. If any of the executions already exists, the scheduling will + * fail and an exception will be thrown. + * + * @param schedulableInstances Task-instances with individual execution-times + * @see com.github.kagkarlsson.scheduler.task.SchedulableInstance + */ + void scheduleBatch(List> schedulableInstances); + + /** + * Schedule a batch of executions. If any of the executions already exists, the scheduling will + * fail and an exception will be thrown. + * + * @param taskInstances Task-instances to schedule, optionally with data + * @param executionTime Instant it should run + * @see java.time.Instant + * @see com.github.kagkarlsson.scheduler.task.TaskInstance + */ + default void scheduleBatch(Stream> taskInstances, Instant executionTime) { + StreamUtils.chunkStream(taskInstances, DEFAULT_BATCH_SIZE) + .forEach(chunk -> scheduleBatch(chunk, executionTime)); + } + + /** + * Schedule a batch of executions. If any of the executions already exists, the scheduling will + * fail and an exception will be thrown. + * + * @param schedulableInstances Task-instances with individual execution-times + * @see com.github.kagkarlsson.scheduler.task.SchedulableInstance + */ + default void scheduleBatch(Stream> schedulableInstances) { + StreamUtils.chunkStream(schedulableInstances, DEFAULT_BATCH_SIZE).forEach(this::scheduleBatch); + } + /** * Update an existing execution to a new execution-time. If the execution does not exist or if it * is currently running, an exception is thrown. @@ -389,6 +439,34 @@ public boolean scheduleIfNotExists(SchedulableInstance schedulableInstanc schedulableInstance.getNextExecutionTime(clock.now())); } + @Override + public void scheduleBatch(List> taskInstances, Instant executionTime) { + List batchToSchedule = + taskInstances.stream() + .map(taskInstance -> new ScheduledTaskInstance(taskInstance, executionTime)) + .collect(toList()); + + taskRepository.createBatch(batchToSchedule); + notifyListenersOfScheduledBatch(batchToSchedule); + } + + @Override + public void scheduleBatch(List> schedulableInstances) { + List batchToSchedule = + schedulableInstances.stream() + .map(schedulable -> ScheduledTaskInstance.fixExecutionTime(schedulable, clock)) + .collect(toList()); + + taskRepository.createBatch(batchToSchedule); + notifyListenersOfScheduledBatch(batchToSchedule); + } + + private void notifyListenersOfScheduledBatch(List batchToSchedule) { + batchToSchedule.forEach( + instance -> + schedulerListeners.onExecutionScheduled(instance, instance.getExecutionTime())); + } + @Override public void schedule(SchedulableInstance schedulableInstance) { schedule( diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/StreamUtils.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/StreamUtils.java new file mode 100644 index 00000000..d38b4111 --- /dev/null +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/StreamUtils.java @@ -0,0 +1,39 @@ +/* + * Copyright (C) Gustav Karlsson + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.scheduler; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Stream; + +public class StreamUtils { + + public static Stream> chunkStream(Stream stream, int chunkSize) { + if (chunkSize <= 0) { + throw new IllegalArgumentException("Chunk size must be greater than 0"); + } + + Iterator iterator = stream.iterator(); + return Stream.generate( + () -> { + List chunk = new ArrayList<>(); + for (int i = 0; i < chunkSize && iterator.hasNext(); i++) { + chunk.add(iterator.next()); + } + return chunk; + }) + .takeWhile(chunk -> !chunk.isEmpty()); + } +} diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskRepository.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskRepository.java index ea22a803..ff45e84d 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskRepository.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskRepository.java @@ -15,6 +15,7 @@ import com.github.kagkarlsson.scheduler.task.Execution; import com.github.kagkarlsson.scheduler.task.SchedulableInstance; +import com.github.kagkarlsson.scheduler.task.ScheduledTaskInstance; import com.github.kagkarlsson.scheduler.task.TaskInstanceId; import java.time.Duration; import java.time.Instant; @@ -24,11 +25,21 @@ public interface TaskRepository { - boolean createIfNotExists(SchedulableInstance execution); + /** Prefer ScheduledTaskInstance which has a fixed execution-time */ + @Deprecated + boolean createIfNotExists(SchedulableInstance execution); + + boolean createIfNotExists(ScheduledTaskInstance execution); List getDue(Instant now, int limit); - Instant replace(Execution toBeReplaced, SchedulableInstance newInstance); + void createBatch(List executions); + + Instant replace(Execution toBeReplaced, ScheduledTaskInstance newInstance); + + /** Prefer ScheduledTaskInstance which has a fixed execution-time */ + @Deprecated + Instant replace(Execution toBeReplaced, SchedulableInstance newInstance); void getScheduledExecutions(ScheduledExecutionsFilter filter, Consumer consumer); @@ -71,7 +82,6 @@ boolean reschedule( default Optional getExecution(TaskInstanceId taskInstance) { return getExecution(taskInstance.getTaskName(), taskInstance.getId()); } - ; int removeExecutions(String taskName); diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/exceptions/DbSchedulerException.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/exceptions/DbSchedulerException.java index bb50165e..4b535823 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/exceptions/DbSchedulerException.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/exceptions/DbSchedulerException.java @@ -14,7 +14,7 @@ package com.github.kagkarlsson.scheduler.exceptions; public abstract class DbSchedulerException extends RuntimeException { - static final long serialVersionUID = -2132850112553296790L; + private static final long serialVersionUID = -2132850112553296790L; public DbSchedulerException(String message) { super(message); diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/exceptions/FailedToScheduleBatchException.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/exceptions/FailedToScheduleBatchException.java new file mode 100644 index 00000000..e18346ac --- /dev/null +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/exceptions/FailedToScheduleBatchException.java @@ -0,0 +1,26 @@ +/* + * Copyright (C) Gustav Karlsson + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.scheduler.exceptions; + +public class FailedToScheduleBatchException extends DbSchedulerException { + private static final long serialVersionUID = -2132850112553296792L; + + public FailedToScheduleBatchException(String message) { + super(message); + } + + public FailedToScheduleBatchException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/exceptions/TaskInstanceException.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/exceptions/TaskInstanceException.java index e315482d..8c706f93 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/exceptions/TaskInstanceException.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/exceptions/TaskInstanceException.java @@ -14,7 +14,7 @@ package com.github.kagkarlsson.scheduler.exceptions; public class TaskInstanceException extends DbSchedulerException { - static final long serialVersionUID = -2132850112553296790L; + private static final long serialVersionUID = -2132850112553296791L; private static final String TASK_NAME_INSTANCE_MESSAGE_PART = " (task name: %s, instance id: %s)"; private final String taskName; diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepository.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepository.java index f9791e38..eaf0284a 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepository.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepository.java @@ -28,10 +28,12 @@ import com.github.kagkarlsson.scheduler.TaskResolver; import com.github.kagkarlsson.scheduler.TaskResolver.UnresolvedTask; import com.github.kagkarlsson.scheduler.exceptions.ExecutionException; +import com.github.kagkarlsson.scheduler.exceptions.FailedToScheduleBatchException; import com.github.kagkarlsson.scheduler.exceptions.TaskInstanceException; import com.github.kagkarlsson.scheduler.serializer.Serializer; import com.github.kagkarlsson.scheduler.task.Execution; import com.github.kagkarlsson.scheduler.task.SchedulableInstance; +import com.github.kagkarlsson.scheduler.task.ScheduledTaskInstance; import com.github.kagkarlsson.scheduler.task.Task; import com.github.kagkarlsson.scheduler.task.TaskInstance; import java.sql.PreparedStatement; @@ -144,11 +146,21 @@ protected JdbcTaskRepository( this.jdbcCustomization = jdbcCustomization; this.orderByPriority = orderByPriority; this.clock = clock; + this.insertQuery = getInsertQuery(tableName); } + private final String insertQuery; + @Override - @SuppressWarnings({"unchecked"}) + @SuppressWarnings("unchecked") public boolean createIfNotExists(SchedulableInstance instance) { + return createIfNotExists( + new ScheduledTaskInstance( + instance.getTaskInstance(), instance.getNextExecutionTime(clock.now()))); + } + + @Override + public boolean createIfNotExists(ScheduledTaskInstance instance) { final TaskInstance taskInstance = instance.getTaskInstance(); try { Optional existingExecution = getExecution(taskInstance); @@ -159,36 +171,13 @@ public boolean createIfNotExists(SchedulableInstance instance) { return false; } - // FIXLATER: replace with some sort of generic SQL-builder, possibly extend micro-jdbc - // with execute(query-and-pss) and have builder return that.. - jdbcRunner.execute( - "insert into " - + tableName - + "(task_name, task_instance, task_data, execution_time, picked, version" - + (orderByPriority ? ", priority" : "") - + ") values(?, ?, ?, ?, ?, ? " - + (orderByPriority ? ", ?" : "") - + ")", - (PreparedStatement p) -> { - int parameterIndex = 1; - p.setString(parameterIndex++, taskInstance.getTaskName()); - p.setString(parameterIndex++, taskInstance.getId()); - jdbcCustomization.setTaskData( - p, parameterIndex++, serializer.serialize(taskInstance.getData())); - jdbcCustomization.setInstant( - p, parameterIndex++, instance.getNextExecutionTime(clock.now())); - p.setBoolean(parameterIndex++, false); - p.setLong(parameterIndex++, 1L); - if (orderByPriority) { - p.setInt(parameterIndex++, taskInstance.getPriority()); - } - }); + jdbcRunner.execute(insertQuery, (PreparedStatement p) -> setInsertParameters(instance, p)); return true; } catch (SQLRuntimeException e) { LOG.debug("Exception when inserting execution. Assuming it to be a constraint violation.", e); Optional existingExecution = getExecution(taskInstance); - if (!existingExecution.isPresent()) { + if (existingExecution.isEmpty()) { throw new TaskInstanceException( "Failed to add new execution.", instance.getTaskName(), instance.getId(), e); } @@ -197,14 +186,60 @@ public boolean createIfNotExists(SchedulableInstance instance) { } } + @Override + public void createBatch(List executions) { + try { + jdbcRunner.executeBatch(insertQuery, executions, this::setInsertParameters); + } catch (SQLRuntimeException e) { + LOG.debug("Failed to create all executions. Some might already exist.", e); + throw new FailedToScheduleBatchException("Failed to create all executions.", e); + } + } + + private void setInsertParameters(ScheduledTaskInstance value, PreparedStatement ps) + throws SQLException { + var taskInstance = value.getTaskInstance(); + int index = 1; + ps.setString(index++, taskInstance.getTaskName()); + ps.setString(index++, taskInstance.getId()); + jdbcCustomization.setTaskData(ps, index++, serializer.serialize(taskInstance.getData())); + jdbcCustomization.setInstant(ps, index++, value.getExecutionTime()); + ps.setBoolean(index++, false); + ps.setLong(index++, 1L); + if (orderByPriority) { + ps.setInt(index, taskInstance.getPriority()); + } + } + + private String getInsertQuery(String tableName) { + // FIXLATER: replace with some sort of generic SQL-builder, possibly extend micro-jdbc + // with execute(query-and-pss) and have builder return that.. + return "insert into " + + tableName + + "(task_name, task_instance, task_data, execution_time, picked, version" + + (orderByPriority ? ", priority" : "") + + ") values(?, ?, ?, ?, ?, ? " + + (orderByPriority ? ", ?" : "") + + ")"; + } + + @Override + @SuppressWarnings("unchecked") + public Instant replace(Execution toBeReplaced, SchedulableInstance newInstance) { + return replace( + toBeReplaced, + new ScheduledTaskInstance( + newInstance.getTaskInstance(), newInstance.getNextExecutionTime(clock.now()))); + } + /** * Instead of doing delete+insert, we allow updating an existing execution will all new fields * * @return the execution-time of the new execution */ @Override - public Instant replace(Execution toBeReplaced, SchedulableInstance newInstance) { - Instant newExecutionTime = newInstance.getNextExecutionTime(clock.now()); + public Instant replace(Execution toBeReplaced, ScheduledTaskInstance newInstance) { + Instant newExecutionTime = newInstance.getExecutionTime(); Execution newExecution = new Execution(newExecutionTime, newInstance.getTaskInstance()); Object newData = newInstance.getTaskInstance().getData(); @@ -243,7 +278,7 @@ public Instant replace(Execution toBeReplaced, SchedulableInstance newInstance) ps, index++, serializer.serialize(newData)); // task_data ps.setString(index++, toBeReplaced.taskInstance.getTaskName()); // task_name ps.setString(index++, toBeReplaced.taskInstance.getId()); // task_instance - ps.setLong(index++, toBeReplaced.version); // version + ps.setLong(index, toBeReplaced.version); // version }); if (updated == 0) { @@ -251,13 +286,10 @@ public Instant replace(Execution toBeReplaced, SchedulableInstance newInstance) "Failed to replace execution, found none matching " + toBeReplaced); } else if (updated > 1) { LOG.error( - "Expected one execution to be updated, but updated " - + updated - + ". Indicates a bug. " - + "Replaced " - + toBeReplaced.taskInstance - + " with " - + newExecution.taskInstance); + "Expected one execution to be updated, but updated {}. Indicates a bug. Replaced {} with {}", + updated, + toBeReplaced.taskInstance, + newExecution.taskInstance); } return newExecutionTime; } @@ -303,18 +335,7 @@ public List getDue(Instant now, int limit) { jdbcCustomization.createSelectDueQuery( tableName, limit, unresolvedFilter.andCondition(), orderByPriority); - return jdbcRunner.query( - selectDueQuery, - (PreparedStatement p) -> { - int index = 1; - p.setBoolean(index++, false); - jdbcCustomization.setInstant(p, index++, now); - unresolvedFilter.setParameters(p, index); - if (!jdbcCustomization.supportsExplicitQueryLimitPart()) { - p.setMaxRows(limit); - } - }, - new ExecutionResultSetMapper(false, true)); + return getExecutions(jdbcRunner, selectDueQuery, now, limit, unresolvedFilter); } @Override @@ -327,20 +348,9 @@ public List lockAndFetchGeneric(Instant now, int limit) { jdbcCustomization.createGenericSelectForUpdateQuery( tableName, limit, unresolvedFilter.andCondition(), orderByPriority); List candidates = - txRunner.query( - selectForUpdateQuery, - (PreparedStatement p) -> { - int index = 1; - p.setBoolean(index++, false); - jdbcCustomization.setInstant(p, index++, now); - unresolvedFilter.setParameters(p, index); - if (!jdbcCustomization.supportsExplicitQueryLimitPart()) { - p.setMaxRows(limit); - } - }, - new ExecutionResultSetMapper(false, true)); - - if (candidates.size() == 0) { + getExecutions(txRunner, selectForUpdateQuery, now, limit, unresolvedFilter); + + if (candidates.isEmpty()) { return new ArrayList<>(); } @@ -368,12 +378,9 @@ public List lockAndFetchGeneric(Instant now, int limit) { if (totalUpdated != candidates.size()) { LOG.error( - "Did not update same amount of executions that were locked in the transaction. " - + "This might mean some assumption is wrong here, or that transaction is not working. " - + "Needs to be investigated. Updated: " - + totalUpdated - + ", expected: " - + candidates.size()); + "Did not update same amount of executions that were locked in the transaction. This might mean some assumption is wrong here, or that transaction is not working. Needs to be investigated. Updated: {}, expected: {}", + totalUpdated, + candidates.size()); List locked = new ArrayList<>(); List noLock = new ArrayList<>(); @@ -390,8 +397,8 @@ public List lockAndFetchGeneric(Instant now, int limit) { String instancesNotLocked = noLock.stream().map(e -> e.taskInstance.toString()).collect(joining(",")); LOG.warn( - "Returning picked executions for processing. Did not manage to pick executions: " - + instancesNotLocked); + "Returning picked executions for processing. Did not manage to pick executions: {}", + instancesNotLocked); return updateToPicked(locked, pickedBy, lastHeartbeat); } else { @@ -400,6 +407,26 @@ public List lockAndFetchGeneric(Instant now, int limit) { }); } + private List getExecutions( + JdbcRunner jdbcRunner, + String query, + Instant now, + int limit, + UnresolvedFilter unresolvedFilter) { + return jdbcRunner.query( + query, + (PreparedStatement p) -> { + int index = 1; + p.setBoolean(index++, false); + jdbcCustomization.setInstant(p, index++, now); + unresolvedFilter.setParameters(p, index); + if (!jdbcCustomization.supportsExplicitQueryLimitPart()) { + p.setMaxRows(limit); + } + }, + new ExecutionResultSetMapper(false, true)); + } + private List updateToPicked( List executions, String pickedBy, Instant lastHeartbeat) { return executions.stream() @@ -502,8 +529,8 @@ private boolean rescheduleInternal( ps.setBoolean(index++, false); ps.setString(index++, null); jdbcCustomization.setInstant(ps, index++, null); - jdbcCustomization.setInstant(ps, index++, ofNullable(lastSuccess).orElse(null)); - jdbcCustomization.setInstant(ps, index++, ofNullable(lastFailure).orElse(null)); + jdbcCustomization.setInstant(ps, index++, lastSuccess); + jdbcCustomization.setInstant(ps, index++, lastFailure); ps.setInt(index++, consecutiveFailures); jdbcCustomization.setInstant(ps, index++, nextExecutionTime); if (newData != null) { @@ -513,7 +540,7 @@ private boolean rescheduleInternal( } ps.setString(index++, execution.taskInstance.getTaskName()); ps.setString(index++, execution.taskInstance.getId()); - ps.setLong(index++, execution.version); + ps.setLong(index, execution.version); }); if (updated != 1) { @@ -521,11 +548,10 @@ private boolean rescheduleInternal( "Expected one execution to be updated, but updated " + updated + ". Indicates a bug.", execution); } - return updated > 0; + return true; } @Override - @SuppressWarnings({"unchecked"}) public Optional pick(Execution e, Instant timePicked) { final int updated = jdbcRunner.execute( @@ -547,11 +573,11 @@ public Optional pick(Execution e, Instant timePicked) { }); if (updated == 0) { - LOG.trace("Failed to pick execution. It must have been picked by another scheduler.", e); + LOG.trace("Failed to pick execution. It must have been picked by another scheduler. {}", e); return Optional.empty(); } else if (updated == 1) { final Optional pickedExecution = getExecution(e.taskInstance); - if (!pickedExecution.isPresent()) { + if (pickedExecution.isEmpty()) { throw new IllegalStateException( "Unable to find picked execution. Must have been deleted by another thread. Indicates a bug."); } else if (!pickedExecution.get().isPicked()) { @@ -631,12 +657,11 @@ public boolean updateHeartbeat(Execution e, Instant newHeartbeat) { } else { if (updated > 1) { LOG.error( - "Updated multiple rows updating heartbeat for execution. Should never happen since " - + "name and id is primary key. Execution: " - + e); + "Updated multiple rows updating heartbeat for execution. Should never happen since name and id is primary key. Execution: {}", + e); return true; } - LOG.debug("Updated heartbeat for execution: " + e); + LOG.debug("Updated heartbeat for execution: {}", e); return true; } } @@ -686,9 +711,7 @@ public Optional getExecution(String taskName, String taskInstanceId) public int removeExecutions(String taskName) { return jdbcRunner.execute( "delete from " + tableName + " where task_name = ?", - (PreparedStatement p) -> { - p.setString(1, taskName); - }); + (PreparedStatement p) -> p.setString(1, taskName)); } @Override @@ -714,12 +737,7 @@ private JdbcTaskRepositoryContext getTaskRespositoryContext() { private QueryBuilder queryForFilter(ScheduledExecutionsFilter filter, boolean orderByPriority) { final QueryBuilder q = QueryBuilder.selectFromTable(tableName); - filter - .getPickedValue() - .ifPresent( - value -> { - q.andCondition(new PickedCondition(value)); - }); + filter.getPickedValue().ifPresent(value -> q.andCondition(new PickedCondition(value))); q.orderBy(orderByPriority ? "priority desc, execution_time asc" : "execution_time asc"); return q; @@ -751,7 +769,7 @@ private class ExecutionResultSetConsumer implements ResultSetMapper { private final Consumer consumer; private final boolean includeUnresolved; - private boolean addUnresolvedToExclusionFilter; + private final boolean addUnresolvedToExclusionFilter; private ExecutionResultSetConsumer(Consumer consumer) { this(consumer, false, true); @@ -773,7 +791,7 @@ public Void map(ResultSet rs) throws SQLException { String taskName = rs.getString("task_name"); Optional task = taskResolver.resolve(taskName, addUnresolvedToExclusionFilter); - if (!task.isPresent() && !includeUnresolved) { + if (task.isEmpty() && !includeUnresolved) { if (addUnresolvedToExclusionFilter) { LOG.warn( "Failed to find implementation for task with name '{}'. Execution will be excluded from due. " @@ -804,7 +822,7 @@ public Void map(ResultSet rs) throws SQLException { Supplier dataSupplier = memoize( () -> { - if (!task.isPresent()) { + if (task.isEmpty()) { // return the data raw if the type is not known // a case for standalone clients, with no "known tasks" return data; @@ -829,7 +847,7 @@ public Void map(ResultSet rs) throws SQLException { } private static Supplier memoize(Supplier original) { - return new Supplier() { + return new Supplier<>() { boolean initialized; public T get() { diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/ScheduledTaskInstance.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/ScheduledTaskInstance.java new file mode 100644 index 00000000..e878c423 --- /dev/null +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/ScheduledTaskInstance.java @@ -0,0 +1,52 @@ +/* + * Copyright (C) Gustav Karlsson + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.scheduler.task; + +import com.github.kagkarlsson.scheduler.Clock; +import java.time.Instant; + +public class ScheduledTaskInstance implements TaskInstanceId { + private final TaskInstance taskInstance; + private final Instant executionTime; + + public ScheduledTaskInstance(TaskInstance taskInstance, Instant executionTime) { + this.taskInstance = taskInstance; + this.executionTime = executionTime; + } + + public static ScheduledTaskInstance fixExecutionTime( + SchedulableInstance schedulableInstance, Clock clock) { + return new ScheduledTaskInstance( + schedulableInstance.getTaskInstance(), + schedulableInstance.getNextExecutionTime(clock.now())); + } + + public TaskInstance getTaskInstance() { + return taskInstance; + } + + public Instant getExecutionTime() { + return executionTime; + } + + @Override + public String getTaskName() { + return taskInstance.getTaskName(); + } + + @Override + public String getId() { + return taskInstance.getId(); + } +} diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/JdbcTaskRepositoryTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/JdbcTaskRepositoryTest.java index f80888a8..cb703caf 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/JdbcTaskRepositoryTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/JdbcTaskRepositoryTest.java @@ -12,9 +12,11 @@ import static org.hamcrest.collection.IsCollectionWithSize.hasSize; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import co.unruly.matchers.OptionalMatchers; +import com.github.kagkarlsson.scheduler.exceptions.FailedToScheduleBatchException; import com.github.kagkarlsson.scheduler.helper.TestableRegistry; import com.github.kagkarlsson.scheduler.helper.TimeHelper; import com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepository; @@ -37,7 +39,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -@SuppressWarnings("unchecked") public class JdbcTaskRepositoryTest { public static final String SCHEDULER_NAME = "scheduler1"; @@ -80,6 +81,35 @@ public void test_createIfNotExists() { assertTrue(taskRepository.createIfNotExists(new SchedulableTaskInstance<>(instance2, now))); } + @Test + public void test_createBatch() { + Instant now = TimeHelper.truncatedInstantNow(); + TaskInstance instance1 = oneTimeTask.instance("id1"); + TaskInstance instance2 = oneTimeTask.instance("id2"); + List executions = + List.of( + new ScheduledTaskInstance(instance1, now), new ScheduledTaskInstance(instance2, now)); + + taskRepository.createBatch(executions); + + assertTrue(taskRepository.getExecution(instance1).isPresent()); + assertTrue(taskRepository.getExecution(instance2).isPresent()); + } + + @Test + public void test_createBatch_fails_if_any_execution_already_exists() { + Instant now = TimeHelper.truncatedInstantNow(); + TaskInstance instance1 = oneTimeTask.instance("id1"); + TaskInstance instance2 = oneTimeTask.instance("id2"); + List executions = + List.of( + new ScheduledTaskInstance(instance1, now), new ScheduledTaskInstance(instance2, now)); + taskRepository.createIfNotExists(new SchedulableTaskInstance<>(instance2, now)); + + assertThrows( + FailedToScheduleBatchException.class, () -> taskRepository.createBatch(executions)); + } + @Test public void test_replace() { Instant now = TimeHelper.truncatedInstantNow(); diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerClientTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerClientTest.java index 25ca073a..8ed56664 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerClientTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerClientTest.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; import org.hamcrest.CoreMatchers; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -99,6 +100,17 @@ public void client_should_be_able_to_schedule_executions() { assertThat(onetimeTaskHandlerA.timesExecuted.get(), CoreMatchers.is(2)); } + @Test + public void client_should_be_able_to_schedule_batch_executions() { + SchedulerClient client = create(DB.getDataSource()).build(); + + client.scheduleBatch( + Stream.of(oneTimeTaskA.instance("1"), oneTimeTaskA.instance("2")), settableClock.now()); + scheduler.runAnyDueExecutions(); + + assertThat(onetimeTaskHandlerA.timesExecuted.get(), CoreMatchers.is(2)); + } + @Test public void should_be_able_to_schedule_other_executions_from_an_executionhandler() { scheduler.schedule(scheduleAnotherTask.instance("1"), settableClock.now()); diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerClientMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerClientMain.java index 5be2450d..ad84eae6 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerClientMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerClientMain.java @@ -21,6 +21,7 @@ import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; import com.github.kagkarlsson.scheduler.task.helper.Tasks; import java.time.Instant; +import java.util.stream.Stream; import javax.sql.DataSource; public class SchedulerClientMain extends Example { @@ -52,6 +53,13 @@ public void run(DataSource dataSource) { MY_TASK.instance("id" + i).data(i).scheduledTo(now.plusSeconds(i))); } + clientWithTypeInformation.scheduleBatch( + Stream.of( + MY_TASK.instance("batch-id-1").data(1).build(), + MY_TASK.instance("batch-id-2").data(2).build(), + MY_TASK.instance("batch-id-3").data(3).build()), + now.plusSeconds(8)); + System.out.println("Listing scheduled executions"); clientWithTypeInformation .getScheduledExecutions(ScheduledExecutionsFilter.all()) diff --git a/jreleaser.yml b/jreleaser.yml index f03e9575..ca6f2bc0 100644 --- a/jreleaser.yml +++ b/jreleaser.yml @@ -17,7 +17,7 @@ release: owner: kagkarlsson overwrite: false skipRelease: false - draft: true + draft: false skipTag: false releaseName: '{{tagName}}' changelog: