Skip to content

Commit

Permalink
Unified size-limited scenario network cache
Browse files Browse the repository at this point in the history
Moved from individual TransportNetwork instances to a single cache
under TransportNetworkCache.
  • Loading branch information
abyrd committed Jul 20, 2024
1 parent eac1272 commit 704c8ce
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class ScenarioCache {
public synchronized void storeScenario (Scenario scenario) {
Scenario existingScenario = scenariosById.put(scenario.id, scenario);
if (existingScenario != null) {
LOG.debug("Scenario cache already contained a this scenario.");
LOG.debug("Scenario cache already contained this scenario.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public class WorkerStatus {
public String workerVersion;
public String workerId;
public Set<String> networks = new HashSet<>();
public Set<String> scenarios = new HashSet<>();
public double secondsSinceLastPoll;
public Map<String, Integer> tasksPerMinuteByJobId;
@JsonUnwrapped(prefix = "ec2")
Expand Down Expand Up @@ -86,7 +85,6 @@ public WorkerStatus (AnalysisWorker worker) {
// networks = worker.networkPreloader.transportNetworkCache.getLoadedNetworkIds();
// For now we report a single network, even before it's loaded.
networks = Sets.newHashSet(worker.networkId);
scenarios = worker.networkPreloader.transportNetworkCache.getAppliedScenarios();
ec2 = worker.ec2info;

OperatingSystemMXBean operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean();
Expand Down
10 changes: 3 additions & 7 deletions src/main/java/com/conveyal/r5/transit/TransportNetwork.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,10 @@ public class TransportNetwork implements Serializable {
public TransitLayer transitLayer;

/**
* This stores any number of lightweight scenario networks built upon the current base network.
* FIXME that sounds like a memory leak, should be a WeighingCache or at least size-limited.
* A single network cache at the top level could store base networks and scenarios since they all have globally
* unique IDs. A hierarchical cache does have the advantage of evicting all the scenarios with the associated
* base network, which keeps the references in the scenarios from holding on to the base network. But considering
* that we have never started evicting networks (other than for a "cache" of one element) this might be getting
* ahead of ourselves.
* This field is no longer used. It has been moved to TransportNetworkCache, but this one remains for now, to
* avoid any inadvertent incompatibilities with serialized network files or serialization library settings.
*/
@Deprecated
public transient Map<String, TransportNetwork> scenarios = new HashMap<>();

/**
Expand Down
113 changes: 53 additions & 60 deletions src/main/java/com/conveyal/r5/transit/TransportNetworkCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,8 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

Expand All @@ -53,26 +50,59 @@ public class TransportNetworkCache implements Component {
private static final Logger LOG = LoggerFactory.getLogger(TransportNetworkCache.class);

/** Cache size is currently limited to one, i.e. the worker holds on to only one network at a time. */
private static final int DEFAULT_CACHE_SIZE = 1;
private static final int MAX_CACHED_NETWORKS = 1;

/**
* It might seem sufficient to hold only two scenarios (for single point scenario comparison). But in certain cases
* (e.g. the regional task queue is bigger than the size of each queued regional job) we might end up working on
* a mix of tasks from N different scenarios. Note also that scenarios hold references to their base networks, so
* caching multiple scenario networks can theoretically keep just as many TransportNetworks in memory.
* But in practice, all cloud workers currently remain on a single for their entire lifespan.
*/
public static final int MAX_CACHED_SCENARIO_NETWORKS = 10;

// TODO change all other caches from Guava to Caffeine caches. This one is already a Caffeine cache.
private final LoadingCache<String, TransportNetwork> cache;
private final LoadingCache<String, TransportNetwork> networkCache;

private final FileStorage fileStorage;
private final GTFSCache gtfsCache;
private final OSMCache osmCache;

/**
* A table of already seen scenarios, avoiding downloading them repeatedly from S3 and allowing us to replace
* scenarios with only their IDs, and reverse that replacement later.
* scenarios with only their IDs, and reverse that replacement later. Note that this caches the Scenario objects
* themselves, not the TransportNetworks built from those Scenarios.
*/
private final ScenarioCache scenarioCache = new ScenarioCache();

/**
* This record type is used for the private, encapsulated cache of TransportNetworks for different scenarios.
* Scenario IDs are unique so we could look up these networks by scenario ID alone. However the cache values need
* to be derived entirely from the cache keys. We need some way to look up the base network so we include its ID.
*/
private record BaseAndScenarioId (String baseNetworkId, String scenarioId) { }

/**
* This stores a number of lightweight scenario networks built upon the current base network.
* Each scenario TransportNetwork has its own LinkageCache, containing LinkedPointSets that each have their own
* EgressCostTable. In practice this can exhaust memory, e.g. after using bicycle egress for about 50 scenarios.
* The previous hierarchical arrangement of caches has the advantage of evicting all the scenarios with the
* associated base network, which keeps the references in the scenarios from holding on to the base network.
* But considering that we have never started evicting networks (other than for a "cache" of one element) this
* eviction can be handled in other ways.
*/
private LoadingCache<BaseAndScenarioId, TransportNetwork> scenarioNetworkCache;

/** Create a transport network cache. If source bucket is null, will work offline. */
public TransportNetworkCache (FileStorage fileStorage, GTFSCache gtfsCache, OSMCache osmCache) {
this.osmCache = osmCache;
this.gtfsCache = gtfsCache;
this.cache = createCache(DEFAULT_CACHE_SIZE);
this.networkCache = Caffeine.newBuilder()
.maximumSize(MAX_CACHED_NETWORKS)
.build(this::loadNetwork);
this.scenarioNetworkCache = Caffeine.newBuilder()
.maximumSize(MAX_CACHED_SCENARIO_NETWORKS)
.build(this::loadScenario);
this.fileStorage = fileStorage;
}

Expand All @@ -83,7 +113,7 @@ public TransportNetworkCache (FileStorage fileStorage, GTFSCache gtfsCache, OSMC
public synchronized @Nonnull
TransportNetwork getNetwork (String networkId) throws TransportNetworkException {
try {
return cache.get(networkId);
return networkCache.get(networkId);
} catch (Exception e) {
throw new TransportNetworkException("Could not load TransportNetwork into cache. ", e);
}
Expand Down Expand Up @@ -119,31 +149,22 @@ public void rememberScenario (Scenario scenario) {
* tables is already parallelized.
*/
public synchronized TransportNetwork getNetworkForScenario (String networkId, String scenarioId) {
// If the networkId is different than previous calls, a new network will be loaded. Its transient nested map
// of scenarios will be empty at first. This ensures it's initialized if null.
// FIXME apparently this can't happen - the field is transient and initialized in TransportNetwork.
TransportNetwork baseNetwork = this.getNetwork(networkId);
if (baseNetwork.scenarios == null) {
baseNetwork.scenarios = new HashMap<>();
}
TransportNetwork scenarioNetwork = scenarioNetworkCache.get(new BaseAndScenarioId(networkId, scenarioId));
return scenarioNetwork;
}

TransportNetwork scenarioNetwork = baseNetwork.scenarios.get(scenarioId);
if (scenarioNetwork == null) {
// The network for this scenario was not found in the cache. Create that scenario network and cache it.
LOG.debug("Applying scenario to base network...");
// Fetch the full scenario if an ID was specified.
Scenario scenario = resolveScenario(networkId, scenarioId);
// Apply any scenario modifications to the network before use, performing protective copies where necessary.
// We used to prepend a filter to the scenario, removing trips that are not running during the search time window.
// However, because we are caching transportNetworks with scenarios already applied to them, we can’t use
// the InactiveTripsFilter. The solution may be to cache linked point sets based on scenario ID but always
// apply scenarios every time.
scenarioNetwork = scenario.applyToTransportNetwork(baseNetwork);
LOG.debug("Done applying scenario. Caching the resulting network.");
baseNetwork.scenarios.put(scenario.id, scenarioNetwork);
} else {
LOG.debug("Reusing cached TransportNetwork for scenario {}.", scenarioId);
}
private TransportNetwork loadScenario (BaseAndScenarioId ids) {
TransportNetwork baseNetwork = this.getNetwork(ids.baseNetworkId());
LOG.debug("Scenario TransportNetwork not found. Applying scenario to base network and caching it.");
// Fetch the full scenario if an ID was specified.
Scenario scenario = resolveScenario(ids.baseNetworkId(), ids.scenarioId());
// Apply any scenario modifications to the network before use, performing protective copies where necessary.
// We used to prepend a filter to the scenario, removing trips that are not running during the search time window.
// However, because we are caching transportNetworks with scenarios already applied to them, we can’t use
// the InactiveTripsFilter. The solution may be to cache linked point sets based on scenario ID but always
// apply scenarios every time.
TransportNetwork scenarioNetwork = scenario.applyToTransportNetwork(baseNetwork);
LOG.debug("Done applying scenario. Caching the resulting network.");
return scenarioNetwork;
}

Expand Down Expand Up @@ -356,12 +377,6 @@ private String getNetworkConfigFilename (String networkId) {
return GTFSCache.cleanId(networkId) + ".json";
}

private LoadingCache createCache(int size) {
return Caffeine.newBuilder()
.maximumSize(size)
.build(this::loadNetwork);
}

/**
* CacheLoader method, which should only be called by the LoadingCache.
* Return the graph for the given unique identifier. Load pre-built serialized networks from local or remote
Expand Down Expand Up @@ -393,28 +408,6 @@ private LoadingCache createCache(int size) {
}
}

/**
* This will eventually be used in WorkerStatus to report to the backend all loaded networks, to give it hints about
* what kind of tasks the worker is ready to work on immediately. This is made more complicated by the fact that
* workers are started up with no networks loaded, but with the intent for them to work on a particular job. So
* currently the workers just report which network they were started up for, and this method is not used.
*
* In the future, workers should just report an empty set of loaded networks, and the back end should strategically
* send them tasks when they come on line to assign them to networks as needed. But this will require a new
* mechanism to fairly allocate the workers to jobs.
*/
public Set<String> getLoadedNetworkIds() {
return cache.asMap().keySet();
}

public Set<String> getAppliedScenarios() {
return cache.asMap().values().stream()
.filter(network -> network.scenarios != null)
.map(network -> network.scenarios.keySet())
.flatMap(Collection::stream)
.collect(Collectors.toSet());
}

/**
* Given a network and scenario ID, retrieve that scenario from the local disk cache (falling back on S3).
*/
Expand Down

0 comments on commit 704c8ce

Please sign in to comment.