Merge pull request #770 from conveyal/staging-2021-12
Staging 2021-12
trevorgerhardt authored Jan 10, 2022
2 parents 6621980 + 0136c41 commit 83abf27
Showing 11 changed files with 187 additions and 113 deletions.
3 changes: 0 additions & 3 deletions src/main/java/com/conveyal/analysis/WorkerConfig.java
@@ -18,7 +18,6 @@ public abstract class WorkerConfig extends ConfigBase implements TaskScheduler.C
private final String brokerPort;
private final int lightThreads;
private final int heavyThreads;
private final boolean testTaskRedelivery;
private final boolean listenForSinglePoint;

// CONSTRUCTORS
@@ -34,7 +33,6 @@ protected WorkerConfig (Properties props) {
lightThreads = availableProcessors;
heavyThreads = availableProcessors;
}
testTaskRedelivery = boolProp("test-task-redelivery");
listenForSinglePoint = boolProp("listen-for-single-point");
// No call to exitIfErrors() here, that should be done in concrete subclasses.
}
@@ -47,7 +45,6 @@ protected WorkerConfig (Properties props) {
@Override public String brokerPort() { return brokerPort; }
@Override public int lightThreads () { return lightThreads; }
@Override public int heavyThreads () { return heavyThreads; }
@Override public boolean testTaskRedelivery() { return testTaskRedelivery; }
@Override public boolean listenForSinglePoint() { return listenForSinglePoint; }

}
@@ -23,12 +23,10 @@ public class LocalWorkerLauncher implements WorkerLauncher {

private static final Logger LOG = LoggerFactory.getLogger(LocalWorkerLauncher.class);
private static final int N_WORKERS_LOCAL = 1;
private static final int N_WORKERS_LOCAL_TESTING = 4;

public interface Config {
int serverPort ();
String localCacheDirectory ();
boolean testTaskRedelivery();
}

private final TransportNetworkCache transportNetworkCache;
@@ -47,22 +45,11 @@ public LocalWorkerLauncher (Config config, FileStorage fileStorage, GTFSCache gt
workerConfig.setProperty("broker-address", "localhost");
workerConfig.setProperty("broker-port", Integer.toString(config.serverPort()));
workerConfig.setProperty("cache-dir", config.localCacheDirectory());
workerConfig.setProperty("test-task-redelivery", "false");


// From a throughput perspective there is no point in running more than one worker locally, since each worker
// has at least as many threads as there are processor cores. But for testing purposes (e.g. testing that task
// redelivery works right) we may want to start more workers to simulate running on a cluster.
if (config.testTaskRedelivery()) {
// When testing we want multiple workers. Below, all but one will have single point listening disabled
// to allow them to run on the same machine without port conflicts.
nWorkers = N_WORKERS_LOCAL_TESTING;
// Tell the workers to return fake results, but fail part of the time.
workerConfig.setProperty("test-task-redelivery", "true");
} else {
nWorkers = N_WORKERS_LOCAL;
}

nWorkers = N_WORKERS_LOCAL;
}

@Override
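For reference, after this change a locally launched worker is configured with only a handful of keys. A minimal Java sketch, assuming the same keys seen in this diff; the port and cache-directory values are placeholders, not values from this commit:

// Hypothetical illustration of the properties a local worker now receives (requires java.util.Properties).
// The keys come from this diff; the literal values are placeholders.
Properties workerConfig = new Properties();
workerConfig.setProperty("broker-address", "localhost");
workerConfig.setProperty("broker-port", "7070");              // config.serverPort() in the real code
workerConfig.setProperty("cache-dir", "/tmp/analysis-cache"); // config.localCacheDirectory() in the real code
// Read by WorkerConfig above; where it is set for local workers is not shown in this hunk.
workerConfig.setProperty("listen-for-single-point", "true");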
68 changes: 34 additions & 34 deletions src/main/java/com/conveyal/analysis/components/broker/Broker.java
@@ -327,12 +327,11 @@ public synchronized void markTaskCompleted (Job job, int taskId) {
LOG.error("Failed to mark task {} completed on job {}.", taskId, job.jobId);
}
// Once the last task is marked as completed, the job is finished.
// Purge it from the list to free memory.
// Remove it and its associated result assembler from the maps.
// The caller should already have a reference to the result assembler so it can process the final results.
if (job.isComplete()) {
job.verifyComplete();
jobs.remove(job.workerCategory, job);
// This method is called after the regional work results are handled, finishing and closing the local file.
// So we can harmlessly remove the MultiOriginAssembler now that the job is removed.
resultAssemblers.remove(job.jobId);
eventBus.send(new RegionalAnalysisEvent(job.jobId, COMPLETED).forUser(job.workerTags.user, job.workerTags.group));
}
@@ -345,7 +344,9 @@ public synchronized void markTaskCompleted (Job job, int taskId) {
* This method ensures synchronization of writes to Jobs from the unsynchronized worker poll HTTP handler.
*/
private synchronized void recordJobError (Job job, String error) {
job.errors.add(error);
if (job != null) {
job.errors.add(error);
}
}

/**
@@ -431,41 +432,40 @@ public void recordWorkerObservation(WorkerStatus workerStatus) {
}

/**
* Slots a single regional work result received from a worker into the appropriate position in
* the appropriate file. Also considers requesting extra spot instances after a few results have
* been received. The checks in place should prevent an unduly large number of workers from
* proliferating, assuming jobs for a given worker category (transport network + R5 version) are
* completed sequentially.
*
* @param workResult an object representing accessibility results for a single origin point,
* sent by a worker.
* Slots a single regional work result received from a worker into the appropriate position in the appropriate
* files. Also considers requesting extra spot instances after a few results have been received.
* @param workResult an object representing accessibility results for a single origin point, sent by a worker.
*/
public void handleRegionalWorkResult(RegionalWorkResult workResult) {
// Retrieving the job and assembler from their maps is not threadsafe, so we do so in a
// synchronized block here. Once the job is retrieved, it can be used to
// requestExtraWorkers below without synchronization, because that method only uses final
// fields of the job.
Job job;
// Retrieving the job and assembler from their maps is not thread safe, so we use a synchronized block here.
// Once the job is retrieved, it can be used below to requestExtraWorkersIfAppropriate without synchronization,
// because that method only uses final fields of the job.
Job job = null;
MultiOriginAssembler assembler;
synchronized (this) {
job = findJob(workResult.jobId);
assembler = resultAssemblers.get(workResult.jobId);
}
if (job == null || assembler == null || !job.isActive()) {
// This will happen naturally for all delivered tasks when a job is deleted by the user or after it errors.
LOG.debug("Received result for unrecognized, deleted, or inactive job ID {}, discarding.", workResult.jobId);
return;
}
if (workResult.error != null) {
// Record any error reported by the worker and don't pass the (bad) result on to regional result assembly.
recordJobError(job, workResult.error);
return;
}
// When the last task is received, this will build up to 5 grids and upload them to S3. That should probably
// not be done synchronously in an HTTP handler called by the worker (likewise for starting workers below).
try {
synchronized (this) {
job = findJob(workResult.jobId);
assembler = resultAssemblers.get(workResult.jobId);
if (job == null || assembler == null || !job.isActive()) {
// This will happen naturally for all delivered tasks after a job is deleted or it errors out.
LOG.debug("Ignoring result for unrecognized, deleted, or inactive job ID {}.", workResult.jobId);
return;
}
if (workResult.error != null) {
// Record any error reported by the worker and don't pass bad results on to regional result assembly.
recordJobError(job, workResult.error);
return;
}
// Mark tasks completed first before passing results to the assembler. On the final result received,
// this will minimize the risk of race conditions by quickly making the job invisible to incoming stray
// results from spurious redeliveries, before the assembler is busy finalizing and uploading results.
markTaskCompleted(job, workResult.taskId);
}
// Unlike everything above, result assembly (like starting workers below) does not synchronize on the broker.
// It contains some slow nested operations to move completed results into storage. Really we should not do
// these things synchronously in an HTTP handler called by the worker. We should probably synchronize this
// entire method, then somehow enqueue slower async completion and cleanup tasks in the caller.
assembler.handleMessage(workResult);
markTaskCompleted(job, workResult.taskId);
} catch (Throwable t) {
recordJobError(job, ExceptionUtils.stackTraceString(t));
eventBus.send(new ErrorEvent(t));
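The comments added above note that result assembly still runs synchronously inside the HTTP handler called by the worker, and suggest enqueueing the slower completion work instead. That refactor is not part of this commit; purely as a hedged sketch of the suggested direction (the executor field and method name are hypothetical), it could look like this:

// Hypothetical sketch only, not part of this commit. Requires java.util.concurrent.ExecutorService
// and java.util.concurrent.Executors. All other members referenced here appear in Broker in this diff.
private final ExecutorService resultCompletionExecutor = Executors.newSingleThreadExecutor();

public void handleRegionalWorkResultAsync (RegionalWorkResult workResult) {
    Job job;
    MultiOriginAssembler assembler;
    synchronized (this) {
        job = findJob(workResult.jobId);
        assembler = resultAssemblers.get(workResult.jobId);
        if (job == null || assembler == null || !job.isActive()) return;
        if (workResult.error != null) {
            recordJobError(job, workResult.error);
            return;
        }
        // Mark the task completed while still holding the lock, as in the method above.
        markTaskCompleted(job, workResult.taskId);
    }
    // The slow work (writing results, uploading files) happens off the worker's request thread.
    resultCompletionExecutor.submit(() -> {
        try {
            assembler.handleMessage(workResult);
        } catch (Throwable t) {
            recordJobError(job, ExceptionUtils.stackTraceString(t));
        }
    });
}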
15 changes: 15 additions & 0 deletions src/main/java/com/conveyal/analysis/models/AnalysisRequest.java
@@ -5,6 +5,7 @@
import com.conveyal.analysis.persistence.Persistence;
import com.conveyal.r5.analyst.WebMercatorExtents;
import com.conveyal.r5.analyst.cluster.AnalysisWorkerTask;
import com.conveyal.r5.analyst.cluster.ChaosParameters;
import com.conveyal.r5.analyst.decay.DecayFunction;
import com.conveyal.r5.analyst.decay.StepDecayFunction;
import com.conveyal.r5.analyst.fare.InRoutingFareCalculator;
@@ -157,6 +158,12 @@ public class AnalysisRequest {
*/
public DecayFunction decayFunction;

/**
* If this field is non-null, it will intentionally cause failures on workers handling the task. This is done on
* testing or even production systems in order to observe and improve their robustness to failure.
*/
public ChaosParameters injectFault;

/**
* Create the R5 `Scenario` from this request.
*/
@@ -254,6 +261,14 @@ public void populateTask (AnalysisWorkerTask task, UserPermissions userPermissio
if (task.decayFunction == null) {
task.decayFunction = new StepDecayFunction();
}
// Intentionally introduce errors for testing purposes, but only for admin users.
if (injectFault != null) {
if (userPermissions.admin) {
task.injectFault = injectFault;
} else {
throw new IllegalArgumentException("Must be admin user to inject faults.");
}
}
}

private static void checkGridSize (WebMercatorExtents extents) {
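ChaosParameters itself is not included in this diff, so the sketch below is an illustration only: the two method names match the calls AnalysisWorker makes elsewhere in this commit, but the class name, fields, and thresholds are assumptions, not the real implementation.

// Hypothetical sketch of a fault-injection parameter object. Only the method names
// considerShutdownOrException and shouldDropTaskBeforeCompute are taken from this commit.
public class ChaosParametersSketch {
    /** Percentage of tasks whose results are silently dropped, forcing redelivery. Assumed field. */
    public int dropTaskPercent = 10;
    /** Percentage of tasks that trigger a runtime exception on the worker. Assumed field. */
    public int exceptionPercent = 0;

    /** Possibly throw, simulating a worker that fails while handling the task. */
    public void considerShutdownOrException (int taskId) {
        if (bucket(taskId) < exceptionPercent) {
            throw new RuntimeException("Intentional failure injected for task " + taskId);
        }
    }

    /** Decide deterministically per task whether to drop it before any computation. */
    public boolean shouldDropTaskBeforeCompute (int taskId) {
        return bucket(taskId) < dropTaskPercent;
    }

    private static int bucket (int taskId) {
        // Deterministic 0-99 bucket so behavior is repeatable across redeliveries of the same task.
        return Math.floorMod(taskId * 31 + 17, 100);
    }
}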
@@ -155,8 +155,8 @@ synchronized void writeOneOrigin (int taskNumber, int[] values) throws IOExcepti

@Override
synchronized void terminate () throws IOException {
bufferFile.delete();
randomAccessFile.close();
bufferFile.delete();
}

}
@@ -111,7 +111,7 @@ public MultiOriginAssembler (RegionalAnalysis regionalAnalysis, Job job, FileSto
if (job.templateTask.originPointSet != null) {
resultWriters.add(new AccessCsvResultWriter(job.templateTask, fileStorage));
} else {
resultWriters.add( new MultiGridResultWriter(regionalAnalysis, job.templateTask, fileStorage));
resultWriters.add(new MultiGridResultWriter(regionalAnalysis, job.templateTask, fileStorage));
}
}

@@ -161,12 +161,12 @@ private synchronized void finish() {
}

/**
* There is a bit of logic in this method that wouldn't strictly need to be synchronized (the
* dimension checks) but those should take a trivial amount of time. For safety and simplicity
* we will synchronize the whole method. The downside is that this prevents one thread from
* writing accessibility while another was writing travel time CSV, but this should not be
* assumed to have any impact on performance unless measured. The writeOneValue methods are also synchronized
* for good measure. There should be no additional cost to retaining the lock when entering those methods.
* There is a bit of logic in this method that wouldn't strictly need to be synchronized (the dimension checks) but
* those should take a trivial amount of time. For safety and simplicity we synchronize the whole method. The
* downside is that this prevents one thread from writing accessibility while another was writing travel time CSV,
* but this should not be assumed to have any impact on performance unless measured. The writeOneValue methods on
* this class are also synchronized for good measure. There should be no additional cost to retaining the lock when
* entering those methods.
*/
public synchronized void handleMessage (RegionalWorkResult workResult) throws Exception {
for (RegionalResultWriter writer : resultWriters) {
4 changes: 3 additions & 1 deletion src/main/java/com/conveyal/gtfs/model/Route.java
@@ -64,7 +64,9 @@ public void loadOneRow() throws IOException {
r.route_short_name = getStringField("route_short_name", false); // one or the other required, needs a special validator
r.route_long_name = getStringField("route_long_name", false);
r.route_desc = getStringField("route_desc", false);
r.route_type = getIntField("route_type", true, 0, 7);
// Original range is 0 to 7, expanded to include 11 and 12, plus extended TPEG codes from 100 to 1500.
// See com.conveyal.r5.transit.TransitLayer.getTransitModes
r.route_type = getIntField("route_type", true, 0, 1500);
r.route_url = getUrlField("route_url", false);
r.route_color = getStringField("route_color", false);
r.route_text_color = getStringField("route_text_color", false);
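The widened range admits the extended (TPEG-derived) route_type codes mentioned in the comment. The real mapping back to basic transit modes lives in com.conveyal.r5.transit.TransitLayer.getTransitModes, which is not shown here; the sketch below only illustrates how such codes are commonly grouped and may not match that method in detail.

// Hypothetical illustration of grouping extended route_type codes into broad modes.
// Ranges follow the commonly used extended route type convention; the actual mapping
// used by R5 is in TransitLayer.getTransitModes.
static String broadModeForRouteType (int routeType) {
    if (routeType >= 0 && routeType <= 12) return "basic GTFS mode " + routeType; // 0-7 plus 11 (trolleybus), 12 (monorail)
    if (routeType >= 100 && routeType < 200) return "RAIL";
    if (routeType >= 200 && routeType < 300) return "BUS";      // coach services
    if (routeType >= 400 && routeType < 500) return "SUBWAY";   // urban rail
    if (routeType >= 700 && routeType < 800) return "BUS";
    if (routeType >= 900 && routeType < 1000) return "TRAM";
    if (routeType >= 1000 && routeType < 1300) return "FERRY";  // water and air services grouped loosely here
    if (routeType >= 1300 && routeType <= 1500) return "GONDOLA, FUNICULAR, or other";
    throw new IllegalArgumentException("route_type outside the 0-1500 range accepted above: " + routeType);
}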
40 changes: 5 additions & 35 deletions src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java
@@ -68,24 +68,15 @@ public class AnalysisWorker implements Runnable {
* This implies too much functionality is concentrated in AnalysisWorker and should be compartmentalized.
*/
public interface Config {

/**
* This worker will only listen for incoming single point requests if this field is true when run() is invoked.
* Setting this to false before running creates a regional-only cluster worker.
* This is useful in testing when running many workers on the same machine.
*/
boolean listenForSinglePoint();

/**
* If this is true, the worker will not actually do any work. It will just report all tasks as completed
* after a small delay, but will fail to do so on the given percentage of tasks. This is used in testing task
* re-delivery and overall broker sanity with multiple jobs and multiple failing workers.
*/
boolean testTaskRedelivery();
String brokerAddress();
String brokerPort();
String initialGraphId();

}

// CONSTANTS
@@ -416,11 +407,11 @@ protected void handleOneRegionalTask (RegionalTask task) throws Throwable {

LOG.debug("Handling regional task {}", task.toString());

// If this worker is being used in a test of the task redelivery mechanism, report most work as completed
// without actually doing anything, but fail to report results a certain percentage of the time.
if (config.testTaskRedelivery()) {
pretendToDoWork(task);
return;
if (task.injectFault != null) {
task.injectFault.considerShutdownOrException(task.taskId);
if (task.injectFault.shouldDropTaskBeforeCompute(task.taskId)) {
return;
}
}

// Ensure we don't try to calculate accessibility to missing opportunity data points.
@@ -517,27 +508,6 @@ protected void handleOneRegionalTask (RegionalTask task) throws Throwable {
throughputTracker.recordTaskCompletion(task.jobId);
}

/**
* Used in tests of the task redelivery mechanism. Report work as completed without actually doing anything,
* but fail to report results a certain percentage of the time.
*/
private void pretendToDoWork (RegionalTask task) {
try {
// Pretend the task takes 1-2 seconds to complete.
Thread.sleep(random.nextInt(1000) + 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (random.nextInt(100) >= TESTING_FAILURE_RATE_PERCENT) {
OneOriginResult emptyContainer = new OneOriginResult(null, new AccessibilityResult(), null);
synchronized (workResults) {
workResults.add(new RegionalWorkResult(emptyContainer, task));
}
} else {
LOG.info("Intentionally failing to complete task {} for testing purposes.", task.taskId);
}
}

/**
* This is a model from which we can serialize the block of JSON metadata at the end of a
* binary grid of travel times, which we return from the worker to the UI via the backend.
@@ -157,6 +157,12 @@ public abstract class AnalysisWorkerTask extends ProfileRequest {
*/
public transient PointSet[] destinationPointSets;

/**
* If this field is non-null, it will intentionally cause failures on workers handling the task. This is done on
* testing or even production systems in order to observe and improve their robustness to failure.
*/
public ChaosParameters injectFault;

/**
* Is this a single point or regional request? Needed to encode types in JSON serialization. Can that type field be
* added automatically with a serializer annotation instead of by defining a getter method and two dummy methods?