From 704c8ce9fbe0222591c438d9927f3fbcae4a9011 Mon Sep 17 00:00:00 2001 From: Andrew Byrd Date: Sat, 20 Jul 2024 18:15:36 +0800 Subject: [PATCH] Unified size-limited scenario network cache Moved from individual TransportNetwork instances to a single cache under TransportNetworkCache. --- .../r5/analyst/cluster/ScenarioCache.java | 2 +- .../r5/analyst/cluster/WorkerStatus.java | 2 - .../conveyal/r5/transit/TransportNetwork.java | 10 +- .../r5/transit/TransportNetworkCache.java | 113 ++++++++---------- 4 files changed, 57 insertions(+), 70 deletions(-) diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/ScenarioCache.java b/src/main/java/com/conveyal/r5/analyst/cluster/ScenarioCache.java index a8fa1a61e..d253e4b6b 100644 --- a/src/main/java/com/conveyal/r5/analyst/cluster/ScenarioCache.java +++ b/src/main/java/com/conveyal/r5/analyst/cluster/ScenarioCache.java @@ -44,7 +44,7 @@ public class ScenarioCache { public synchronized void storeScenario (Scenario scenario) { Scenario existingScenario = scenariosById.put(scenario.id, scenario); if (existingScenario != null) { - LOG.debug("Scenario cache already contained a this scenario."); + LOG.debug("Scenario cache already contained this scenario."); } } diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java b/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java index 1fcd17a6e..9b6d54632 100644 --- a/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java +++ b/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java @@ -39,7 +39,6 @@ public class WorkerStatus { public String workerVersion; public String workerId; public Set networks = new HashSet<>(); - public Set scenarios = new HashSet<>(); public double secondsSinceLastPoll; public Map tasksPerMinuteByJobId; @JsonUnwrapped(prefix = "ec2") @@ -86,7 +85,6 @@ public WorkerStatus (AnalysisWorker worker) { // networks = worker.networkPreloader.transportNetworkCache.getLoadedNetworkIds(); // For now we report a single network, even before it's loaded. networks = Sets.newHashSet(worker.networkId); - scenarios = worker.networkPreloader.transportNetworkCache.getAppliedScenarios(); ec2 = worker.ec2info; OperatingSystemMXBean operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean(); diff --git a/src/main/java/com/conveyal/r5/transit/TransportNetwork.java b/src/main/java/com/conveyal/r5/transit/TransportNetwork.java index 3e3cb3720..bbf4f8f70 100644 --- a/src/main/java/com/conveyal/r5/transit/TransportNetwork.java +++ b/src/main/java/com/conveyal/r5/transit/TransportNetwork.java @@ -53,14 +53,10 @@ public class TransportNetwork implements Serializable { public TransitLayer transitLayer; /** - * This stores any number of lightweight scenario networks built upon the current base network. - * FIXME that sounds like a memory leak, should be a WeighingCache or at least size-limited. - * A single network cache at the top level could store base networks and scenarios since they all have globally - * unique IDs. A hierarchical cache does have the advantage of evicting all the scenarios with the associated - * base network, which keeps the references in the scenarios from holding on to the base network. But considering - * that we have never started evicting networks (other than for a "cache" of one element) this might be getting - * ahead of ourselves. + * This field is no longer used. It has been moved to TransportNetworkCache, but this one remains for now, to + * avoid any inadvertent incompatibilities with serialized network files or serialization library settings. */ + @Deprecated public transient Map scenarios = new HashMap<>(); /** diff --git a/src/main/java/com/conveyal/r5/transit/TransportNetworkCache.java b/src/main/java/com/conveyal/r5/transit/TransportNetworkCache.java index 8a5a4160d..8b79e7417 100644 --- a/src/main/java/com/conveyal/r5/transit/TransportNetworkCache.java +++ b/src/main/java/com/conveyal/r5/transit/TransportNetworkCache.java @@ -29,11 +29,8 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; @@ -53,10 +50,19 @@ public class TransportNetworkCache implements Component { private static final Logger LOG = LoggerFactory.getLogger(TransportNetworkCache.class); /** Cache size is currently limited to one, i.e. the worker holds on to only one network at a time. */ - private static final int DEFAULT_CACHE_SIZE = 1; + private static final int MAX_CACHED_NETWORKS = 1; + + /** + * It might seem sufficient to hold only two scenarios (for single point scenario comparison). But in certain cases + * (e.g. the regional task queue is bigger than the size of each queued regional job) we might end up working on + * a mix of tasks from N different scenarios. Note also that scenarios hold references to their base networks, so + * caching multiple scenario networks can theoretically keep just as many TransportNetworks in memory. + * But in practice, all cloud workers currently remain on a single for their entire lifespan. + */ + public static final int MAX_CACHED_SCENARIO_NETWORKS = 10; // TODO change all other caches from Guava to Caffeine caches. This one is already a Caffeine cache. - private final LoadingCache cache; + private final LoadingCache networkCache; private final FileStorage fileStorage; private final GTFSCache gtfsCache; @@ -64,15 +70,39 @@ public class TransportNetworkCache implements Component { /** * A table of already seen scenarios, avoiding downloading them repeatedly from S3 and allowing us to replace - * scenarios with only their IDs, and reverse that replacement later. + * scenarios with only their IDs, and reverse that replacement later. Note that this caches the Scenario objects + * themselves, not the TransportNetworks built from those Scenarios. */ private final ScenarioCache scenarioCache = new ScenarioCache(); + /** + * This record type is used for the private, encapsulated cache of TransportNetworks for different scenarios. + * Scenario IDs are unique so we could look up these networks by scenario ID alone. However the cache values need + * to be derived entirely from the cache keys. We need some way to look up the base network so we include its ID. + */ + private record BaseAndScenarioId (String baseNetworkId, String scenarioId) { } + + /** + * This stores a number of lightweight scenario networks built upon the current base network. + * Each scenario TransportNetwork has its own LinkageCache, containing LinkedPointSets that each have their own + * EgressCostTable. In practice this can exhaust memory, e.g. after using bicycle egress for about 50 scenarios. + * The previous hierarchical arrangement of caches has the advantage of evicting all the scenarios with the + * associated base network, which keeps the references in the scenarios from holding on to the base network. + * But considering that we have never started evicting networks (other than for a "cache" of one element) this + * eviction can be handled in other ways. + */ + private LoadingCache scenarioNetworkCache; + /** Create a transport network cache. If source bucket is null, will work offline. */ public TransportNetworkCache (FileStorage fileStorage, GTFSCache gtfsCache, OSMCache osmCache) { this.osmCache = osmCache; this.gtfsCache = gtfsCache; - this.cache = createCache(DEFAULT_CACHE_SIZE); + this.networkCache = Caffeine.newBuilder() + .maximumSize(MAX_CACHED_NETWORKS) + .build(this::loadNetwork); + this.scenarioNetworkCache = Caffeine.newBuilder() + .maximumSize(MAX_CACHED_SCENARIO_NETWORKS) + .build(this::loadScenario); this.fileStorage = fileStorage; } @@ -83,7 +113,7 @@ public TransportNetworkCache (FileStorage fileStorage, GTFSCache gtfsCache, OSMC public synchronized @Nonnull TransportNetwork getNetwork (String networkId) throws TransportNetworkException { try { - return cache.get(networkId); + return networkCache.get(networkId); } catch (Exception e) { throw new TransportNetworkException("Could not load TransportNetwork into cache. ", e); } @@ -119,31 +149,22 @@ public void rememberScenario (Scenario scenario) { * tables is already parallelized. */ public synchronized TransportNetwork getNetworkForScenario (String networkId, String scenarioId) { - // If the networkId is different than previous calls, a new network will be loaded. Its transient nested map - // of scenarios will be empty at first. This ensures it's initialized if null. - // FIXME apparently this can't happen - the field is transient and initialized in TransportNetwork. - TransportNetwork baseNetwork = this.getNetwork(networkId); - if (baseNetwork.scenarios == null) { - baseNetwork.scenarios = new HashMap<>(); - } + TransportNetwork scenarioNetwork = scenarioNetworkCache.get(new BaseAndScenarioId(networkId, scenarioId)); + return scenarioNetwork; + } - TransportNetwork scenarioNetwork = baseNetwork.scenarios.get(scenarioId); - if (scenarioNetwork == null) { - // The network for this scenario was not found in the cache. Create that scenario network and cache it. - LOG.debug("Applying scenario to base network..."); - // Fetch the full scenario if an ID was specified. - Scenario scenario = resolveScenario(networkId, scenarioId); - // Apply any scenario modifications to the network before use, performing protective copies where necessary. - // We used to prepend a filter to the scenario, removing trips that are not running during the search time window. - // However, because we are caching transportNetworks with scenarios already applied to them, we can’t use - // the InactiveTripsFilter. The solution may be to cache linked point sets based on scenario ID but always - // apply scenarios every time. - scenarioNetwork = scenario.applyToTransportNetwork(baseNetwork); - LOG.debug("Done applying scenario. Caching the resulting network."); - baseNetwork.scenarios.put(scenario.id, scenarioNetwork); - } else { - LOG.debug("Reusing cached TransportNetwork for scenario {}.", scenarioId); - } + private TransportNetwork loadScenario (BaseAndScenarioId ids) { + TransportNetwork baseNetwork = this.getNetwork(ids.baseNetworkId()); + LOG.debug("Scenario TransportNetwork not found. Applying scenario to base network and caching it."); + // Fetch the full scenario if an ID was specified. + Scenario scenario = resolveScenario(ids.baseNetworkId(), ids.scenarioId()); + // Apply any scenario modifications to the network before use, performing protective copies where necessary. + // We used to prepend a filter to the scenario, removing trips that are not running during the search time window. + // However, because we are caching transportNetworks with scenarios already applied to them, we can’t use + // the InactiveTripsFilter. The solution may be to cache linked point sets based on scenario ID but always + // apply scenarios every time. + TransportNetwork scenarioNetwork = scenario.applyToTransportNetwork(baseNetwork); + LOG.debug("Done applying scenario. Caching the resulting network."); return scenarioNetwork; } @@ -356,12 +377,6 @@ private String getNetworkConfigFilename (String networkId) { return GTFSCache.cleanId(networkId) + ".json"; } - private LoadingCache createCache(int size) { - return Caffeine.newBuilder() - .maximumSize(size) - .build(this::loadNetwork); - } - /** * CacheLoader method, which should only be called by the LoadingCache. * Return the graph for the given unique identifier. Load pre-built serialized networks from local or remote @@ -393,28 +408,6 @@ private LoadingCache createCache(int size) { } } - /** - * This will eventually be used in WorkerStatus to report to the backend all loaded networks, to give it hints about - * what kind of tasks the worker is ready to work on immediately. This is made more complicated by the fact that - * workers are started up with no networks loaded, but with the intent for them to work on a particular job. So - * currently the workers just report which network they were started up for, and this method is not used. - * - * In the future, workers should just report an empty set of loaded networks, and the back end should strategically - * send them tasks when they come on line to assign them to networks as needed. But this will require a new - * mechanism to fairly allocate the workers to jobs. - */ - public Set getLoadedNetworkIds() { - return cache.asMap().keySet(); - } - - public Set getAppliedScenarios() { - return cache.asMap().values().stream() - .filter(network -> network.scenarios != null) - .map(network -> network.scenarios.keySet()) - .flatMap(Collection::stream) - .collect(Collectors.toSet()); - } - /** * Given a network and scenario ID, retrieve that scenario from the local disk cache (falling back on S3). */