Skip to content

Commit

Permalink
Allow the periodic task to take in task properties.
Browse files Browse the repository at this point in the history
  • Loading branch information
9aman committed Jan 24, 2025
1 parent 16ed5e4 commit 18501e3
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.util.List;
import java.util.Map;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
Expand Down Expand Up @@ -103,6 +105,43 @@ public Response runPeriodicTask(
.build();
}

@POST
@Produces(MediaType.APPLICATION_JSON)
@Path("/run")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.EXECUTE_TASK)
@ApiOperation(value = "Run periodic task against table with custom properties. If table name is missing, task will "
+ "run against all tables.")
public Response runPeriodicTaskWithProperties(
@ApiParam(value = "Periodic task name", required = true) @QueryParam("taskname") String periodicTaskName,
@ApiParam(value = "Name of the table") @QueryParam("tableName") String tableName,
@ApiParam(value = "OFFLINE | REALTIME") @QueryParam("type") String tableType,
@ApiParam(value = "Task properties") Map<String, String> taskProperties,
@Context HttpHeaders headers) {

if (!_periodicTaskScheduler.hasTask(periodicTaskName)) {
throw new WebApplicationException("Periodic task '" + periodicTaskName + "' not found.",
Response.Status.NOT_FOUND);
}

if (tableName != null) {
tableName = DatabaseUtils.translateTableName(tableName, headers);
List<String> matchingTableNamesWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName,
Constants.validateTableType(tableType), LOGGER);

if (matchingTableNamesWithType.size() > 1) {
throw new WebApplicationException(
"More than one table matches Table '" + tableName + "'. Matching names: " + matchingTableNamesWithType);
}

tableName = matchingTableNamesWithType.get(0);
}

return Response.ok()
.entity(_pinotHelixResourceManager.invokeControllerPeriodicTask(tableName, periodicTaskName, taskProperties))
.build();
}

@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/names")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -60,6 +61,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
private long _lastSegmentLevelValidationRunTimeMs = 0L;

public static final String OFFSET_CRITERIA = "offsetCriteria";
public static final String RUN_SEGMENT_LEVEL_VALIDATION = "runSegmentLevelValidation";

public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
Expand All @@ -81,8 +83,7 @@ protected Context preprocess(Properties periodicTaskProperties) {
Context context = new Context();
// Run segment level validation only if certain time has passed after previous run
long currentTimeMs = System.currentTimeMillis();
if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - _lastSegmentLevelValidationRunTimeMs)
>= _segmentLevelValidationIntervalInSeconds) {
if (shouldRunSegmentValidation(periodicTaskProperties, currentTimeMs)) {
LOGGER.info("Run segment-level validation");
context._runSegmentLevelValidation = true;
_lastSegmentLevelValidationRunTimeMs = currentTimeMs;
Expand Down Expand Up @@ -177,6 +178,24 @@ private void runSegmentLevelValidation(TableConfig tableConfig) {
}
}

private boolean shouldRunSegmentValidation(Properties periodicTaskProperties, long currentTimeMs) {
boolean runValidation = Optional.ofNullable(
periodicTaskProperties.getProperty(RUN_SEGMENT_LEVEL_VALIDATION))
.map(value -> {
try {
return Boolean.parseBoolean(value);
} catch (Exception e) {
return false;
}
})
.orElse(false);

boolean timeThresholdMet = TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - _lastSegmentLevelValidationRunTimeMs)
>= _segmentLevelValidationIntervalInSeconds;

return runValidation || timeThresholdMet;
}

@Override
protected void nonLeaderCleanup(List<String> tableNamesWithType) {
for (String tableNameWithType : tableNamesWithType) {
Expand Down

0 comments on commit 18501e3

Please sign in to comment.