
Commit a96f49a

Merge pull request #937 from conveyal/patches-pre-7.2
Pre-release patches
2 parents 953757c + b016f1e commit a96f49a

File tree: 8 files changed, +110 -39 lines

src/main/java/com/conveyal/analysis/components/BackendComponents.java

+1 -1

@@ -86,7 +86,7 @@ public List<HttpController> standardHttpControllers () {
                 new GtfsController(gtfsCache),
                 new BundleController(this),
                 new OpportunityDatasetController(fileStorage, taskScheduler, censusExtractor, database),
-                new RegionalAnalysisController(broker, fileStorage),
+                new RegionalAnalysisController(broker, fileStorage, taskScheduler),
                 new AggregationAreaController(fileStorage, database, taskScheduler),
                 // This broker controller registers at least one handler at URL paths beginning with /internal, which
                 // is exempted from authentication and authorization, but should be hidden from the world

src/main/java/com/conveyal/analysis/components/broker/Broker.java

+10 -2

@@ -110,10 +110,18 @@ public interface Config {
      * is too high, all remaining tasks in a job could be distributed to a single worker leaving none for the other
      * workers, creating a slow-joiner problem especially if the tasks are complicated and slow to complete.
      *
-     * The value should eventually be tuned. The current value of 16 is just the value used by the previous sporadic
+     * The value should eventually be tuned. The value of 16 is the value used by the previous sporadic
      * polling system (WorkerStatus.LEGACY_WORKER_MAX_TASKS) which may not be ideal but is known to work.
+     *
+     * NOTE that as a side effect this limits the total throughput of each worker to:
+     * MAX_TASKS_PER_WORKER / AnalysisWorker#POLL_INTERVAL_MIN_SECONDS tasks per second.
+     * It is entirely plausible for half or more of the origins in a job to be unconnected to any roadways (water,
+     * deserts etc.) In this case the system may need to burn through millions of origins, only checking that they
+     * aren't attached to anything in the selected scenario. Not doing so could double the run time of an analysis.
+     * It may be beneficial to assign origins to workers more randomly, or to introduce a mechanism to pre-scan for
+     * disconnected origins or at least concisely signal large blocks of them in worker responses.
      */
-    public static final int MAX_TASKS_PER_WORKER = 16;
+    public static final int MAX_TASKS_PER_WORKER = 40;
 
     /**
      * Used when auto-starting spot instances. Set to a smaller value to increase the number of
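
The throughput ceiling noted in the new javadoc can be made concrete with a small back-of-the-envelope calculation. The sketch below is illustrative only: the poll interval used here is a placeholder, not the actual value of AnalysisWorker#POLL_INTERVAL_MIN_SECONDS.

// Illustrative sketch of the per-worker throughput ceiling described in the javadoc above.
// POLL_INTERVAL_MIN_SECONDS below is a hypothetical placeholder; see AnalysisWorker for the real constant.
public class WorkerThroughputEstimate {
    static final int MAX_TASKS_PER_WORKER = 40;       // value introduced by this commit
    static final int POLL_INTERVAL_MIN_SECONDS = 10;  // placeholder, for illustration only

    public static void main (String[] args) {
        double tasksPerSecond = (double) MAX_TASKS_PER_WORKER / POLL_INTERVAL_MIN_SECONDS;
        // With these example numbers, one worker can finish at most 4 tasks per second,
        // no matter how trivial the individual tasks (e.g. disconnected origins) turn out to be.
        System.out.printf("Per-worker ceiling: %.1f tasks/second%n", tasksPerSecond);
    }
}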

src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java

+70 -33

@@ -3,6 +3,7 @@
 import com.conveyal.analysis.AnalysisServerException;
 import com.conveyal.analysis.SelectingGridReducer;
 import com.conveyal.analysis.UserPermissions;
+import com.conveyal.analysis.components.TaskScheduler;
 import com.conveyal.analysis.components.broker.Broker;
 import com.conveyal.analysis.components.broker.JobStatus;
 import com.conveyal.analysis.models.AnalysisRequest;
@@ -11,6 +12,7 @@
 import com.conveyal.analysis.models.RegionalAnalysis;
 import com.conveyal.analysis.persistence.Persistence;
 import com.conveyal.analysis.results.CsvResultType;
+import com.conveyal.analysis.util.HttpStatus;
 import com.conveyal.analysis.util.JsonUtil;
 import com.conveyal.file.FileStorage;
 import com.conveyal.file.FileStorageFormat;
@@ -22,6 +24,7 @@
 import com.conveyal.r5.analyst.PointSet;
 import com.conveyal.r5.analyst.PointSetCache;
 import com.conveyal.r5.analyst.cluster.RegionalTask;
+import com.conveyal.r5.analyst.progress.Task;
 import com.google.common.primitives.Ints;
 import com.mongodb.QueryBuilder;
 import gnu.trove.list.array.TIntArrayList;
@@ -36,6 +39,7 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.URI;
 import java.nio.file.FileSystem;
 import java.nio.file.FileSystems;
@@ -45,9 +49,12 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import java.util.zip.GZIPOutputStream;
 
 import static com.conveyal.analysis.util.JsonUtil.toJson;
@@ -60,6 +67,7 @@
 import static com.google.common.base.Preconditions.checkState;
 import static org.eclipse.jetty.http.MimeTypes.Type.APPLICATION_JSON;
 import static org.eclipse.jetty.http.MimeTypes.Type.TEXT_HTML;
+import static org.eclipse.jetty.http.MimeTypes.Type.TEXT_PLAIN;
 
 /**
  * Spark HTTP handler methods that allow launching new regional analyses, as well as deleting them and fetching
@@ -80,10 +88,12 @@ public class RegionalAnalysisController implements HttpController {
 
     private final Broker broker;
     private final FileStorage fileStorage;
+    private final TaskScheduler taskScheduler;
 
-    public RegionalAnalysisController (Broker broker, FileStorage fileStorage) {
+    public RegionalAnalysisController (Broker broker, FileStorage fileStorage, TaskScheduler taskScheduler) {
         this.broker = broker;
         this.fileStorage = fileStorage;
+        this.taskScheduler = taskScheduler;
     }
 
     private Collection<RegionalAnalysis> getRegionalAnalysesForRegion(String regionId, UserPermissions userPermissions) {
@@ -254,8 +264,9 @@ private HumanKey getSingleCutoffGrid (
                     grid.writeGeotiff(fos);
                     break;
             }
-
+            LOG.debug("Finished deriving single-cutoff grid {}. Transferring to storage.", singleCutoffKey);
             fileStorage.moveIntoStorage(singleCutoffFileStorageKey, localFile);
+            LOG.debug("Finished transferring single-cutoff grid {} to storage.", singleCutoffKey);
         }
         String analysisHumanName = humanNameForEntity(analysis);
         String destinationHumanName = humanNameForEntity(destinations);
@@ -266,6 +277,10 @@ private HumanKey getSingleCutoffGrid (
         return new HumanKey(singleCutoffFileStorageKey, resultHumanFilename);
     }
 
+    // Prevent multiple requests from creating the same files in parallel.
+    // This could potentially be integrated into FileStorage with enum return values or an additional boolean method.
+    private Set<String> filesBeingPrepared = Collections.synchronizedSet(new HashSet<>());
+
     private Object getAllRegionalResults (Request req, Response res) throws IOException {
         final String regionalAnalysisId = req.params("_id");
         final UserPermissions userPermissions = UserPermissions.from(req);
@@ -277,39 +292,61 @@ private Object getAllRegionalResults (Request req, Response res) throws IOExcept
             throw AnalysisServerException.badRequest("Batch result download only available for gridded origins.");
         }
         FileStorageKey zippedResultsKey = new FileStorageKey(RESULTS, analysis._id + "_ALL.zip");
-        if (!fileStorage.exists(zippedResultsKey)) {
-            // Iterate over all dest, cutoff, percentile combinations and generate one geotiff grid output for each one.
-            List<HumanKey> humanKeys = new ArrayList<>();
-            for (String destinationPointSetId : analysis.destinationPointSetIds) {
-                OpportunityDataset destinations = getDestinations(destinationPointSetId, userPermissions);
-                for (int cutoffMinutes : analysis.cutoffsMinutes) {
-                    for (int percentile : analysis.travelTimePercentiles) {
-                        HumanKey gridKey = getSingleCutoffGrid(
-                            analysis, destinations, cutoffMinutes, percentile, FileStorageFormat.GEOTIFF
-                        );
-                        humanKeys.add(gridKey);
+        if (fileStorage.exists(zippedResultsKey)) {
+            res.type(APPLICATION_JSON.asString());
+            String analysisHumanName = humanNameForEntity(analysis);
+            return fileStorage.getJsonUrl(zippedResultsKey, analysisHumanName, "zip");
+        }
+        if (filesBeingPrepared.contains(zippedResultsKey.path)) {
+            res.type(TEXT_PLAIN.asString());
+            res.status(HttpStatus.ACCEPTED_202);
+            return "Geotiff zip is already being prepared in the background.";
+        }
+        // File did not exist. Create it in the background and ask caller to request it later.
+        filesBeingPrepared.add(zippedResultsKey.path);
+        Task task = Task.create("Zip all geotiffs for regional analysis " + analysis.name)
+            .forUser(userPermissions)
+            .withAction(progressListener -> {
+                int nSteps = analysis.destinationPointSetIds.length * analysis.cutoffsMinutes.length *
+                        analysis.travelTimePercentiles.length * 2 + 1;
+                progressListener.beginTask("Creating and archiving geotiffs...", nSteps);
+                // Iterate over all dest, cutoff, percentile combinations and generate one geotiff for each combination.
+                List<HumanKey> humanKeys = new ArrayList<>();
+                for (String destinationPointSetId : analysis.destinationPointSetIds) {
+                    OpportunityDataset destinations = getDestinations(destinationPointSetId, userPermissions);
+                    for (int cutoffMinutes : analysis.cutoffsMinutes) {
+                        for (int percentile : analysis.travelTimePercentiles) {
+                            HumanKey gridKey = getSingleCutoffGrid(
+                                analysis, destinations, cutoffMinutes, percentile, FileStorageFormat.GEOTIFF
+                            );
+                            humanKeys.add(gridKey);
+                            progressListener.increment();
+                        }
                     }
                 }
-            }
-            File tempZipFile = File.createTempFile("regional", ".zip");
-            // Zipfs can't open existing empty files, the file has to not exist. FIXME: Non-dangerous race condition
-            // Examining ZipFileSystemProvider reveals a "useTempFile" env parameter, but this is for the individual entries.
-            // May be better to just use zipOutputStream which would also allow gzip - zip CSV conversion.
-            tempZipFile.delete();
-            Map<String, String> env = Map.of("create", "true");
-            URI uri = URI.create("jar:file:" + tempZipFile.getAbsolutePath());
-            try (FileSystem zipFilesystem = FileSystems.newFileSystem(uri, env)) {
-                for (HumanKey key : humanKeys) {
-                    Path storagePath = fileStorage.getFile(key.storageKey).toPath();
-                    Path zipPath = zipFilesystem.getPath(key.humanName);
-                    Files.copy(storagePath, zipPath, StandardCopyOption.REPLACE_EXISTING);
+                File tempZipFile = File.createTempFile("regional", ".zip");
+                // Zipfs can't open existing empty files, the file has to not exist. FIXME: Non-dangerous race condition
+                // Examining ZipFileSystemProvider reveals a "useTempFile" env parameter, but this is for the individual
+                // entries. May be better to just use zipOutputStream which would also allow gzip - zip CSV conversion.
+                tempZipFile.delete();
+                Map<String, String> env = Map.of("create", "true");
+                URI uri = URI.create("jar:file:" + tempZipFile.getAbsolutePath());
+                try (FileSystem zipFilesystem = FileSystems.newFileSystem(uri, env)) {
+                    for (HumanKey key : humanKeys) {
+                        Path storagePath = fileStorage.getFile(key.storageKey).toPath();
+                        Path zipPath = zipFilesystem.getPath(key.humanName);
+                        Files.copy(storagePath, zipPath, StandardCopyOption.REPLACE_EXISTING);
+                        progressListener.increment();
+                    }
                 }
-            }
-            fileStorage.moveIntoStorage(zippedResultsKey, tempZipFile);
-        }
-        res.type(APPLICATION_JSON.asString());
-        String analysisHumanName = humanNameForEntity(analysis);
-        return fileStorage.getJsonUrl(zippedResultsKey, analysisHumanName, "zip");
+                fileStorage.moveIntoStorage(zippedResultsKey, tempZipFile);
+                progressListener.increment();
+                filesBeingPrepared.remove(zippedResultsKey.path);
+            });
+        taskScheduler.enqueue(task);
+        res.type(TEXT_PLAIN.asString());
+        res.status(HttpStatus.ACCEPTED_202);
+        return "Building geotiff zip in background.";
     }
 
     /**
@@ -666,7 +703,7 @@ public void registerEndpoints (spark.Service sparkService) {
             sparkService.get("/:_id", this::getRegionalAnalysis);
             sparkService.get("/:_id/all", this::getAllRegionalResults, toJson);
             sparkService.get("/:_id/grid/:format", this::getRegionalResults, toJson);
-            sparkService.get("/:_id/csv/:resultType", this::getCsvResults, toJson);
+            sparkService.get("/:_id/csv/:resultType", this::getCsvResults);
            sparkService.get("/:_id/scenarioJsonUrl", this::getScenarioJsonUrl, toJson);
            sparkService.delete("/:_id", this::deleteRegionalAnalysis, toJson);
            sparkService.post("", this::createRegionalAnalysis, toJson);

src/main/java/com/conveyal/analysis/results/CsvResultWriter.java

+2

@@ -100,6 +100,8 @@ public void writeOneWorkResult (RegionalWorkResult workResult) throws Exception
         // CsvWriter is not threadsafe and multiple threads may call this, so after values are generated,
         // the actual writing is synchronized (TODO confirm)
         // Is result row generation slow enough to bother synchronizing only the following block?
+        // This first dimension check is specific to each subclass. The check in the loop below is more general,
+        // applying to all subclasses (after the subclass-specific rowValues method may have added some columns).
         checkDimension(workResult);
         Iterable<String[]> rows = rowValues(workResult);
         synchronized (this) {
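
The two added comment lines describe a layered validation: a subclass-specific dimension check runs once per work result, and a more general per-row column check runs inside the synchronized write loop. A rough sketch of that shape follows; it is not the actual CsvResultWriter source, and expectedColumnCount plus the csvWriter field are stand-ins for whatever the real class uses.

// Rough sketch of the structure described by the comments above (not the project's CsvResultWriter).
// expectedColumnCount and csvWriter are hypothetical stand-ins for the writer's real fields.
public void writeOneWorkResult (RegionalWorkResult workResult) throws Exception {
    checkDimension(workResult);                        // subclass-specific check, once per work result
    Iterable<String[]> rows = rowValues(workResult);   // row generation happens outside the lock
    synchronized (this) {
        for (String[] row : rows) {
            // General check applied to every row, regardless of which subclass produced it.
            checkDimension(workResult, "columns", row.length, expectedColumnCount);
            csvWriter.writeRecord(row);                // CsvWriter itself is not threadsafe, so writes are guarded
        }
    }
}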

src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java

+20

@@ -42,6 +42,21 @@ public Iterable<String[]> rowValues (RegionalWorkResult workResult) {
         return rows;
     }
 
+    // Around 2024-04 we wanted to expand the number of CSV columns and needed to update the dimension checks below.
+    // The number of columns is checked twice, once in this specific CsvResultWriter implementation and once in the
+    // abstract superclass.
+    // We don't want to introduce a column count check with tolerance that is applied separately to each row, because
+    // this will not catch a whole class of problems where the worker instances are not producing a consistent number
+    // of columns across origins.
+    // We do ideally want to allow experimental workers that add an unknown number of columns, but they should add those
+    // columns to every row. This requires some kind of negotiated, flexible protocol between the backend and workers.
+    // Or some system where the first worker response received sets expectations and all other responses must match.
+    // We thought this through and decided it was too big a change to introduce immediately.
+    // So we only accept one specific quantity of CSV columns, but fail with a very specific message when we see a
+    // number of CSV columns that we recognize as coming from an obsolete worker version. Breaking backward
+    // compatibility is acceptable here because CSV paths are still considered an experimental feature.
+    // Ideally this very case-specific check and error message will be removed when some more general system is added.
+
     @Override
     protected void checkDimension (RegionalWorkResult workResult) {
         // Path CSV output only supports a single freeform pointset for now.
@@ -53,6 +68,11 @@ protected void checkDimension (RegionalWorkResult workResult) {
         for (ArrayList<String[]> oneDestination : workResult.pathResult) {
             // Number of distinct paths per destination is variable, don't validate it.
             for (String[] iterationDetails : oneDestination) {
+                if (iterationDetails.length == 10) {
+                    throw new IllegalArgumentException(
+                        "Please use worker version newer than v7.1. CSV columns in path results have changed."
+                    );
+                }
                 checkDimension(workResult, "columns", iterationDetails.length, PathResult.DATA_COLUMNS.length);
             }
         }

src/main/java/com/conveyal/r5/analyst/cluster/RegionalTask.java

+5 -1

@@ -74,7 +74,7 @@ public Type getType() {
      */
     @Override
     public WebMercatorExtents getWebMercatorExtents() {
-        if (makeTauiSite) {
+        if (makeTauiSite || this.hasFlag("CROP_DESTINATIONS")) {
             return WebMercatorExtents.forTask(this);
         } else {
             return WebMercatorExtents.forPointsets(this.destinationPointSets);
@@ -112,4 +112,8 @@ public int nTargetsPerOrigin () {
         }
     }
 
+    public boolean hasFlag (String flag) {
+        return this.flags != null && this.flags.contains(flag);
+    }
+
 }
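
The new helper is a null-safe membership test on the task's flags, used above to optionally crop destination extents to the task's own grid. A hedged caller-side sketch follows; it mirrors the modified getWebMercatorExtents branch as a standalone view, assumes the referenced members are accessible from the calling code, and is not part of this commit.

import com.conveyal.r5.analyst.WebMercatorExtents;
import com.conveyal.r5.analyst.cluster.RegionalTask;

// Illustrative sketch only: choosing extents the same way the modified getWebMercatorExtents() does.
public class CropDestinationsExample {
    static WebMercatorExtents extentsFor (RegionalTask task) {
        // hasFlag returns false when task.flags is null, so callers need no separate null check.
        if (task.makeTauiSite || task.hasFlag("CROP_DESTINATIONS")) {
            return WebMercatorExtents.forTask(task);                        // extents from the task's own grid
        }
        return WebMercatorExtents.forPointsets(task.destinationPointSets);  // extents from the destination pointsets
    }
}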

src/main/java/com/conveyal/r5/analyst/progress/Task.java

+1 -1

@@ -162,7 +162,7 @@ protected void bubbleUpProgress() {
     }
 
     /**
-     * Check that all necesary fields have been set before enqueueing for execution, and check any invariants.
+     * Check that all necessary fields have been set before enqueueing for execution, and check any invariants.
      */
     public void validate () {
         if (this.user == null) {

src/main/java/com/conveyal/r5/transit/path/RouteSequence.java

+1 -1

@@ -62,8 +62,8 @@ public String[] detailsWithGtfsIds (TransitLayer transitLayer, CsvResultOptions
                 routeJoiner.toString(),
                 boardStopJoiner.toString(),
                 alightStopJoiner.toString(),
+                feedJoiner.toString(),
                 rideTimeJoiner.toString(),
-                feedJoiner.toString(),
                 accessTime,
                 egressTime
         };
