From 4f33050e97d0d25d65c312eda16fe6b368653ba3 Mon Sep 17 00:00:00 2001 From: tolziplohu Date: Tue, 31 Aug 2021 15:03:20 -0500 Subject: [PATCH 1/2] refactor: do light merging later, remove ChunkProcessingPipeline --- .../pipeline/ChunkProcessingPipelineTest.java | 260 ----------------- .../LocalChunkProvider.java | 114 ++++---- .../chunks/pipeline/ChunkProcessingInfo.java | 72 ----- .../pipeline/ChunkProcessingPipeline.java | 271 ------------------ .../world/chunks/pipeline/PositionFuture.java | 57 ---- .../chunks/pipeline/stages/ChunkTask.java | 40 --- .../pipeline/stages/ChunkTaskProvider.java | 62 ---- .../stages/MultiplyRequirementChunkTask.java | 49 ---- .../pipeline/stages/SingleChunkTask.java | 46 --- .../RemoteChunkProvider.java | 94 +++--- .../world/propagation/light/LightMerger.java | 6 +- 11 files changed, 86 insertions(+), 985 deletions(-) delete mode 100644 engine-tests/src/test/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingPipelineTest.java delete mode 100644 engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingInfo.java delete mode 100644 engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingPipeline.java delete mode 100644 engine/src/main/java/org/terasology/engine/world/chunks/pipeline/PositionFuture.java delete mode 100644 engine/src/main/java/org/terasology/engine/world/chunks/pipeline/stages/ChunkTask.java delete mode 100644 engine/src/main/java/org/terasology/engine/world/chunks/pipeline/stages/ChunkTaskProvider.java delete mode 100644 engine/src/main/java/org/terasology/engine/world/chunks/pipeline/stages/MultiplyRequirementChunkTask.java delete mode 100644 engine/src/main/java/org/terasology/engine/world/chunks/pipeline/stages/SingleChunkTask.java diff --git a/engine-tests/src/test/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingPipelineTest.java b/engine-tests/src/test/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingPipelineTest.java deleted file mode 100644 index 433217956e7..00000000000 --- a/engine-tests/src/test/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingPipelineTest.java +++ /dev/null @@ -1,260 +0,0 @@ -// Copyright 2021 The Terasology Foundation -// SPDX-License-Identifier: Apache-2.0 - -package org.terasology.engine.world.chunks.pipeline; - -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.joml.Vector3i; -import org.joml.Vector3ic; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; -import org.terasology.engine.TerasologyTestingEnvironment; -import org.terasology.engine.registry.CoreRegistry; -import org.terasology.engine.world.block.BlockManager; -import org.terasology.engine.world.block.internal.BlockManagerImpl; -import org.terasology.engine.world.block.tiles.NullWorldAtlas; -import org.terasology.engine.world.chunks.Chunk; -import org.terasology.engine.world.chunks.blockdata.ExtraBlockDataManager; -import org.terasology.engine.world.chunks.internal.ChunkImpl; -import org.terasology.engine.world.chunks.pipeline.stages.ChunkTaskProvider; -import org.terasology.gestalt.assets.management.AssetManager; -import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; -import reactor.core.scheduler.Schedulers; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; -import 
java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.stream.Collectors; - -@TestInstance(TestInstance.Lifecycle.PER_METHOD) -@Tag("TteTest") -class ChunkProcessingPipelineTest extends TerasologyTestingEnvironment { - - private final BlockManager blockManager = new BlockManagerImpl(new NullWorldAtlas(), - CoreRegistry.get(AssetManager.class)); - private final ExtraBlockDataManager extraDataManager = new ExtraBlockDataManager(); - private ChunkProcessingPipeline pipeline; - private FluxSink chunkSink; - - void initPipeline(Function getCached) { - // Use a custom scheduler which runs everything immediately, on the same thread - pipeline = new ChunkProcessingPipeline(getCached, Flux.push(sink -> chunkSink = sink), Schedulers.immediate()); - } - - @Test - void simpleProcessingSuccess() { - initPipeline(p -> null); - - Vector3i chunkPos = new Vector3i(0, 0, 0); - Chunk chunk = createChunkAt(chunkPos); - List result = new ArrayList<>(); - - pipeline.addStage(ChunkTaskProvider.create("dummy task", (c) -> c)); - pipeline.addStage(ChunkTaskProvider.create("Chunk ready", (Consumer) result::add)); - - chunkSink.next(chunk); - chunkSink.complete(); - pipeline.notifyUpdate(); - - Chunk chunkAfterProcessing = result.get(0); - - Assertions.assertEquals(chunkAfterProcessing.getPosition(), chunk.getPosition(), - "Chunk after processing must have the same position, the pipeline probably lost your chunk"); - } - - @Test - void simpleStopProcessingSuccess() { - initPipeline(p -> null); - - Vector3i position = new Vector3i(0, 0, 0); - Chunk chunk = createChunkAt(position); - - pipeline.addStage(ChunkTaskProvider.create("dummy long executing task", (c) -> { - try { - Thread.sleep(1_000); - } catch (InterruptedException e) { - } - return c; - })); - - chunkSink.next(chunk); - chunkSink.complete(); - pipeline.notifyUpdate(); - - pipeline.stopProcessingAt(position); - Assertions.assertFalse(pipeline.isPositionProcessing(position)); - } - - /** - * Imagine that we have task, which requires neighbors with same Z level. neighbors chunk already in chunk cache. - */ - @Test - void multiRequirementsChunksExistsSuccess() { - Vector3i positionToGenerate = new Vector3i(0, 0, 0); - Map chunkCache = - getNearChunkPositions(positionToGenerate) - .stream() - .filter((p) -> !p.equals(positionToGenerate)) //remove central chunk. - .map(this::createChunkAt) - .collect(Collectors.toMap( - (chunk) -> chunk.getPosition(new Vector3i()), - Function.identity() - )); - - initPipeline(chunkCache::get); - pipeline.addStage(ChunkTaskProvider.createMulti( - "flat merging task", - (chunks) -> chunks.stream() - .filter((c) -> c.getPosition().equals(positionToGenerate)) - .findFirst() // return central chunk. - .get(), - this::getNearChunkPositions)); - List result = new ArrayList<>(); - pipeline.addStage(ChunkTaskProvider.create("Chunk ready", (Consumer) result::add)); - - Chunk chunk = createChunkAt(positionToGenerate); - - chunkSink.next(chunk); - chunkSink.complete(); - pipeline.notifyUpdate(); - - Chunk chunkAfterProcessing = result.get(0); - - Assertions.assertEquals(chunkAfterProcessing.getPosition(), chunk.getPosition(), - "Chunk after processing must have the same position, the pipeline probably lost your chunk"); - } - - /** - * Imagine that we have task, which requires neighbors with same Z level. neighbor will generated. 
- */ - @Test - void multiRequirementsChunksWillGeneratedSuccess() throws InterruptedException { - Vector3i positionToGenerate = new Vector3i(0, 0, 0); - List chunkToGenerate = - getNearChunkPositions(positionToGenerate) - .stream() - .filter((p) -> !p.equals(positionToGenerate)) //remove central chunk. - .map(this::createChunkAt) - .collect(Collectors.toList()); - - initPipeline(p -> null); - pipeline.addStage(ChunkTaskProvider.createMulti( - "flat merging task", - (chunks) -> chunks.stream() - .filter((c) -> c.getPosition().equals(positionToGenerate)).findFirst() // return central chunk. - .get(), - this::getNearChunkPositions)); - List result = new ArrayList<>(); - pipeline.addStage(ChunkTaskProvider.create("Chunk ready", (Consumer) result::add)); - - Chunk chunk = createChunkAt(positionToGenerate); - - chunkSink.next(chunk); - - - Thread.sleep(1_000); // sleep 1 second. and check future. - Assertions.assertTrue(result.isEmpty(), "Chunk must be not generated, because the ChunkTask have doesn't have " + - "its neighbors in requirements"); - - chunkToGenerate.forEach(chunkSink::next); - chunkSink.complete(); - pipeline.notifyUpdate(); - - Chunk chunkAfterProcessing = result.get(0); - - Assertions.assertEquals(chunkAfterProcessing.getPosition(), chunk.getPosition(), - "Chunk after processing must have the same position, the pipeline probably lost your chunk"); - } - - @Test - void emulateEntityMoving() throws InterruptedException { - final AtomicReference position = new AtomicReference<>(); - Map chunkCache = Maps.newConcurrentMap(); - initPipeline(chunkCache::get); - pipeline.addStage(ChunkTaskProvider.createMulti( - "flat merging task", - (chunks) -> chunks.stream() - .sorted((o1, o2) -> { - Function pos = (c) -> c.getPosition(new Vector3i()); - return Comparator.comparing(pos.andThen(Vector3i::x)) - .thenComparing(pos.andThen(Vector3i::y)) - .thenComparing(pos.andThen(Vector3i::z)) - .compare(o1, o2); - }).toArray(Chunk[]::new)[5], - this::getNearChunkPositions)); - pipeline.addStage(ChunkTaskProvider.create("finish chunk", (c) -> { - c.markReady(); - chunkCache.put(c.getPosition(), c); - })); - - - Set relativeRegion = Collections.emptySet(); - for (int i = 0; i < 10; i++) { - position.set(new Vector3i(i, 0, 0)); - Set newRegion = getNearChunkPositions(position.get(), 10); - // load new chunks. 
- Sets.difference(newRegion, relativeRegion).forEach((pos) -> chunkSink.next(createChunkAt(pos))); - - Sets.difference(relativeRegion, newRegion).forEach(// remove old chunks - (pos) -> { - chunkCache.remove(pos); - if (pipeline.isPositionProcessing(pos)) { - pipeline.stopProcessingAt(new Vector3i(pos)); - } - } - ); - relativeRegion = newRegion; - - pipeline.notifyUpdate(); - - Assertions.assertTrue(Sets.difference(chunkCache.keySet(), relativeRegion).isEmpty(), "We must haven't " + - "chunks not related to relativeRegion"); - Assertions.assertTrue(Sets.difference(pipeline.getProcessingPositions(), relativeRegion).isEmpty(), - "We must haven't chunks in processing not related to relativeRegion"); - - Assertions.assertTrue(relativeRegion.containsAll(pipeline.getProcessingPositions()), - "No non-relative chunks should be processing"); - - Thread.sleep(new Random().nextInt(500)); //think time - } - } - - @BeforeEach - void cleanup() { - if (pipeline != null) { - pipeline.shutdown(); - } - } - - private Set getNearChunkPositions(Vector3ic p) { - return getNearChunkPositions(p, 1); - } - - private Set getNearChunkPositions(Vector3ic p, int distance) { - Set requirements = new HashSet<>(); - for (int x = -distance; x <= distance; x++) { - for (int y = -distance; y <= distance; y++) { - requirements.add(new Vector3i(p.x() + x, p.y() + y, p.z())); - } - } - return requirements; - } - - private ChunkImpl createChunkAt(Vector3ic chunkPos) { - return new ChunkImpl(chunkPos, blockManager, extraDataManager); - } - -} diff --git a/engine/src/main/java/org/terasology/engine/world/chunks/localChunkProvider/LocalChunkProvider.java b/engine/src/main/java/org/terasology/engine/world/chunks/localChunkProvider/LocalChunkProvider.java index 86252a96746..9326e0031e8 100644 --- a/engine/src/main/java/org/terasology/engine/world/chunks/localChunkProvider/LocalChunkProvider.java +++ b/engine/src/main/java/org/terasology/engine/world/chunks/localChunkProvider/LocalChunkProvider.java @@ -14,6 +14,7 @@ import org.joml.Vector3ic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.terasology.engine.core.GameScheduler; import org.terasology.engine.entitySystem.entity.EntityManager; import org.terasology.engine.entitySystem.entity.EntityRef; import org.terasology.engine.entitySystem.entity.EntityStore; @@ -41,18 +42,16 @@ import org.terasology.engine.world.chunks.event.PurgeWorldEvent; import org.terasology.engine.world.chunks.internal.ChunkImpl; import org.terasology.engine.world.chunks.internal.ChunkRelevanceRegion; -import org.terasology.engine.world.chunks.pipeline.ChunkProcessingPipeline; -import org.terasology.engine.world.chunks.pipeline.stages.ChunkTaskProvider; import org.terasology.engine.world.generation.impl.EntityBufferImpl; import org.terasology.engine.world.generator.WorldGenerator; import org.terasology.engine.world.internal.ChunkViewCore; import org.terasology.engine.world.internal.ChunkViewCoreImpl; import org.terasology.engine.world.propagation.light.InternalLightProcessor; import org.terasology.engine.world.propagation.light.LightMerger; +import org.terasology.gestalt.entitysystem.component.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.scheduler.Scheduler; -import org.terasology.gestalt.entitysystem.component.Component; import java.util.ArrayList; import java.util.Collection; @@ -60,11 +59,10 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import 
java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Consumer; -import java.util.stream.Collectors; import java.util.stream.StreamSupport; /** @@ -103,7 +101,6 @@ public class LocalChunkProvider implements ChunkProvider { private final WorldGenerator generator; private final BlockManager blockManager; private final ExtraBlockDataManager extraDataManager; - private ChunkProcessingPipeline loadingPipeline; private TaskMaster unloadRequestTaskMaster; private EntityRef worldEntity = EntityRef.NULL; private BlockEntityRegistry registry; @@ -112,8 +109,9 @@ public class LocalChunkProvider implements ChunkProvider { private final List chunksInRange = new ArrayList<>(); private BlockRegion[] lastRegions; - private volatile boolean shouldComplete = false; private final Set currentlyProcessing = new HashSet<>(); + private final Set needsLightMerging = Sets.newHashSet(); + private FluxSink chunkSink; public LocalChunkProvider(StorageManager storageManager, EntityManager entityManager, WorldGenerator generator, BlockManager blockManager, ExtraBlockDataManager extraDataManager, @@ -150,14 +148,29 @@ public void setWorldEntity(EntityRef worldEntity) { this.worldEntity = worldEntity; } + private void tryLightMerging(Vector3ic chunkPos) { + Chunk[] chunks = StreamSupport.stream(new BlockRegion(chunkPos).expand(1, 1, 1).spliterator(), false) + .map(chunkCache::get) + .filter(Objects::nonNull) + .toArray(Chunk[]::new); + if (chunks.length == 27) { + new LightMerger().merge(chunks); + needsLightMerging.remove(chunkPos); + } + } private void processReadyChunk(final Chunk chunk) { Vector3ic chunkPos = chunk.getPosition(); if (chunkCache.get(chunkPos) != null) { + logger.warn("Duplicate chunk"); return; // TODO move it in pipeline; } chunkCache.put(new Vector3i(chunkPos), chunk); chunk.markReady(); + GameScheduler.scheduleParallel("light merging", + () -> StreamSupport.stream(new BlockRegion(chunkPos).expand(1, 1, 1).spliterator(), false) + .filter(needsLightMerging::contains) + .forEach(this::tryLightMerging)); //TODO, it is not clear if the activate/addedBlocks event logic is correct. //See https://github.com/MovingBlocks/Terasology/issues/3244 ChunkStore store = this.storageManager.loadChunkStore(chunkPos); @@ -246,9 +259,7 @@ private void deactivateBlocks() { private void checkForUnload() { PerformanceMonitor.startActivity("Unloading irrelevant chunks"); int unloaded = 0; - Iterator iterator = Iterators.concat( - Iterators.transform(chunkCache.keySet().iterator(), v -> new Vector3i(v.x(), v.y(), v.z())), - loadingPipeline.getProcessingPositions().iterator()); + Iterator iterator = Iterators.transform(chunkCache.keySet().iterator(), v -> new Vector3i(v.x(), v.y(), v.z())); while (iterator.hasNext()) { Vector3ic pos = iterator.next(); boolean keep = relevanceSystem.isChunkInRegions(pos); // TODO: move it to relevance system. @@ -267,11 +278,7 @@ private void checkForUnload() { } private boolean unloadChunkInternal(Vector3ic pos) { - if (loadingPipeline.isPositionProcessing(pos)) { - // Chunk hasn't been finished or changed, so just drop it. 
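The tryLightMerging helper added above defers light merging until every chunk in a position's 3x3x3 neighbourhood is present in the cache, instead of running it as a pipeline stage. A minimal, self-contained sketch of that bookkeeping follows; the Pos record and the mergeLight callback are illustrative stand-ins, not Terasology API.

    // Simplified model of "merge light only once all 27 neighbours are loaded".
    // Pos and the mergeLight callback are stand-ins, not engine types.
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.function.Consumer;

    final class DeferredLightMergingSketch {
        record Pos(int x, int y, int z) { }

        private final Map<Pos, Object> chunkCache = new HashMap<>();
        private final Set<Pos> needsLightMerging = new HashSet<>();
        private final Consumer<Pos> mergeLight;   // stand-in for new LightMerger().merge(...)

        DeferredLightMergingSketch(Consumer<Pos> mergeLight) {
            this.mergeLight = mergeLight;
        }

        /** Call when a chunk finishes generating and lands in the cache. */
        void onChunkReady(Pos pos, Object chunk) {
            chunkCache.put(pos, chunk);
            needsLightMerging.add(pos);
            // The new chunk may complete the neighbourhood of any chunk near it,
            // so re-check every position in its own 3x3x3 neighbourhood.
            forEachNeighbour(pos, this::tryLightMerging);
        }

        private void tryLightMerging(Pos centre) {
            if (!needsLightMerging.contains(centre)) {
                return;
            }
            int[] loaded = {0};
            forEachNeighbour(centre, p -> {
                if (chunkCache.containsKey(p)) {
                    loaded[0]++;
                }
            });
            if (loaded[0] == 27) {          // centre plus all 26 neighbours are cached
                mergeLight.accept(centre);
                needsLightMerging.remove(centre);
            }
        }

        private void forEachNeighbour(Pos centre, Consumer<Pos> action) {
            for (int dx = -1; dx <= 1; dx++) {
                for (int dy = -1; dy <= 1; dy++) {
                    for (int dz = -1; dz <= 1; dz++) {
                        action.accept(new Pos(centre.x() + dx, centre.y() + dy, centre.z() + dz));
                    }
                }
            }
        }
    }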
- loadingPipeline.stopProcessingAt(pos); - return false; - } + needsLightMerging.remove(pos); Chunk chunk = chunkCache.get(pos); if (chunk == null) { return false; @@ -337,13 +344,12 @@ public Collection getAllChunks() { @Override public void restart() { - loadingPipeline.restart(); unloadRequestTaskMaster.restart(); } @Override public void shutdown() { - loadingPipeline.shutdown(); + chunkSink.complete(); unloadRequestTaskMaster.shutdown(new ChunkUnloadRequest(), true); } @@ -380,7 +386,7 @@ public boolean reloadChunk(Vector3ic coords) { @Override public void purgeWorld() { ChunkMonitor.fireChunkProviderDisposed(this); - loadingPipeline.shutdown(); + chunkSink.complete(); unloadRequestTaskMaster.shutdown(new ChunkUnloadRequest(), true); getAllChunks().stream().filter(Chunk::isReady).forEach(chunk -> { worldEntity.send(new BeforeChunkUnload(chunk.getPosition())); @@ -409,7 +415,7 @@ private boolean isChunkReady(Chunk chunk) { } public void notifyRelevanceChanged() { - loadingPipeline.notifyUpdate(); + nextChunks(16); } private void updateList() { @@ -470,7 +476,7 @@ private synchronized List chunksToGenerate(int numChunks) { while (chunks.size() < numChunks && !chunksInRange.isEmpty()) { Vector3ic pos = chunksInRange.remove(chunksInRange.size() - 1); - if (currentlyProcessing.contains(pos) || loadingPipeline.isPositionProcessing(pos)) { + if (currentlyProcessing.contains(pos)) { continue; } @@ -481,64 +487,40 @@ private synchronized List chunksToGenerate(int numChunks) { return chunks; } - /** - * This method runs once per chunk processing thread to set up the request callback. - */ - private void onSubscribe(FluxSink sink) { - sink.onRequest(numChunks -> { - List positionsPending = chunksToGenerate((int) numChunks); - - // Generating the actual chunks can be done completely asynchronously - for (Vector3ic pos : positionsPending) { - currentlyProcessing.remove(pos); - // The first time the onRequest lambda is called, when it submits its last chunk, this call to next() won't return - // because Reactor puts the event loop logic inside the next() function and the pipeline keeps requesting more chunks. - // So removing the position from currentlyProcessing and anything else that needs to happen must come before this call. - sink.next(genChunk(pos)); - } - if (shouldComplete && chunksInRange.isEmpty()) { - sink.complete(); - } - }); - } - /** * Tells the ChunkProcessingPipeline that no more chunks are coming after what's currently queued. * Intended for use in tests. */ protected void markComplete() { - shouldComplete = true; - loadingPipeline.notifyUpdate(); + chunkSink.complete(); } public void setRelevanceSystem(RelevanceSystem relevanceSystem) { - setRelevanceSystem(relevanceSystem, null); + setRelevanceSystem(relevanceSystem, GameScheduler.parallel()); + } + + private void nextChunks(long numChunks) { + List positionsPending = chunksToGenerate((int) numChunks); + for (Vector3ic p : positionsPending) { + chunkSink.next(p); + } } // TODO: move loadingPipeline initialization into constructor. 
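The setRelevanceSystem wiring just below replaces ChunkProcessingPipeline with a demand-driven Reactor flux: the sink's onRequest callback feeds pending positions, two parallel rails generate and light chunks off-thread, and results land in readyChunks. The following is a minimal, self-contained sketch of the same pattern, with integers standing in for positions and Strings for chunks; the 32-item cap and all names here are illustrative, not part of the patch.

    // Demand-driven chunk generation with Reactor, mirroring the new
    // setRelevanceSystem wiring; integers stand in for chunk positions and
    // Strings for generated chunks, and the cap of 32 is arbitrary.
    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Schedulers;

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicInteger;

    public final class ChunkFluxSketch {
        public static void main(String[] args) throws InterruptedException {
            Queue<String> readyChunks = new ConcurrentLinkedQueue<>();
            AtomicInteger nextPosition = new AtomicInteger();
            CountDownLatch done = new CountDownLatch(32);

            Flux.<Integer>create(sink ->
                    // Each time the rails ask for more, emit that many pending
                    // "positions" (the real code calls chunksToGenerate here).
                    sink.onRequest(n -> {
                        for (long i = 0; i < n && nextPosition.get() < 32; i++) {
                            sink.next(nextPosition.getAndIncrement());
                        }
                    }))
                    .parallel(2, 16)                  // two rails, prefetch 16, as in the patch
                    .runOn(Schedulers.parallel(), 2)  // generation runs off the caller's thread
                    .map(pos -> "chunk@" + pos)       // stand-in for genChunk + lighting + deflate
                    .subscribe(chunk -> {
                        readyChunks.add(chunk);
                        done.countDown();
                    });

            done.await();
            System.out.println("generated " + readyChunks.size() + " chunks");
        }
    }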
public void setRelevanceSystem(RelevanceSystem relevanceSystem, Scheduler scheduler) { - if (loadingPipeline != null) { - loadingPipeline.shutdown(); - } this.relevanceSystem = relevanceSystem; - if (scheduler != null) { - loadingPipeline = new ChunkProcessingPipeline(this::getChunk, Flux.create(this::onSubscribe), scheduler); - } else { - loadingPipeline = new ChunkProcessingPipeline(this::getChunk, Flux.create(this::onSubscribe)); - } - loadingPipeline.addStage( - ChunkTaskProvider.create("Chunk generate internal lightning", - (Consumer) InternalLightProcessor::generateInternalLighting)) - .addStage(ChunkTaskProvider.create("Chunk deflate", Chunk::deflate)) - .addStage(ChunkTaskProvider.createMulti("Light merging", - chunks -> { - Chunk[] localChunks = chunks.toArray(new Chunk[0]); - return new LightMerger().merge(localChunks); - }, - pos -> StreamSupport.stream(new BlockRegion(pos).expand(1, 1, 1).spliterator(), false) - .map(Vector3i::new) - .collect(Collectors.toCollection(Sets::newLinkedHashSet)) - )) - .addStage(ChunkTaskProvider.create("Chunk ready", readyChunks::add)); + Flux.create(sink -> { + chunkSink = sink; + sink.onRequest(this::nextChunks); + }) + .parallel(2, 16) + .runOn(scheduler, 2) + .map(this::genChunk) + .map(x -> { + InternalLightProcessor.generateInternalLighting(x); + x.deflate(); + return x; + }) + .subscribe(readyChunks::add); } } diff --git a/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingInfo.java b/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingInfo.java deleted file mode 100644 index 73517cea322..00000000000 --- a/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingInfo.java +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright 2021 The Terasology Foundation -// SPDX-License-Identifier: Apache-2.0 - -package org.terasology.engine.world.chunks.pipeline; - -import org.joml.Vector3ic; -import org.terasology.engine.world.chunks.Chunk; -import org.terasology.engine.world.chunks.pipeline.stages.ChunkTask; -import org.terasology.engine.world.chunks.pipeline.stages.ChunkTaskProvider; - -import java.util.List; - -public final class ChunkProcessingInfo { - - private final Vector3ic position; - - private Chunk chunk; - private ChunkTaskProvider chunkTaskProvider; - - private org.terasology.engine.world.chunks.pipeline.stages.ChunkTask chunkTask; - - public ChunkProcessingInfo(Vector3ic position) { - this.position = position; - } - - public Vector3ic getPosition() { - return position; - } - - public Chunk getChunk() { - return chunk; - } - - public void setChunk(Chunk chunk) { - this.chunk = chunk; - } - - public ChunkTaskProvider getChunkTaskProvider() { - return chunkTaskProvider; - } - - public org.terasology.engine.world.chunks.pipeline.stages.ChunkTask getChunkTask() { - return chunkTask; - } - - boolean hasNextStage(List stages) { - if (chunkTaskProvider == null) { - return true; - } else { - return stages.indexOf(chunkTaskProvider) != stages.size() - 1; - } - } - - void nextStage(List stages) { - int nextStageIndex = - chunkTaskProvider == null - ? 
0 - : stages.indexOf(chunkTaskProvider) + 1; - chunkTaskProvider = stages.get(nextStageIndex); - } - - ChunkTask makeChunkTask() { - if (chunkTask == null) { - chunkTask = chunkTaskProvider.createChunkTask(position); - } - return chunkTask; - } - - void resetTaskState() { - chunkTask = null; - } -} diff --git a/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingPipeline.java b/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingPipeline.java deleted file mode 100644 index ae99088db9e..00000000000 --- a/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingPipeline.java +++ /dev/null @@ -1,271 +0,0 @@ -// Copyright 2021 The Terasology Foundation -// SPDX-License-Identifier: Apache-2.0 - -package org.terasology.engine.world.chunks.pipeline; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Queues; -import org.joml.Vector3ic; -import org.reactivestreams.Subscription; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.terasology.engine.monitoring.ThreadActivity; -import org.terasology.engine.monitoring.ThreadMonitor; -import org.terasology.engine.world.chunks.Chunk; -import org.terasology.engine.world.chunks.pipeline.stages.ChunkTask; -import org.terasology.engine.world.chunks.pipeline.stages.ChunkTaskProvider; -import reactor.core.publisher.BaseSubscriber; -import reactor.core.publisher.Flux; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.function.Function; - -/** - * Manages execution of chunk processing. - *

- * {@link Chunk}s will processing on stages {@link ChunkProcessingPipeline#addStage} - */ -public class ChunkProcessingPipeline { - - private static final int NUM_TASK_THREADS = 2; - private static final int CHUNKS_AT_ONCE = 8; - private static final Logger logger = LoggerFactory.getLogger(ChunkProcessingPipeline.class); - - private final List stages = Lists.newArrayList(); - - private final Function chunkProvider; - private final Map chunkProcessingInfoMap = Maps.newConcurrentMap(); - private final List subs = new ArrayList<>(); - - private final Scheduler scheduler; - - private final Object completeSignal = new Object(); - private Queue processing = Queues.newConcurrentLinkedQueue(); - - /** - * Create ChunkProcessingPipeline. - */ - public ChunkProcessingPipeline(Function chunkProvider, Flux chunkStream) { - this(chunkProvider, chunkStream, Schedulers.newParallel("chunk processing", NUM_TASK_THREADS)); - } - - /** - * @param scheduler The scheduler to use for running chunk processing threads. - */ - public ChunkProcessingPipeline(Function chunkProvider, Flux chunkStream, Scheduler scheduler) { - this.chunkProvider = chunkProvider; - this.scheduler = scheduler; - Flux stream = chunkStream.subscribeOn(scheduler); - for (int i = 0; i < NUM_TASK_THREADS; i++) { - stream.subscribe(new BaseSubscriber() { - private final List buffer = new ArrayList<>(); - private Subscription sub; - - @Override - public void hookOnSubscribe(Subscription sub) { - this.sub = sub; - subs.add(sub); - } - - @Override - public void hookOnNext(Chunk chunk) { - buffer.add(chunk); - if (buffer.size() >= CHUNKS_AT_ONCE) { - processNewChunks(buffer); - buffer.clear(); - sub.request(CHUNKS_AT_ONCE); - } - } - - @Override - public void hookOnComplete() { - processNewChunks(buffer); - subs.remove(sub); - if (subs.isEmpty()) { - synchronized (completeSignal) { - completeSignal.notifyAll(); - } - } - } - }); - } - } - - /** - * Notify the pipeline that new chunks are available, so if any worker threads were suspended, they should be resumed. 
- */ - public void notifyUpdate() { - // We can't use a foreach because `request` could trigger `hookOnComplete`, which modifies `subs` - for (int i = subs.size() - 1; i >= 0; i--) { - subs.get(i).request(CHUNKS_AT_ONCE); - } - } - - private void processNewChunks(List chunks) { - for (Chunk chunk : chunks) { - Vector3ic position = chunk.getPosition(); - if (chunkProcessingInfoMap.containsKey(position)) { - logger.warn("Requested processing chunk that was already processing"); - } - - ChunkProcessingInfo chunkProcessingInfo = new ChunkProcessingInfo(position); - chunkProcessingInfo.setChunk(chunk); - chunkProcessingInfo.nextStage(stages); - chunkProcessingInfo.makeChunkTask(); - processing.add(chunkProcessingInfo); - chunkProcessingInfoMap.put(position, chunkProcessingInfo); - } - processingInfoReactor(); - } - - private void processingInfoReactor() { - List defer = Lists.newArrayList(); - ChunkProcessingInfo info; - while ((info = processing.poll()) != null) { - ChunkTask task = info.getChunkTask(); - if (task != null) { - boolean satisfied = true; - List providedChunks = new ArrayList<>(10); - for (Vector3ic pos : task.getRequirements()) { - Chunk chunk = getChunkBy(info.getChunkTaskProvider(), pos); - // If we don't have all the requirements generated yet, skip it - if (chunk == null) { - satisfied = false; - break; - } - providedChunks.add(chunk); - } - - if (satisfied) { - try { - try (ThreadActivity ignored = ThreadMonitor.startThreadActivity(task.getName())) { - info.setChunk(task.apply(providedChunks)); - } - info.resetTaskState(); - if (info.hasNextStage(stages)) { - info.nextStage(stages); - info.makeChunkTask(); - processing.add(info); - } else { - cleanup(info); - } - } catch (Exception e) { - String stageName = - info.getChunkTaskProvider() == null - ? "Generation or Loading" - : info.getChunkTaskProvider().getName(); - logger.error( - String.format("ChunkTask at position %s and stage [%s] catch error: ", - info.getPosition(), stageName), - e); - cleanup(info); - } - } else { - defer.add(info); - } - } else { - if (info.hasNextStage(stages)) { - info.nextStage(stages); - info.makeChunkTask(); - processing.add(info); - } else { - cleanup(info); - } - } - } - processing.addAll(defer); - } - - private Chunk getChunkBy(ChunkTaskProvider requiredStage, Vector3ic position) { - Chunk chunk = chunkProvider.apply(position); - if (chunk == null) { - ChunkProcessingInfo candidate = chunkProcessingInfoMap.get(position); - if (candidate == null) { - return null; - } - ChunkTaskProvider candidateCurrentStage = candidate.getChunkTaskProvider(); - if (stages.indexOf(candidateCurrentStage) >= stages.indexOf(requiredStage)) { - chunk = candidate.getChunk(); - } - } - return chunk; - } - - /** - * Add stage to pipeline. - * - * @param stage function for ChunkTask generating by Chunk. - * @return self for Fluent api. - */ - public ChunkProcessingPipeline addStage(ChunkTaskProvider stage) { - stages.add(stage); - return this; - } - - public void shutdown() { - for (Subscription s : subs) { - s.cancel(); - } - scheduler.dispose(); - chunkProcessingInfoMap.keySet().forEach(this::stopProcessingAt); - chunkProcessingInfoMap.clear(); - } - - public void restart() { - chunkProcessingInfoMap.keySet().forEach(this::stopProcessingAt); - chunkProcessingInfoMap.clear(); - } - - /** - * Stop processing chunk at position. - * - * @param pos position of chunk to stop processing. 
- */ - public void stopProcessingAt(Vector3ic pos) { - ChunkProcessingInfo removed = chunkProcessingInfoMap.remove(pos); - if (removed == null) { - return; - } - processing.remove(removed); - - Chunk chunk = removed.getChunk(); - if (chunk != null) { - chunk.dispose(); - } - } - - /** - * Cleanuping Chunk processing after done. - * - * @param chunkProcessingInfo chunk to cleanup - */ - private void cleanup(ChunkProcessingInfo chunkProcessingInfo) { - chunkProcessingInfoMap.remove(chunkProcessingInfo.getPosition(), chunkProcessingInfo); - } - - /** - * Check is position processing. - * - * @param pos position for check - * @return true if position processing, false otherwise - */ - public boolean isPositionProcessing(Vector3ic pos) { - return chunkProcessingInfoMap.containsKey(pos); - } - - /** - * Get processing positions. - * - * @return copy of processing positions - */ - public Set getProcessingPositions() { - return chunkProcessingInfoMap.keySet(); - } -} diff --git a/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/PositionFuture.java b/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/PositionFuture.java deleted file mode 100644 index 5ce0e3452a4..00000000000 --- a/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/PositionFuture.java +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright 2021 The Terasology Foundation -// SPDX-License-Identifier: Apache-2.0 - -package org.terasology.engine.world.chunks.pipeline; - -import org.joml.Vector3ic; - -import java.util.concurrent.ExecutionException; -import java.util.concurrent.RunnableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -public class PositionFuture implements RunnableFuture { - - private final RunnableFuture delegate; - private final Vector3ic position; - - public PositionFuture(RunnableFuture delegate, Vector3ic position) { - this.delegate = delegate; - this.position = position; - } - - public Vector3ic getPosition() { - return position; - } - - @Override - public void run() { - delegate.run(); - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return delegate.cancel(mayInterruptIfRunning); - } - - @Override - public boolean isCancelled() { - return delegate.isCancelled(); - } - - @Override - public boolean isDone() { - return delegate.isDone(); - } - - @Override - public T get() throws InterruptedException, ExecutionException { - return delegate.get(); - } - - @Override - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, - TimeoutException { - return delegate.get(timeout, unit); - } -} diff --git a/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/stages/ChunkTask.java b/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/stages/ChunkTask.java deleted file mode 100644 index ef7ac7c125f..00000000000 --- a/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/stages/ChunkTask.java +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2021 The Terasology Foundation -// SPDX-License-Identifier: Apache-2.0 - -package org.terasology.engine.world.chunks.pipeline.stages; - -import org.joml.Vector3ic; -import org.terasology.engine.world.chunks.Chunk; - -import java.util.Collection; -import java.util.Collections; -import java.util.Set; -import java.util.function.Function; - -/** - * Function-style chunk task. - */ -public interface ChunkTask extends Function, Chunk> { - /** - * Task name. used for ThreadMonitor. - * - * @return task name. 
- */ - String getName(); - - /** - * Chunk task position. used for ChunkTask Sorting. - * - * @return - */ - Vector3ic getPosition(); - - /** - * Requirement another chunk at positions for this chunk task. - * - * @return required positions for processing. - */ - default Set getRequirements() { - return Collections.singleton(getPosition()); - } -} diff --git a/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/stages/ChunkTaskProvider.java b/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/stages/ChunkTaskProvider.java deleted file mode 100644 index 0cef02219ff..00000000000 --- a/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/stages/ChunkTaskProvider.java +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright 2021 The Terasology Foundation -// SPDX-License-Identifier: Apache-2.0 - -package org.terasology.engine.world.chunks.pipeline.stages; - -import org.joml.Vector3ic; -import org.terasology.engine.world.chunks.pipeline.ChunkProcessingPipeline; -import org.terasology.engine.world.chunks.Chunk; - -import java.util.Collection; -import java.util.Set; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.UnaryOperator; - -/** - * Provides ChunkTask for {@link ChunkProcessingPipeline} - *

- * Also - *

- * Provides factory methods for creating ChunkTaskProviders. - */ -public class ChunkTaskProvider { - private final String name; - private final Function taskCreator; - - public ChunkTaskProvider(String name, Function taskCreator) { - this.name = name; - this.taskCreator = taskCreator; - } - - public static ChunkTaskProvider create(String name, UnaryOperator processingFunction) { - return new ChunkTaskProvider( - name, - pos -> new SingleChunkTask(name, pos, processingFunction)); - } - - public static ChunkTaskProvider create(String name, Consumer processingFunction) { - return new ChunkTaskProvider( - name, - pos -> new SingleChunkTask(name, pos, (c) -> { - processingFunction.accept(c); - return c; - })); - } - - public static ChunkTaskProvider createMulti(String name, Function, Chunk> processing, - Function> requirementCalculator) { - return new ChunkTaskProvider( - name, - pos -> new MultiplyRequirementChunkTask(name, pos, processing, requirementCalculator.apply(pos)) - ); - } - - public String getName() { - return name; - } - - public ChunkTask createChunkTask(Vector3ic pos) { - return taskCreator.apply(pos); - } -} diff --git a/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/stages/MultiplyRequirementChunkTask.java b/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/stages/MultiplyRequirementChunkTask.java deleted file mode 100644 index 52ea92856a3..00000000000 --- a/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/stages/MultiplyRequirementChunkTask.java +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright 2021 The Terasology Foundation -// SPDX-License-Identifier: Apache-2.0 - -package org.terasology.engine.world.chunks.pipeline.stages; - -import org.joml.Vector3ic; -import org.terasology.engine.world.chunks.Chunk; - -import java.util.Collection; -import java.util.Set; -import java.util.function.Function; - -/** - * Chunk task which require many chunks for processing. - */ -public class MultiplyRequirementChunkTask implements ChunkTask { - private final String name; - private final Vector3ic position; - private final Function, Chunk> function; - private final Set requirements; - - public MultiplyRequirementChunkTask(String name, Vector3ic position, Function, Chunk> function, - Set requirements) { - this.name = name; - this.position = position; - this.function = function; - this.requirements = requirements; - } - - @Override - public Set getRequirements() { - return requirements; - } - - @Override - public String getName() { - return name; - } - - @Override - public Vector3ic getPosition() { - return position; - } - - @Override - public Chunk apply(Collection chunks) { - return function.apply(chunks); - } -} diff --git a/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/stages/SingleChunkTask.java b/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/stages/SingleChunkTask.java deleted file mode 100644 index 6a92018d74e..00000000000 --- a/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/stages/SingleChunkTask.java +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2021 The Terasology Foundation -// SPDX-License-Identifier: Apache-2.0 - -package org.terasology.engine.world.chunks.pipeline.stages; - -import com.google.common.base.Preconditions; -import org.joml.Vector3ic; -import org.terasology.engine.world.chunks.Chunk; - -import java.util.Collection; -import java.util.Optional; -import java.util.function.UnaryOperator; - -/** - * ChunkTask which required One chunk for processing. 
- */ -public class SingleChunkTask implements ChunkTask { - private final String name; - private final Vector3ic position; - private final UnaryOperator function; - - public SingleChunkTask(String name, Vector3ic position, UnaryOperator function) { - this.name = name; - this.position = position; - this.function = function; - } - - - @Override - public String getName() { - return name; - } - - @Override - public Vector3ic getPosition() { - return position; - } - - @Override - public Chunk apply(Collection chunks) { - Preconditions.checkArgument(chunks.size() == 1, "SingleChunkTask must have only one chunk on input"); - Optional chunk = chunks.stream().findFirst(); - Preconditions.checkArgument(chunk.isPresent(), "SingleChunkTask must have chunk on input"); - return function.apply(chunk.get()); - } -} diff --git a/engine/src/main/java/org/terasology/engine/world/chunks/remoteChunkProvider/RemoteChunkProvider.java b/engine/src/main/java/org/terasology/engine/world/chunks/remoteChunkProvider/RemoteChunkProvider.java index 3d09ee8d044..f51058e5b6b 100644 --- a/engine/src/main/java/org/terasology/engine/world/chunks/remoteChunkProvider/RemoteChunkProvider.java +++ b/engine/src/main/java/org/terasology/engine/world/chunks/remoteChunkProvider/RemoteChunkProvider.java @@ -6,11 +6,12 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Queues; -import org.joml.Vector3f; +import com.google.common.collect.Sets; import org.joml.Vector3i; import org.joml.Vector3ic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.terasology.engine.core.GameScheduler; import org.terasology.engine.entitySystem.entity.EntityRef; import org.terasology.engine.logic.players.LocalPlayer; import org.terasology.engine.monitoring.chunk.ChunkMonitor; @@ -19,25 +20,21 @@ import org.terasology.engine.world.block.BlockRegionc; import org.terasology.engine.world.chunks.Chunk; import org.terasology.engine.world.chunks.ChunkProvider; -import org.terasology.engine.world.chunks.Chunks; import org.terasology.engine.world.chunks.event.BeforeChunkUnload; import org.terasology.engine.world.chunks.event.OnChunkLoaded; -import org.terasology.engine.world.chunks.pipeline.ChunkProcessingPipeline; -import org.terasology.engine.world.chunks.pipeline.stages.ChunkTaskProvider; import org.terasology.engine.world.internal.ChunkViewCore; import org.terasology.engine.world.internal.ChunkViewCoreImpl; import org.terasology.engine.world.propagation.light.InternalLightProcessor; import org.terasology.engine.world.propagation.light.LightMerger; import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import java.util.Collection; -import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.function.Consumer; -import java.util.stream.Collectors; import java.util.stream.StreamSupport; /** @@ -58,39 +55,20 @@ public class RemoteChunkProvider implements ChunkProvider { private final BlockingQueue invalidateChunks = Queues.newLinkedBlockingQueue(); private final Map chunkCache = Maps.newHashMap(); private final BlockManager blockManager; - private final ChunkProcessingPipeline loadingPipeline; private EntityRef worldEntity = EntityRef.NULL; private ChunkReadyListener listener; - private BlockingQueue receivedChunks; + private FluxSink chunkSink; + private final Set needsLightMerging = 
Sets.newHashSet(); public RemoteChunkProvider(BlockManager blockManager, LocalPlayer localPlayer) { this.blockManager = blockManager; - loadingPipeline = new ChunkProcessingPipeline(this::getChunk, Flux.create(sink -> sink.onRequest(num -> { - try { - for (int i = 0; i < num; i++) { - Chunk chunk = receivedChunks.take(); - sink.next(chunk); - } - } catch (InterruptedException e) { - sink.error(e); - } - }))); - receivedChunks = new PriorityBlockingQueue<>(64, new LocalPlayerRelativeChunkComparator(localPlayer)); - - loadingPipeline.addStage( - ChunkTaskProvider.create("Chunk generate internal lightning", - (Consumer) InternalLightProcessor::generateInternalLighting)) - .addStage(ChunkTaskProvider.create("Chunk deflate", Chunk::deflate)) - .addStage(ChunkTaskProvider.createMulti("Light merging", - chunks -> { - Chunk[] localchunks = chunks.toArray(new Chunk[0]); - return new LightMerger().merge(localchunks); - }, - pos -> StreamSupport.stream(new BlockRegion(pos).expand(1, 1, 1).spliterator(), false) - .map(Vector3i::new) - .collect(Collectors.toSet()) - )) - .addStage(ChunkTaskProvider.create("", readyChunks::add)); + Flux.push(sink -> chunkSink = sink) + .publishOn(GameScheduler.parallel()) + .subscribe(chunk -> { + InternalLightProcessor.generateInternalLighting(chunk); + chunk.deflate(); + readyChunks.add(chunk); + }); ChunkMonitor.fireChunkProviderInitialized(this); } @@ -99,15 +77,25 @@ public void subscribe(ChunkReadyListener chunkReadyListener) { this.listener = chunkReadyListener; } - public void receiveChunk(final Chunk chunk) { - receivedChunks.add(chunk); + chunkSink.next(chunk); } public void invalidateChunks(Vector3ic pos) { invalidateChunks.offer(pos); } + private void tryLightMerging(Vector3ic chunkPos) { + Chunk[] chunks = StreamSupport.stream(new BlockRegion(chunkPos).expand(1, 1, 1).spliterator(), false) + .map(chunkCache::get) + .filter(Objects::nonNull) + .toArray(Chunk[]::new); + if (chunks.length == 27) { + new LightMerger().merge(chunks); + needsLightMerging.remove(chunkPos); + } + } + @Override public void update() { if (listener != null) { @@ -115,15 +103,20 @@ public void update() { } Chunk chunk; while ((chunk = readyChunks.poll()) != null) { - Chunk oldChunk = chunkCache.put(chunk.getPosition(new Vector3i()), chunk); + Vector3ic chunkPos = chunk.getPosition(); + Chunk oldChunk = chunkCache.put(chunkPos, chunk); if (oldChunk != null) { oldChunk.dispose(); } chunk.markReady(); + GameScheduler.scheduleParallel("light merging", + () -> StreamSupport.stream(new BlockRegion(chunkPos).expand(1, 1, 1).spliterator(), false) + .filter(needsLightMerging::contains) + .forEach(this::tryLightMerging)); if (listener != null) { - listener.onChunkReady(chunk.getPosition(new Vector3i())); + listener.onChunkReady(chunkPos); } - worldEntity.send(new OnChunkLoaded(chunk.getPosition(new Vector3i()))); + worldEntity.send(new OnChunkLoaded(chunkPos)); } } @@ -162,8 +155,8 @@ public boolean isChunkReady(Vector3ic pos) { @Override public void dispose() { + chunkSink.complete(); ChunkMonitor.fireChunkProviderDisposed(this); - loadingPipeline.shutdown(); } @Override @@ -208,21 +201,4 @@ public ChunkViewCore getSubview(BlockRegionc region, Vector3ic offset) { public void setWorldEntity(EntityRef entity) { this.worldEntity = entity; } - - private static final class LocalPlayerRelativeChunkComparator implements Comparator { - private final LocalPlayer localPlayer; - - private LocalPlayerRelativeChunkComparator(LocalPlayer localPlayer) { - this.localPlayer = localPlayer; - } - - @Override - 
public int compare(Chunk o1, Chunk o2) { - return score(o1) - score(o2); - } - - private int score(Chunk chunk) { - return (int) Chunks.toChunkPos(localPlayer.getPosition(new Vector3f()), new Vector3i()).distance(chunk.getPosition()); - } - } } diff --git a/engine/src/main/java/org/terasology/engine/world/propagation/light/LightMerger.java b/engine/src/main/java/org/terasology/engine/world/propagation/light/LightMerger.java index a669948ba5a..de676cca1bb 100644 --- a/engine/src/main/java/org/terasology/engine/world/propagation/light/LightMerger.java +++ b/engine/src/main/java/org/terasology/engine/world/propagation/light/LightMerger.java @@ -56,9 +56,9 @@ public Chunk merge(Chunk[] localChunks) { Preconditions.checkArgument(Arrays.stream(localChunks).noneMatch(Objects::isNull), "Parameter [localChunks] " + "must not contains nulls"); - Arrays.sort(localChunks, Comparator.comparingInt(c -> c.getPosition(new Vector3i()).x) - .thenComparingInt(c -> c.getPosition(new Vector3i()).y) - .thenComparing(c -> c.getPosition(new Vector3i()).z)); + Arrays.sort(localChunks, Comparator.comparingInt(c -> c.getPosition().x()) + .thenComparingInt(c -> c.getPosition().y()) + .thenComparingInt(c -> c.getPosition().z())); Chunk chunk = localChunks[CENTER_INDEX]; List propagators = Lists.newArrayList(); From 1011484f3438e672af863fa78ff51e4cc9459691 Mon Sep 17 00:00:00 2001 From: tolziplohu Date: Tue, 31 Aug 2021 15:23:13 -0500 Subject: [PATCH 2/2] refactor: simplify updating logic, factor out constants --- .../LocalChunkProvider.java | 34 ++++--------------- 1 file changed, 7 insertions(+), 27 deletions(-) diff --git a/engine/src/main/java/org/terasology/engine/world/chunks/localChunkProvider/LocalChunkProvider.java b/engine/src/main/java/org/terasology/engine/world/chunks/localChunkProvider/LocalChunkProvider.java index 9326e0031e8..6fd7dd1a0e3 100644 --- a/engine/src/main/java/org/terasology/engine/world/chunks/localChunkProvider/LocalChunkProvider.java +++ b/engine/src/main/java/org/terasology/engine/world/chunks/localChunkProvider/LocalChunkProvider.java @@ -41,7 +41,6 @@ import org.terasology.engine.world.chunks.event.OnChunkLoaded; import org.terasology.engine.world.chunks.event.PurgeWorldEvent; import org.terasology.engine.world.chunks.internal.ChunkImpl; -import org.terasology.engine.world.chunks.internal.ChunkRelevanceRegion; import org.terasology.engine.world.generation.impl.EntityBufferImpl; import org.terasology.engine.world.generator.WorldGenerator; import org.terasology.engine.world.internal.ChunkViewCore; @@ -90,6 +89,8 @@ public class LocalChunkProvider implements ChunkProvider { private static final Logger logger = LoggerFactory.getLogger(LocalChunkProvider.class); private static final int UNLOAD_PER_FRAME = 64; + private static final int NUM_CHUNK_THREADS = 2; + private static final int CHUNKS_AT_ONCE = 16; private final EntityManager entityManager; private final BlockingQueue readyChunks = Queues.newLinkedBlockingQueue(); private final BlockingQueue> deactivateBlocksQueue = Queues.newLinkedBlockingQueue(); @@ -415,7 +416,8 @@ private boolean isChunkReady(Chunk chunk) { } public void notifyRelevanceChanged() { - nextChunks(16); + updateList(); + nextChunks(CHUNKS_AT_ONCE); } private void updateList() { @@ -426,24 +428,6 @@ private void updateList() { chunksInRange.sort(relevanceSystem.createChunkPosComparator().reversed()); } - private boolean checkForUpdate() { - Collection regions = relevanceSystem.getRegions(); - if (lastRegions == null || regions.size() != lastRegions.length) { - 
lastRegions = regions.stream().map(ChunkRelevanceRegion::getCurrentRegion).toArray(BlockRegion[]::new); - return true; - } - int i = 0; - boolean anyChanged = false; - for (ChunkRelevanceRegion region : regions) { - if (!lastRegions[i].equals(region.getCurrentRegion())) { - lastRegions[i].set(region.getCurrentRegion()); - anyChanged = true; - } - i++; - } - return anyChanged; - } - /** * Loads a chunk if possible, otherwise generates it. * @@ -467,13 +451,9 @@ private Chunk genChunk(Vector3ic pos) { * Computes the next `numChunks` chunks to generate. * This must be synchronized. */ - private synchronized List chunksToGenerate(int numChunks) { + private List chunksToGenerate(int numChunks) { List chunks = new ArrayList<>(numChunks); - if (checkForUpdate()) { - updateList(); - } - while (chunks.size() < numChunks && !chunksInRange.isEmpty()) { Vector3ic pos = chunksInRange.remove(chunksInRange.size() - 1); if (currentlyProcessing.contains(pos)) { @@ -488,7 +468,7 @@ private synchronized List chunksToGenerate(int numChunks) { } /** - * Tells the ChunkProcessingPipeline that no more chunks are coming after what's currently queued. + * Tells the LocalChunkProvider that no more chunks are coming after what's currently queued. * Intended for use in tests. */ protected void markComplete() { @@ -513,7 +493,7 @@ public void setRelevanceSystem(RelevanceSystem relevanceSystem, Scheduler schedu chunkSink = sink; sink.onRequest(this::nextChunks); }) - .parallel(2, 16) + .parallel(NUM_CHUNK_THREADS, CHUNKS_AT_ONCE) .runOn(scheduler, 2) .map(this::genChunk) .map(x -> {