From da939406c7900f95e4c1888e8672968e38afe69a Mon Sep 17 00:00:00 2001 From: Spottedleaf Date: Sat, 13 Jul 2019 09:23:10 -0700 Subject: [PATCH] Asynchronous chunk IO and loading --- .../co/aikar/timings/WorldTimingsHandler.java | 14 + .../ChunkPacketBlockControllerAntiXray.java | 43 +- .../paper/io/ConcreteFileIOThread.java | 661 ++++++++++++++++++ .../com/destroystokyo/paper/io/IOUtil.java | 62 ++ .../paper/io/PrioritizedTaskQueue.java | 258 +++++++ .../paper/io/QueueExecutorThread.java | 216 ++++++ .../paper/io/chunk/ChunkLoadTask.java | 104 +++ .../paper/io/chunk/ChunkLoadTaskManager.java | 119 ++++ .../minecraft/server/ChunkProviderServer.java | 135 ++++ .../minecraft/server/ChunkRegionLoader.java | 51 +- .../net/minecraft/server/ChunkStatus.java | 1 + .../net/minecraft/server/IChunkLoader.java | 29 +- .../java/net/minecraft/server/MCUtil.java | 5 + .../net/minecraft/server/MinecraftServer.java | 1 + .../net/minecraft/server/PlayerChunkMap.java | 204 +++++- .../java/net/minecraft/server/RegionFile.java | 2 +- .../net/minecraft/server/RegionFileCache.java | 6 +- .../minecraft/server/RegionFileSection.java | 56 +- .../java/net/minecraft/server/TicketType.java | 1 + .../net/minecraft/server/VillagePlace.java | 66 +- .../net/minecraft/server/WorldServer.java | 74 ++ .../org/bukkit/craftbukkit/CraftWorld.java | 36 +- 22 files changed, 2066 insertions(+), 78 deletions(-) create mode 100644 src/main/java/com/destroystokyo/paper/io/ConcreteFileIOThread.java create mode 100644 src/main/java/com/destroystokyo/paper/io/IOUtil.java create mode 100644 src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java create mode 100644 src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java create mode 100644 src/main/java/com/destroystokyo/paper/io/chunk/ChunkLoadTask.java create mode 100644 src/main/java/com/destroystokyo/paper/io/chunk/ChunkLoadTaskManager.java diff --git a/src/main/java/co/aikar/timings/WorldTimingsHandler.java 
b/src/main/java/co/aikar/timings/WorldTimingsHandler.java index 366de66657..7f623270f9 100644 --- a/src/main/java/co/aikar/timings/WorldTimingsHandler.java +++ b/src/main/java/co/aikar/timings/WorldTimingsHandler.java @@ -51,6 +51,13 @@ public class WorldTimingsHandler { public final Timing worldSaveChunks; public final Timing worldSaveLevel; public final Timing chunkSaveData; + public final Timing poiUnload; + public final Timing chunkUnload; + public final Timing poiSaveDataSerialization; + public final Timing chunkSave; + public final Timing chunkSaveOverwriteCheck; + public final Timing chunkSaveDataSerialization; + public final Timing chunkSaveIOWait; public WorldTimingsHandler(World server) { String name = server.worldData.getName() +" - "; @@ -99,6 +106,13 @@ public class WorldTimingsHandler { tracker2 = Timings.ofSafe(name + "tracker stage 2"); doTick = Timings.ofSafe(name + "doTick"); tickEntities = Timings.ofSafe(name + "tickEntities"); + poiUnload = Timings.ofSafe(name + "Chunk unload - POI"); + chunkUnload = Timings.ofSafe(name + "Chunk unload - Chunk"); + poiSaveDataSerialization = Timings.ofSafe(name + "Chunk save - POI Data serialization"); + chunkSave = Timings.ofSafe(name + "Chunk save - Chunk"); + chunkSaveOverwriteCheck = Timings.ofSafe(name + "Chunk save - Chunk Overwrite check"); + chunkSaveDataSerialization = Timings.ofSafe(name + "Chunk save - Chunk Data serialization"); + chunkSaveIOWait = Timings.ofSafe(name + "Chunk save - Chunk IO wait"); } public static Timing getTickList(WorldServer worldserver, String timingsType) { diff --git a/src/main/java/com/destroystokyo/paper/antixray/ChunkPacketBlockControllerAntiXray.java b/src/main/java/com/destroystokyo/paper/antixray/ChunkPacketBlockControllerAntiXray.java index 9d8bee5cac..c2dd59f573 100644 --- a/src/main/java/com/destroystokyo/paper/antixray/ChunkPacketBlockControllerAntiXray.java +++ b/src/main/java/com/destroystokyo/paper/antixray/ChunkPacketBlockControllerAntiXray.java @@ -150,6 
+150,12 @@ public class ChunkPacketBlockControllerAntiXray extends ChunkPacketBlockControll private final AtomicInteger xrayRequests = new AtomicInteger(); + // Paper start - async chunk api + private Integer nextTicketHold() { + return Integer.valueOf(this.xrayRequests.getAndIncrement()); + } + // Paper end + private Integer addXrayTickets(final int x, final int z, final ChunkProviderServer chunkProvider) { final Integer hold = Integer.valueOf(this.xrayRequests.getAndIncrement()); @@ -181,6 +187,33 @@ public class ChunkPacketBlockControllerAntiXray extends ChunkPacketBlockControll chunk.world.getChunkAt(locX, locZ + 1); } + // Paper start - async chunk api + private void loadNeighbourAsync(ChunkProviderServer chunkProvider, int chunkX, int chunkZ, int[] counter, java.util.function.Consumer onNeighourLoad, Runnable onAllNeighboursLoad) { + chunkProvider.getChunkAtAsynchronously(chunkX, chunkZ, true, (Chunk neighbour) -> { + onNeighourLoad.accept(neighbour); + if (++counter[0] == 4) { + onAllNeighboursLoad.run(); + } + }); + } + + private void loadNeighboursAsync(Chunk chunk, java.util.function.Consumer onNeighourLoad, Runnable onAllNeighboursLoad) { + int[] loaded = new int[1]; + + int locX = chunk.getPos().x; + int locZ = chunk.getPos().z; + + onNeighourLoad.accept(chunk); + + ChunkProviderServer chunkProvider = ((WorldServer)chunk.world).getChunkProvider(); + + this.loadNeighbourAsync(chunkProvider, locX - 1, locZ, loaded, onNeighourLoad, onAllNeighboursLoad); + this.loadNeighbourAsync(chunkProvider, locX + 1, locZ, loaded, onNeighourLoad, onAllNeighboursLoad); + this.loadNeighbourAsync(chunkProvider, locX, locZ - 1, loaded, onNeighourLoad, onAllNeighboursLoad); + this.loadNeighbourAsync(chunkProvider, locX, locZ + 1, loaded, onNeighourLoad, onAllNeighboursLoad); + } + // Paper end + @Override public boolean onChunkPacketCreate(Chunk chunk, int chunkSectionSelector, boolean force) { int locX = chunk.getPos().x; @@ -256,11 +289,15 @@ public class 
ChunkPacketBlockControllerAntiXray extends ChunkPacketBlockControll if (chunks[0] == null || chunks[1] == null || chunks[2] == null || chunks[3] == null) { // we need to load - MinecraftServer.getServer().scheduleOnMain(() -> { - Integer ticketHold = this.addXrayTickets(locX, locZ, world.getChunkProvider()); - this.loadNeighbours(chunk); + // Paper start - async chunk api + Integer ticketHold = this.nextTicketHold(); + this.loadNeighboursAsync(chunk, (Chunk neighbour) -> { // when a neighbour is loaded + ((WorldServer)neighbour.world).getChunkProvider().addTicket(TicketType.ANTIXRAY, neighbour.getPos(), 0, ticketHold); + }, + () -> { // once neighbours get loaded this.modifyBlocks(packetPlayOutMapChunk, chunkPacketInfo, false, ticketHold); }); + // Paper end return; } diff --git a/src/main/java/com/destroystokyo/paper/io/ConcreteFileIOThread.java b/src/main/java/com/destroystokyo/paper/io/ConcreteFileIOThread.java new file mode 100644 index 0000000000..8cbe522f23 --- /dev/null +++ b/src/main/java/com/destroystokyo/paper/io/ConcreteFileIOThread.java @@ -0,0 +1,661 @@ +package com.destroystokyo.paper.io; + +import net.minecraft.server.ChunkCoordIntPair; +import net.minecraft.server.ExceptionWorldConflict; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.NBTTagCompound; +import net.minecraft.server.RegionFile; +import net.minecraft.server.WorldServer; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * Prioritized singleton thread responsible for all chunk IO that occurs in a minecraft server. + * + *

+ * Singleton access: {@link Holder#INSTANCE} + *

+ * + *

+ * All functions provided are MT-Safe, however certain ordering constraints are recommended (but not enforced): + *

  • + * Chunk saves may not occur for unloaded chunks. + *
  • + *
  • + * Tasks must be scheduled on the main thread. + *
  • + *

    + * + * @see Holder#INSTANCE + * @see #scheduleSave(WorldServer, int, int, NBTTagCompound, NBTTagCompound, int) + * @see #loadChunkDataAsync(WorldServer, int, int, int, Consumer, boolean, boolean, boolean) + */ +public final class ConcreteFileIOThread extends QueueExecutorThread { + + public static final Logger LOGGER = MinecraftServer.LOGGER; + public static final NBTTagCompound FAILURE_VALUE = new NBTTagCompound(); + + public static final class Holder { + + public static final ConcreteFileIOThread INSTANCE = new ConcreteFileIOThread(); + + static { + INSTANCE.start(); + } + } + + private final AtomicLong writeCounter = new AtomicLong(); + + private ConcreteFileIOThread() { + super(new PrioritizedTaskQueue<>(), (int)(1.0e6)); // 1.0ms spinwait time + this.setName("Concrete RegionFile IO Thread"); + this.setPriority(Thread.NORM_PRIORITY - 1); // we keep priority close to normal because threads can wait on us + this.setUncaughtExceptionHandler((final Thread unused, final Throwable thr) -> { + LOGGER.fatal("Uncaught exception thrown from IO thread, report this!", thr); + }); + } + + /* run() is implemented by superclass */ + + /* + * + * IO thread will perform reads before writes + * + * How reads/writes are scheduled: + * + * If read in progress while scheduling write, ignore read and schedule write + * If read in progress while scheduling read (no write in progress), chain the read task + * + * + * If write in progress while scheduling read, use the pending write data and ret immediately + * If write in progress while scheduling write (ignore read in progress), overwrite the write in progress data + * + * This allows the reads and writes to act as if they occur synchronously to the thread scheduling them, however + * it fails to properly propagate write failures + * + */ + + /** + * Attempts to bump the priority of all IO tasks for the given chunk coordinates. This has no effect if no tasks are queued. 
+ * @param world Chunk's world + * @param chunkX Chunk's x coordinate + * @param chunkZ Chunk's z coordinate + * @param priority Priority level to try to bump to + */ + public void bumpPriority(final WorldServer world, final int chunkX, final int chunkZ, final int priority) { + if (!PrioritizedTaskQueue.validPriority(priority)) { + throw new IllegalArgumentException("Invalid priority: " + priority); + } + + final Long key = Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ)); + + final ChunkDataTask poiTask = world.poiDataController.tasks.get(key); + final ChunkDataTask chunkTask = world.chunkDataController.tasks.get(key); + + if (poiTask != null) { + poiTask.raisePriority(priority); + } + if (chunkTask != null) { + chunkTask.raisePriority(priority); + } + } + + // Hack start + /** + * if {@code waitForRead} is true, then this task will wait on an available read task, else it will wait on an available + * write task + * if {@code poiTask} is true, then this task will wait on a poi task, else it will wait on chunk data task + * @deprecated API is garbage and will only work for main thread queueing of tasks (which is vanilla), plugins messing + * around asynchronously will give unexpected results + * @return whether the task succeeded, or {@code null} if there is no task + */ + @Deprecated + public Boolean waitForIOToComplete(final WorldServer world, final int chunkX, final int chunkZ, final boolean waitForRead, + final boolean poiTask) { + final ChunkDataTask task; + + final Long key = IOUtil.getCoordinateKey(chunkX, chunkZ); + if (poiTask) { + task = world.poiDataController.tasks.get(key); + } else { + task = world.chunkDataController.tasks.get(key); + } + + if (task == null) { + return null; + } + + if (waitForRead) { + ChunkDataController.InProgressRead read = task.inProgressRead; + if (read == null) { + return null; + } + return Boolean.valueOf(read.readFuture.join() != null); + } + + // wait for write + ChunkDataController.InProgressWrite write = 
task.inProgressWrite; + if (write == null) { + return null; + } + return Boolean.valueOf(write.wrote.join() != null); + } + // Hack end + + public NBTTagCompound getPendingWrite(final WorldServer world, final int chunkX, final int chunkZ, final boolean poiData) { + final ChunkDataController taskController = poiData ? world.poiDataController : world.chunkDataController; + + final ChunkDataTask dataTask = taskController.tasks.get(Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ))); + + if (dataTask == null) { + return null; + } + + final ChunkDataController.InProgressWrite write = dataTask.inProgressWrite; + + if (write == null) { + return null; + } + + return write.data; + } + + /** + * Sets the priority of all IO tasks for the given chunk coordinates. This has no effect if no tasks are queued. + * @param world Chunk's world + * @param chunkX Chunk's x coordinate + * @param chunkZ Chunk's z coordinate + * @param priority Priority level to set to + */ + public void setPriority(final WorldServer world, final int chunkX, final int chunkZ, final int priority) { + if (!PrioritizedTaskQueue.validPriority(priority)) { + throw new IllegalArgumentException("Invalid priority: " + priority); + } + + final Long key = Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ)); + + final ChunkDataTask poiTask = world.poiDataController.tasks.get(key); + final ChunkDataTask chunkTask = world.chunkDataController.tasks.get(key); + + if (poiTask != null) { + poiTask.updatePriority(priority); + } + if (chunkTask != null) { + chunkTask.updatePriority(priority); + } + } + + /** + * Schedules the chunk data to be written asynchronously. + *

    + * Impl notes: + *

    + *
  • + * This function presumes a chunk load for the coordinates is not called during this function (anytime after is OK). This means + * saves must be scheduled before a chunk is unloaded. + *
  • + *
  • + * Writes may be called concurrently, although only the "later" write will go through. + *
  • + * @param world Chunk's world + * @param chunkX Chunk's x coordinate + * @param chunkZ Chunk's z coordinate + * @param poiData Chunk point of interest data. If {@code null}, then no poi data is saved. + * @param chunkData Chunk data. If {@code null}, then no chunk data is saved. + * @param priority Priority level for this task. See {@link PrioritizedTaskQueue} + * @throws IllegalArgumentException If both {@code poiData} and {@code chunkData} are {@code null}. + * @throws IllegalStateException If the file io thread has shutdown. + */ + public void scheduleSave(final WorldServer world, final int chunkX, final int chunkZ, + final NBTTagCompound poiData, final NBTTagCompound chunkData, + final int priority) throws IllegalArgumentException { + if (!PrioritizedTaskQueue.validPriority(priority)) { + throw new IllegalArgumentException("Invalid priority: " + priority); + } + + final long writeCounter = this.writeCounter.getAndIncrement(); + + if (poiData != null) { + this.scheduleWrite(world.poiDataController, world, chunkX, chunkZ, poiData, priority, writeCounter); + } + if (chunkData != null) { + this.scheduleWrite(world.chunkDataController, world, chunkX, chunkZ, chunkData, priority, writeCounter); + } + } + + private void scheduleWrite(final ChunkDataController dataController, final WorldServer world, + final int chunkX, final int chunkZ, final NBTTagCompound data, final int priority, final long writeCounter) { + dataController.tasks.compute(Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ)), (final Long keyInMap, final ChunkDataTask taskRunning) -> { + if (taskRunning == null) { + // no task is scheduled + + // create task + final ChunkDataTask newTask = new ChunkDataTask(priority, world, chunkX, chunkZ, dataController); + newTask.inProgressWrite = new ChunkDataController.InProgressWrite(); + newTask.inProgressWrite.writeCounter = writeCounter; + newTask.inProgressWrite.data = data; + + ConcreteFileIOThread.this.queueTask(newTask); // schedule + return newTask; 
+ } + + taskRunning.raisePriority(priority); + + if (taskRunning.inProgressWrite == null) { + taskRunning.inProgressWrite = new ChunkDataController.InProgressWrite(); + } + + boolean reschedule = taskRunning.inProgressWrite.writeCounter == -1L; + + // synchronize for readers + //noinspection SynchronizationOnLocalVariableOrMethodParameter + synchronized (taskRunning) { + taskRunning.inProgressWrite.data = data; + taskRunning.inProgressWrite.writeCounter = writeCounter; + } + + if (reschedule) { + // We need to reschedule this task since the previous one is not currently scheduled since it failed + taskRunning.reschedule(priority); + } + + return taskRunning; + }); + } + + /** + * Same as {@link #loadChunkDataAsync(WorldServer, int, int, int, Consumer, boolean, boolean, boolean)}, except this function returns + * a {@link CompletableFuture} which is potentially completed ASYNCHRONOUSLY ON THE FILE IO THREAD when the load task + * has completed. + *

    + * Note that if the chunk fails to load the returned future is completed with {@code null}. + *

    + */ + public CompletableFuture loadChunkDataAsyncFuture(final WorldServer world, final int chunkX, final int chunkZ, + final int priority, final boolean readPoiData, final boolean readChunkData, + final boolean intendingToBlock) { + final CompletableFuture future = new CompletableFuture<>(); + this.loadChunkDataAsync(world, chunkX, chunkZ, priority, future::complete, readPoiData, readChunkData, intendingToBlock); + return future; + } + + /** + * Schedules a load to be executed asynchronously. + *

    + * Impl notes: + *

    + *
  • + * If a chunk fails to load, the {@code onComplete} parameter is completed with {@code null}. + *
  • + *
  • + * It is possible for the {@code onComplete} parameter to be given {@link ChunkData} containing data + * this call did not request. + *
  • + *
  • + * The {@code onComplete} parameter may be completed during the execution of this function synchronously or it may + * be completed asynchronously on this file io thread. Interacting with the file IO thread in the completion of + * data is undefined behaviour, and can cause deadlock. + *
  • + * @param world Chunk's world + * @param chunkX Chunk's x coordinate + * @param chunkZ Chunk's z coordinate + * @param priority Priority level for this task. See {@link PrioritizedTaskQueue} + * @param onComplete Consumer to execute once this task has completed + * @param readPoiData Whether to read point of interest data. If {@code false}, the {@code NBTTagCompound} will be {@code null}. + * @param readChunkData Whether to read chunk data. If {@code false}, the {@code NBTTagCompound} will be {@code null}. + * @return The {@link PrioritizedTaskQueue.PrioritizedTask} associated with this task. Note that this task does not support + * cancellation. + */ + public void loadChunkDataAsync(final WorldServer world, final int chunkX, final int chunkZ, + final int priority, final Consumer onComplete, + final boolean readPoiData, final boolean readChunkData, + final boolean intendingToBlock) { + if (!PrioritizedTaskQueue.validPriority(priority)) { + throw new IllegalArgumentException("Invalid priority: " + priority); + } + + if (!(readPoiData | readChunkData)) { + throw new IllegalArgumentException("Must read chunk data or poi data"); + } + + final ChunkData complete = new ChunkData(); + final boolean[] requireCompletion = new boolean[] { readPoiData, readChunkData }; + + if (readPoiData) { + this.scheduleRead(world.poiDataController, world, chunkX, chunkZ, (final NBTTagCompound poiData) -> { + complete.poiData = poiData; + + final boolean finished; + + // avoid a race condition where the file io thread completes and we complete synchronously + // Note: Synchronization can be elided if both of the accesses are volatile + synchronized (requireCompletion) { + requireCompletion[0] = false; // 0 -> poi data + finished = !requireCompletion[1]; // 1 -> chunk data + } + + if (finished) { + onComplete.accept(complete); + } + }, priority, intendingToBlock); + } + + if (readChunkData) { + this.scheduleRead(world.chunkDataController, world, chunkX, chunkZ, (final NBTTagCompound 
chunkData) -> { + complete.chunkData = chunkData; + + final boolean finished; + + // avoid a race condition where the file io thread completes and we complete synchronously + // Note: Synchronization can be elided if both of the accesses are volatile + synchronized (requireCompletion) { + requireCompletion[1] = false; // 1 -> chunk data + finished = !requireCompletion[0]; // 0 -> poi data + } + + if (finished) { + onComplete.accept(complete); + } + }, priority, intendingToBlock); + } + + } + + // Note: the onComplete may be called asynchronously or synchronously here. + private void scheduleRead(final ChunkDataController dataController, final WorldServer world, + final int chunkX, final int chunkZ, final Consumer onComplete, final int priority, + final boolean intendingToBlock) { + + Function tryLoadFunction = (final RegionFile file) -> { + if (file == null) { + return Boolean.TRUE; + } + return Boolean.valueOf(file.chunkExists(new ChunkCoordIntPair(chunkX, chunkZ))); + }; + + dataController.tasks.compute(Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ)), (final Long keyInMap, final ChunkDataTask running) -> { + if (running == null) { + // not scheduled + + final Boolean shouldSchedule = intendingToBlock ? 
dataController.computeForRegionFile(chunkX, chunkZ, tryLoadFunction) : + dataController.computeForRegionFileIfLoaded(chunkX, chunkZ, tryLoadFunction); + + if (shouldSchedule == Boolean.FALSE) { + // not on disk + onComplete.accept(null); + return null; + } + + // set up task + final ChunkDataTask newTask = new ChunkDataTask(priority, world, chunkX, chunkZ, dataController); + newTask.inProgressRead = new ChunkDataController.InProgressRead(); + newTask.inProgressRead.readFuture.thenAccept(onComplete); + + ConcreteFileIOThread.this.queueTask(newTask); // schedule task + return newTask; + } + + running.raisePriority(priority); + + if (running.inProgressWrite == null) { + // chain to the read future + running.inProgressRead.readFuture.thenAccept(onComplete); + return running; + } + + // at this stage we have to use the in progress write's data to avoid an order issue + // we don't synchronize since all writes to data occur in the compute() call + onComplete.accept(running.inProgressWrite.data); + return running; + }); + } + + /** + * Same as {@link #loadChunkDataAsync(WorldServer, int, int, int, Consumer, boolean, boolean, boolean)}, except this function returns + * the {@link ChunkData} associated with the specified chunk when the task is complete. + * @return The chunk data, or {@code null} if the chunk failed to load. + */ + public ChunkData loadChunkData(final WorldServer world, final int chunkX, final int chunkZ, final int priority, + final boolean readPoiData, final boolean readChunkData) { + return this.loadChunkDataAsyncFuture(world, chunkX, chunkZ, priority, readPoiData, readChunkData, true).join(); + } + + /** + * Schedules the given task at the specified priority to be executed on the IO thread. + *

    + * Internal api. Do not use. + *

    + */ + public void runTask(final int priority, final Runnable runnable) { + this.queueTask(new GeneralTask(priority, runnable)); + } + + static final class GeneralTask extends PrioritizedTaskQueue.PrioritizedTask implements Runnable { + + private final Runnable run; + + public GeneralTask(final int priority, final Runnable run) { + super(priority); + this.run = IOUtil.notNull(run, "Task may not be null"); + } + + @Override + public void run() { + try { + this.run.run(); + } catch (final Throwable throwable) { + if (throwable instanceof ThreadDeath) { + throw (ThreadDeath)throwable; + } + LOGGER.fatal("Failed to execute general task on IO thread " + IOUtil.genericToString(this.run), throwable); + } + } + } + + public static final class ChunkData { + + public NBTTagCompound poiData; + public NBTTagCompound chunkData; + + public ChunkData() {} + + public ChunkData(final NBTTagCompound poiData, final NBTTagCompound chunkData) { + this.poiData = poiData; + this.chunkData = chunkData; + } + } + + public static abstract class ChunkDataController { + + // ConcurrentHashMap synchronizes per chain, so reduce the chance of task's hashes colliding. 
+ public final ConcurrentHashMap tasks = new ConcurrentHashMap<>(64, 0.5f); + + public abstract void writeData(final int x, final int z, final NBTTagCompound compound) throws IOException; + public abstract NBTTagCompound readData(final int x, final int z) throws IOException; + + public abstract T computeForRegionFile(final int chunkX, final int chunkZ, final Function function); + public abstract T computeForRegionFileIfLoaded(final int chunkX, final int chunkZ, final Function function); + + public static final class InProgressWrite { + public long writeCounter; + public NBTTagCompound data; + + // Hack start + @Deprecated + public CompletableFuture wrote = new CompletableFuture<>(); + // Hack end + } + + public static final class InProgressRead { + public final CompletableFuture readFuture = new CompletableFuture<>(); + } + } + + public static final class ChunkDataTask extends PrioritizedTaskQueue.PrioritizedTask implements Runnable { + + public ChunkDataController.InProgressWrite inProgressWrite; + public ChunkDataController.InProgressRead inProgressRead; + + private final WorldServer world; + private final int x; + private final int z; + private final ChunkDataController taskController; + + public ChunkDataTask(final int priority, final WorldServer world, final int x, final int z, final ChunkDataController taskController) { + super(priority); + this.world = world; + this.x = x; + this.z = z; + this.taskController = taskController; + } + + @Override + public String toString() { + return "Task for world " + this.world.getWorld().getName() + " at " + this.x + ", " + this.z + " poi: " + (this.taskController == this.world.poiDataController); + } + + /* + * + * IO thread will perform reads before writes + * + * How reads/writes are scheduled: + * + * If read in progress while scheduling write, ignore read and schedule write + * If read in progress while scheduling read (no write in progress), chain the read task + * + * + * If write in progress while scheduling read, 
use the pending write data and ret immediately + * If write in progress while scheduling write (ignore read in progress), overwrite the write in progress data + * + * This allows the reads and writes to act as if they occur synchronously to the thread scheduling them, however + * it fails to properly propagate write failures + * + */ + + void reschedule(final int priority) { + // priority is checked before this stage + this.queue.lazySet(null); + this.inProgressWrite.wrote = new CompletableFuture<>(); // Hack + this.priority.lazySet(priority); + ConcreteFileIOThread.Holder.INSTANCE.queueTask(this); + } + + @Override + public void run() { + ChunkDataController.InProgressRead read = this.inProgressRead; + if (read != null) { + NBTTagCompound compound = ConcreteFileIOThread.FAILURE_VALUE; + try { + compound = this.taskController.readData(this.x, this.z); + } catch (final Throwable thr) { + if (thr instanceof ThreadDeath) { + throw (ThreadDeath)thr; + } + LOGGER.fatal("Failed to read chunk data for (" + this.x + "," + this.z + ")", thr); + // fall through to complete with null data + } + read.readFuture.complete(compound); + } + + final Long chunkKey = Long.valueOf(IOUtil.getCoordinateKey(this.x, this.z)); + + ChunkDataController.InProgressWrite write = this.inProgressWrite; + + if (write == null) { + // IntelliJ warns this is invalid, however it does not consider that writes to the task map & the inProgress field can occur concurrently. + ChunkDataTask inMap = this.taskController.tasks.compute(chunkKey, (final Long keyInMap, final ChunkDataTask valueInMap) -> { + if (valueInMap == null) { + throw new IllegalStateException("Write completed concurrently, report this"); + } + if (valueInMap != ChunkDataTask.this) { + throw new IllegalStateException("Chunk task mismatch, report this"); + } + return valueInMap.inProgressWrite == null ? 
null : valueInMap; + }); + + if (inMap == null) { + return; // set the task value to null, indicating we're done + } + + // not null, which means there was a concurrent write + write = this.inProgressWrite; + } + + // check if another process is writing + try { + this.world.checkSession(); + } catch (final ExceptionWorldConflict ex) { + LOGGER.fatal("Couldn't save chunk; already in use by another instance of Minecraft?", ex); + // we don't need to set the write counter to -1 as we know at this stage there's no point in re-scheduling + // writes since they'll fail anyways. + write.wrote.complete(ConcreteFileIOThread.FAILURE_VALUE); // Hack - However we need to fail the write + return; + } + + for (;;) { + final long writeCounter; + final NBTTagCompound data; + + //noinspection SynchronizationOnLocalVariableOrMethodParameter + synchronized (write) { + writeCounter = write.writeCounter; + data = write.data; + } + + boolean failedWrite = false; + + try { + this.taskController.writeData(this.x, this.z, data); + } catch (final Throwable thr) { + if (thr instanceof ThreadDeath) { + throw (ThreadDeath)thr; + } + LOGGER.fatal("Failed to write chunk data for (" + this.x + "," + this.z + ")", thr); + failedWrite = true; + } + + boolean finalFailWrite = failedWrite; + boolean[] returnFailWrite = new boolean[] { false }; + + ChunkDataTask inMap = this.taskController.tasks.compute(chunkKey, (final Long keyInMap, final ChunkDataTask valueInMap) -> { + if (valueInMap == null) { + throw new IllegalStateException("Write completed concurrently, report this"); + } + if (valueInMap != ChunkDataTask.this) { + throw new IllegalStateException("Chunk task mismatch, report this"); + } + if (finalFailWrite) { + if (valueInMap.inProgressWrite.writeCounter == writeCounter) { + valueInMap.inProgressWrite.writeCounter = -1L; + returnFailWrite[0] = true; + } + // Hack start + valueInMap.inProgressWrite.wrote.complete(ConcreteFileIOThread.FAILURE_VALUE); + return valueInMap; + } + if 
(valueInMap.inProgressWrite.writeCounter == writeCounter) { + valueInMap.inProgressWrite.wrote.complete(data); + return null; + } + return valueInMap; + // Hack end + }); + + if (inMap == null || returnFailWrite[0]) { + // write counter matched, so we wrote the most up-to-date pending data, we're done here + // or we failed to write and successfully set the write counter to -1 + return; // we're done here + } + + // fetch & write new data + continue; + } + } + } +} diff --git a/src/main/java/com/destroystokyo/paper/io/IOUtil.java b/src/main/java/com/destroystokyo/paper/io/IOUtil.java new file mode 100644 index 0000000000..5af0ac3d9e --- /dev/null +++ b/src/main/java/com/destroystokyo/paper/io/IOUtil.java @@ -0,0 +1,62 @@ +package com.destroystokyo.paper.io; + +import org.bukkit.Bukkit; + +public final class IOUtil { + + /* Copied from concrete or concurrentutil */ + + public static long getCoordinateKey(final int x, final int z) { + return ((long)z << 32) | (x & 0xFFFFFFFFL); + } + + public static int getCoordinateX(final long key) { + return (int)key; + } + + public static int getCoordinateZ(final long key) { + return (int)(key >>> 32); + } + + public static int getRegionCoordinate(final int chunkCoordinate) { + return chunkCoordinate >> 5; + } + + public static int getChunkInRegion(final int chunkCoordinate) { + return chunkCoordinate & 31; + } + + public static String genericToString(final Object object) { + return object == null ? 
"null" : object.getClass().getName() + ":" + object.toString(); + } + + public static T notNull(final T obj) { + if (obj == null) { + throw new NullPointerException(); + } + return obj; + } + + public static T notNull(final T obj, final String msgIfNull) { + if (obj == null) { + throw new NullPointerException(msgIfNull); + } + return obj; + } + + public static void arrayBounds(final int off, final int len, final int arrayLength, final String msgPrefix) { + if (off < 0 || len < 0 || (arrayLength - off) < len) { + throw new ArrayIndexOutOfBoundsException(msgPrefix + ": off: " + off + ", len: " + len + ", array length: " + arrayLength); + } + } + + public static int getPriorityForCurrentThread() { + return Bukkit.isPrimaryThread() ? PrioritizedTaskQueue.HIGHEST_PRIORITY : PrioritizedTaskQueue.NORMAL_PRIORITY; + } + + @SuppressWarnings("unchecked") + public static void rethrow(final Throwable throwable) throws T { + throw (T)throwable; + } + +} diff --git a/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java new file mode 100644 index 0000000000..c3ca3c4a1c --- /dev/null +++ b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java @@ -0,0 +1,258 @@ +package com.destroystokyo.paper.io; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class PrioritizedTaskQueue { + + // lower numbers are a higher priority (except < 0) + // higher priorities are always executed before lower priorities + + /** + * Priority value indicating the task has completed or is being completed. + */ + public static final int COMPLETING_PRIORITY = -1; + + /** + * Highest priority, should only be used for main thread tasks or tasks that are blocking the main thread. 
+ */ + public static final int HIGHEST_PRIORITY = 0; + + /** + * Should be only used in an IO task so that chunk loads do not wait on other IO tasks. + * This only exists because IO tasks are scheduled before chunk load tasks to decrease IO waiting times. + */ + public static final int HIGHER_PRIORITY = 1; + + /** + * Should be used for scheduling chunk loads/generation that would increase response times to users. + */ + public static final int HIGH_PRIORITY = 2; + + /** + * Default priority. + */ + public static final int NORMAL_PRIORITY = 3; + + /** + * Use for tasks not at all critical and can potentially be delayed. + */ + public static final int LOW_PRIORITY = 4; + + /** + * Use for tasks that should "eventually" execute. + */ + public static final int LOWEST_PRIORITY = 5; + + private static final int TOTAL_PRIORITIES = 6; + + final ConcurrentLinkedQueue[] queues = (ConcurrentLinkedQueue[])new ConcurrentLinkedQueue[TOTAL_PRIORITIES]; + + private final AtomicBoolean shutdown = new AtomicBoolean(); + + { + for (int i = 0; i < TOTAL_PRIORITIES; ++i) { + this.queues[i] = new ConcurrentLinkedQueue<>(); + } + } + + /** + * Returns whether the specified priority is valid + */ + public static boolean validPriority(final int priority) { + return priority >= 0 && priority < TOTAL_PRIORITIES; + } + + /** + * Queues a task. + * @throws IllegalStateException If the task has already been queued. Use {@link PrioritizedTask#raisePriority(int)} to + * raise a task's priority. + * This can also be thrown if the queue has shutdown. + */ + public void add(final T task) throws IllegalStateException { + task.onQueue(this); + this.queues[task.getPriority()].add(task); + if (this.shutdown.get()) { + // note: we're not actually sure at this point if our task will go through + throw new IllegalStateException("Queue has shutdown, refusing to execute task " + IOUtil.genericToString(task)); + } + } + + /** + * Polls the highest priority task currently available. {@code null} if none. 
+ */ + public T poll() { + T task; + for (int i = 0; i < TOTAL_PRIORITIES; ++i) { + final ConcurrentLinkedQueue queue = this.queues[i]; + + while ((task = queue.poll()) != null) { + final int prevPriority = task.tryComplete(i); + if (prevPriority != COMPLETING_PRIORITY && prevPriority <= i) { + // if the prev priority was greater-than or equal to our current priority + return task; + } + } + } + + return null; + } + + /** + * Prevent further additions to this queue. Attempts to add after this call has completed (potentially during) will + * result in {@link IllegalStateException} being thrown. + *

    + * This operation is atomic with respect to other shutdown calls + *

    + *

    + * After this call has completed, regardless of return value, this queue will be shutdown. + *

    + * @return {@code true} if the queue was shutdown, {@code false} if it has shut down already + */ + public boolean shutdown() { + return !this.shutdown.getAndSet(true); // set the flag; only the call that flips it from false reports true + } + + public abstract static class PrioritizedTask { + + protected final AtomicReference queue = new AtomicReference<>(); + + protected final AtomicInteger priority; + + protected PrioritizedTask() { + this(PrioritizedTaskQueue.NORMAL_PRIORITY); + } + + protected PrioritizedTask(final int priority) { + if (!PrioritizedTaskQueue.validPriority(priority)) { + throw new IllegalArgumentException("Invalid priority " + priority); + } + this.priority = new AtomicInteger(priority); + } + + /** + * Returns the current priority. Note that {@link PrioritizedTaskQueue#COMPLETING_PRIORITY} will be returned + * if this task is completing or has completed. + */ + public final int getPriority() { + return this.priority.get(); + } + + /** + * Returns whether this task is scheduled to execute, or has been already executed. + */ + public boolean isScheduled() { + return this.queue.get() != null; + } + + final int tryComplete(final int minPriority) { + for (int curr = this.getPriorityVolatile();;) { + if (curr == COMPLETING_PRIORITY) { + return COMPLETING_PRIORITY; + } + if (curr > minPriority) { + // curr is lower priority + return curr; + } + + if (curr == (curr = this.compareAndExchangePriorityVolatile(curr, COMPLETING_PRIORITY))) { + return curr; + } + continue; + } + } + + /** + * Forces this task to be completed. + * @return {@code true} if the task was cancelled, {@code false} if the task has already completed or is being completed. + */ + public boolean cancel() { + return this.exchangePriorityVolatile(PrioritizedTaskQueue.COMPLETING_PRIORITY) != PrioritizedTaskQueue.COMPLETING_PRIORITY; + } + + /** + * Attempts to raise the priority to the priority level specified. + * @param priority Priority specified + * @return {@code true} if successful, {@code false} otherwise.
+ */ + public boolean raisePriority(final int priority) { + if (!PrioritizedTaskQueue.validPriority(priority)) { + throw new IllegalArgumentException("Invalid priority"); + } + + for (int curr = this.getPriorityVolatile();;) { + if (curr == COMPLETING_PRIORITY) { + return false; + } + if (priority >= curr) { + return true; + } + + if (curr == (curr = this.compareAndExchangePriorityVolatile(curr, priority))) { + PrioritizedTaskQueue queue = this.queue.get(); + if (queue != null) { + //noinspection unchecked + queue.queues[priority].add(this); // silently fail on shutdown + } + return true; + } + continue; + } + } + + /** + * Attempts to set this task's priority level to the level specified. + * @param priority Specified priority level. + * @return {@code true} if successful, {@code false} if this task is completing or has completed. + */ + public boolean updatePriority(final int priority) { + if (!PrioritizedTaskQueue.validPriority(priority)) { + throw new IllegalArgumentException("Invalid priority"); + } + + for (int curr = this.getPriorityVolatile();;) { + if (curr == COMPLETING_PRIORITY) { + return false; + } + if (curr == priority) { + return true; + } + + if (curr == (curr = this.compareAndExchangePriorityVolatile(curr, priority))) { + PrioritizedTaskQueue queue = this.queue.get(); + if (queue != null) { + //noinspection unchecked + queue.queues[priority].add(this); // silently fail on shutdown + } + return true; + } + continue; + } + } + + void onQueue(final PrioritizedTaskQueue queue) { + if (this.queue.getAndSet(queue) != null) { + throw new IllegalStateException("Already queued!"); + } + } + + /* priority */ + + protected final int getPriorityVolatile() { + return this.priority.get(); + } + + protected final int compareAndExchangePriorityVolatile(final int expect, final int update) { + if (this.priority.compareAndSet(expect, update)) { + return expect; + } + return this.priority.get(); + } + + protected final int exchangePriorityVolatile(final int value) { 
+ return this.priority.getAndSet(value); + } + } +} diff --git a/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java new file mode 100644 index 0000000000..609b2038f2 --- /dev/null +++ b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java @@ -0,0 +1,216 @@ +package com.destroystokyo.paper.io; + +import net.minecraft.server.MinecraftServer; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; + +public class QueueExecutorThread extends Thread { + + private static final Logger LOGGER = MinecraftServer.LOGGER; + + protected final PrioritizedTaskQueue queue; + protected final long spinWaitTime; + + protected volatile boolean closed; + protected final AtomicBoolean parked = new AtomicBoolean(); + + protected final ConcurrentLinkedQueue flushQueue = new ConcurrentLinkedQueue<>(); + protected volatile int flushCounter; + + public QueueExecutorThread(final PrioritizedTaskQueue queue) { + this(queue, (int)(1.e6)); // 1.0ms + } + + public QueueExecutorThread(final PrioritizedTaskQueue queue, final long spinWaitTime) { // in ns + this.queue = queue; + this.spinWaitTime = spinWaitTime; + } + + @Override + public void run() { + final long spinWaitTime = this.spinWaitTime; + main_loop: + for (;;) { + this.pollTasks(); + + // spinwait + + final long start = System.nanoTime(); + + for (;;) { + // If we are interrupted for any reason, park() will always return immediately. Clear so that we don't needlessly use cpu in such an event.
+ Thread.interrupted(); + LockSupport.parkNanos("Spinwaiting on tasks", 1000L); // 1us + + if (this.pollTasks()) { + continue main_loop; + } + + if (this.handleClose()) { + return; // we're done + } + + if ((System.nanoTime() - start) >= spinWaitTime) { + break; + } + } + + if (this.handleClose()) { + return; + } + + this.parked.set(true); + // We need to poll here to avoid a race condition where a thread queues a task before we set parked to true + if (this.pollTasks()) { + this.parked.set(false); + continue; + } + + do { + Thread.interrupted(); + LockSupport.park("Waiting on tasks"); + } while (this.parked.get()); + } + } + + protected boolean handleClose() { + if (this.closed) { + // at this stage no task may be queued + this.pollTasks(); // this ensures we've emptied the queue + this.handleFlushThreads(true); + return true; + } + return false; + } + + protected boolean pollTasks() { + Runnable task; + boolean ret = false; + + while ((task = (Runnable)this.queue.poll()) != null) { + ret = true; + try { + task.run(); + } catch (final Throwable throwable) { + if (throwable instanceof ThreadDeath) { + throw (ThreadDeath)throwable; + } + LOGGER.fatal("Exception thrown from prioritized runnable task in thread " + this.getName() + ": " + IOUtil.genericToString(task), throwable); + } + } + + this.handleFlushThreads(false); + + return ret; + } + + protected void handleFlushThreads(final boolean shutdown) { + //noinspection NonAtomicOperationOnVolatileField + ++this.flushCounter; + + Thread current; + + while ((current = this.flushQueue.poll()) != null) { + LockSupport.unpark(current); + } + + } + + public boolean isWaitingForTasks() { + return this.parked.get(); + } + + public boolean notifyTasks() { + if (this.parked.get() && this.parked.getAndSet(false)) { + LockSupport.unpark(this); + return true; + } + return false; + } + + public void onTaskQueue() { + if (this.parked.get() && this.parked.getAndSet(false)) { + LockSupport.unpark(this); + } + } + + protected void 
queueTask(final T task) { + this.queue.add(task); + this.onTaskQueue(); + } + + + /** + * Waits until this thread's queue is empty. + * + * @throws IllegalStateException If the current thread is {@code this} thread. + */ + public void flush() { + final Thread currentThread = Thread.currentThread(); + + if (currentThread == this) { + throw new IllegalStateException("Cannot flush the queue executor thread while on the queue executor thread"); + } + + // order is important + + if (this.closed) { + return; + } + + if (this.parked.get()) { + return; // no tasks queued + } + + int flushCounter = this.flushCounter; + + this.flushQueue.add(currentThread); + + if (this.closed) { + return; + } + + // re-check parked status + if (this.parked.get()) { + return; + } + + // force a response from the IO thread, we're not sure of its state currently + this.parked.set(false); + LockSupport.unpark(this); + + boolean interrupted = false; // preserve interrupted status + + while (this.flushCounter == flushCounter) { + interrupted |= Thread.interrupted(); + LockSupport.park(); + } + + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + + /** + * Closes this queue executor's queue and optionally waits for it to empty. + *

    + * If wait is {@code true}, then the queue will be empty by the time this call completes. + *

    + *

    + * This function is MT-Safe. + *

    + * @param wait If this call is to wait until the queue is empty + * @return whether this thread shut down the queue + */ + public boolean close(final boolean wait) { + boolean ret = this.queue.shutdown(); + this.closed = true; + if (wait) { + this.flush(); + } + return ret; + } +} diff --git a/src/main/java/com/destroystokyo/paper/io/chunk/ChunkLoadTask.java b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkLoadTask.java new file mode 100644 index 0000000000..566d1684a5 --- /dev/null +++ b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkLoadTask.java @@ -0,0 +1,104 @@ +package com.destroystokyo.paper.io.chunk; + +import com.destroystokyo.paper.io.ConcreteFileIOThread; +import com.destroystokyo.paper.io.IOUtil; +import com.destroystokyo.paper.io.PrioritizedTaskQueue; +import net.minecraft.server.ChunkCoordIntPair; +import net.minecraft.server.ChunkRegionLoader; +import net.minecraft.server.PlayerChunkMap; +import net.minecraft.server.WorldServer; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.function.Consumer; + +public class ChunkLoadTask extends PrioritizedTaskQueue.PrioritizedTask implements Runnable { + + public final WorldServer world; + public final int chunkX; + public final int chunkZ; + public final ChunkLoadTaskManager taskManager; + + final Consumer onComplete; + + public ConcreteFileIOThread.ChunkData chunkData; + + public ChunkLoadTask(final WorldServer world, final int chunkX, final int chunkZ, final int priority, + final Consumer onComplete, + final ChunkLoadTaskManager taskManager) { + super(priority); + this.world = world; + this.chunkX = chunkX; + this.chunkZ = chunkZ; + this.taskManager = taskManager; + this.onComplete = onComplete; + } + + private static final ArrayDeque EMPTY_QUEUE = new ArrayDeque<>(); + + private static ChunkRegionLoader.InProgressChunkHolder createEmptyHolder() { + return new ChunkRegionLoader.InProgressChunkHolder(null, EMPTY_QUEUE); + } + + @Override + public void run() { + // 
either executed synchronously or asynchronously + final ConcreteFileIOThread.ChunkData chunkData = this.chunkData; + + if (chunkData.poiData == ConcreteFileIOThread.FAILURE_VALUE || chunkData.chunkData == ConcreteFileIOThread.FAILURE_VALUE) { + ConcreteFileIOThread.LOGGER.error("Could not load chunk at (" + this.chunkX + "," + this.chunkZ + ")"); + this.complete(ChunkLoadTask.createEmptyHolder()); + return; + } + + if (chunkData.chunkData == null) { + ChunkRegionLoader.InProgressChunkHolder ret = new ChunkRegionLoader.InProgressChunkHolder(null, EMPTY_QUEUE); + ret.poiData = chunkData.poiData; + this.complete(ret); + return; + } + + final ChunkCoordIntPair chunkPos = new ChunkCoordIntPair(this.chunkX, this.chunkZ); + + final PlayerChunkMap chunkManager = this.world.getChunkProvider().playerChunkMap; + + final ChunkRegionLoader.InProgressChunkHolder chunkHolder = ChunkRegionLoader.loadChunk(this.world, + chunkManager.definedStructureManager, chunkManager.getVillagePlace(), chunkPos, + chunkData.chunkData, true); + + // apply fixes + + try { + chunkData.chunkData = chunkManager.getChunkData(this.world.getWorldProvider().getDimensionManager(), + chunkManager.getWorldPersistentDataSupplier(), chunkData.chunkData, chunkPos, this.world); + } catch (IOException ex) { + ConcreteFileIOThread.LOGGER.error("Could not apply datafixers for chunk at (" + this.chunkX + "," + this.chunkZ + ")", ex); + this.complete(ChunkLoadTask.createEmptyHolder()); + return; // already completed with an empty holder; must not complete a second time below + } + this.complete(chunkHolder); + } + + private void complete(final ChunkRegionLoader.InProgressChunkHolder holder) { + this.onComplete.accept(holder); + this.taskManager.tasks.compute(Long.valueOf(IOUtil.getCoordinateKey(this.chunkX, this.chunkZ)), (final Long keyInMap, final ChunkLoadTask valueInMap) -> { + if (valueInMap != ChunkLoadTask.this) { + throw new IllegalStateException("Expected this task to be scheduled, but another was! 
Other:" + valueInMap); + } + return null; + }); + } + + @Override + public boolean raisePriority(final int priority) { + ConcreteFileIOThread.Holder.INSTANCE.bumpPriority(this.world, this.chunkX, this.chunkZ, priority); + return super.raisePriority(priority); + } + + @Override + public boolean updatePriority(final int priority) { + ConcreteFileIOThread.Holder.INSTANCE.setPriority(this.world, this.chunkX, this.chunkZ, priority); + return super.updatePriority(priority); + } + +} diff --git a/src/main/java/com/destroystokyo/paper/io/chunk/ChunkLoadTaskManager.java b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkLoadTaskManager.java new file mode 100644 index 0000000000..8dbaaba3cd --- /dev/null +++ b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkLoadTaskManager.java @@ -0,0 +1,119 @@ +package com.destroystokyo.paper.io.chunk; + +import com.destroystokyo.paper.io.ConcreteFileIOThread; +import com.destroystokyo.paper.io.IOUtil; +import com.destroystokyo.paper.io.PrioritizedTaskQueue; +import com.destroystokyo.paper.io.QueueExecutorThread; +import net.minecraft.server.ChunkRegionLoader; +import net.minecraft.server.IAsyncTaskHandler; +import net.minecraft.server.WorldServer; +import org.spigotmc.AsyncCatcher; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + +public final class ChunkLoadTaskManager { + + private final QueueExecutorThread[] workers; + private final WorldServer world; + + private final PrioritizedTaskQueue queue = new PrioritizedTaskQueue<>(); + + final ConcurrentHashMap tasks = new ConcurrentHashMap<>(64, 0.5f); + + public ChunkLoadTaskManager(final WorldServer world, final int threads) { + this.world = world; + this.workers = threads <= 0 ? 
null : new QueueExecutorThread[threads]; + + for (int i = 0; i < threads; ++i) { + this.workers[i] = new QueueExecutorThread<>(this.queue, (long)0.10e6); //0.1ms + this.workers[i].setName("Async chunk loader thread for world: " + world.getWorldData().getName()); + this.workers[i].setPriority(Thread.NORM_PRIORITY - 1); + this.workers[i].setUncaughtExceptionHandler((final Thread thread, final Throwable throwable) -> { + ConcreteFileIOThread.LOGGER.fatal("Thread '" + thread.getName() + "' threw an uncaught exception!", throwable); + }); + this.workers[i].start(); + } + + } + + public ChunkLoadTask scheduleChunkLoad(final int chunkX, final int chunkZ, final int priority, + final Consumer onComplete, + final boolean intendingToBlock) { + AsyncCatcher.catchOp("Async chunk load schedule"); + final WorldServer world = this.world; + + return this.tasks.compute(Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ)), (final Long keyInMap, final ChunkLoadTask valueInMap) -> { + if (valueInMap != null) { + throw new IllegalStateException("Double scheduling chunk load"); + } + + final ChunkLoadTask ret = new ChunkLoadTask(world, chunkX, chunkZ, priority, onComplete, ChunkLoadTaskManager.this); + + ConcreteFileIOThread.Holder.INSTANCE.loadChunkDataAsync(world, chunkX, chunkZ, priority, (final ConcreteFileIOThread.ChunkData chunkData) -> { + ret.chunkData = chunkData; + ChunkLoadTaskManager.this.internalSchedule(ret); // only schedule to the worker threads here + }, true, true, intendingToBlock); + + return ret; + }); + } + + public void flush() { + ConcreteFileIOThread.Holder.INSTANCE.flush(); + if (this.workers == null) { + return; + } + for (final QueueExecutorThread worker : this.workers) { + worker.flush(); + } + } + + public void shutdown(final boolean wait) { + if (wait) { + ConcreteFileIOThread.Holder.INSTANCE.flush(); + } + + if (this.workers == null) { + return; + } + + for (final QueueExecutorThread worker : this.workers) { + worker.close(false); + } + + if (wait) { + 
this.flush(); + } + } + + public void raisePriority(final int chunkX, final int chunkZ, final int priority) { + ChunkLoadTask task = this.tasks.get(Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ))); + if (task != null) { + task.raisePriority(priority); + this.internalScheduleNotify(); + } + } + + protected void internalSchedule(final ChunkLoadTask task) { + if (this.workers == null) { + ((IAsyncTaskHandler)this.world.getChunkProvider().serverThreadQueue).addTask(task); + return; + } + + // It's important we order the task to be executed before notifying. + this.queue.add(task); + if (task.isScheduled()) { + this.internalScheduleNotify(); + } + } + + protected void internalScheduleNotify() { + if (this.workers == null) { return; } // no worker threads were created (threads <= 0), so there is nothing to wake + for (final QueueExecutorThread worker : this.workers) { + if (worker.notifyTasks()) { + break; + } + } + } +} diff --git a/src/main/java/net/minecraft/server/ChunkProviderServer.java b/src/main/java/net/minecraft/server/ChunkProviderServer.java index db9113994e..4f7c442264 100644 --- a/src/main/java/net/minecraft/server/ChunkProviderServer.java +++ b/src/main/java/net/minecraft/server/ChunkProviderServer.java @@ -147,11 +147,143 @@ public class ChunkProviderServer extends IChunkProvider { return playerChunk.getAvailableChunkNow(); } + + private long asyncLoadSeqCounter; + + public void getChunkAtAsynchronously(int x, int z, boolean gen, java.util.function.Consumer onComplete) { + if (Thread.currentThread() != this.serverThread) { + this.serverThreadQueue.execute(() -> { + this.getChunkAtAsynchronously(x, z, gen, onComplete); + }); + return; + } + + long k = ChunkCoordIntPair.pair(x, z); + ChunkCoordIntPair chunkPos = new ChunkCoordIntPair(x, z); + + IChunkAccess ichunkaccess; + + // try cache + for (int l = 0; l < 4; ++l) { + if (k == this.cachePos[l] && ChunkStatus.FULL == this.cacheStatus[l]) { + ichunkaccess = this.cacheChunk[l]; + if (ichunkaccess != null) { // CraftBukkit - the chunk can become accessible in the meantime TODO for non-null chunks it might also make 
sense to check that the chunk's state hasn't changed in the meantime + + // move to first in cache + + for (int i1 = 3; i1 > 0; --i1) { + this.cachePos[i1] = this.cachePos[i1 - 1]; + this.cacheStatus[i1] = this.cacheStatus[i1 - 1]; + this.cacheChunk[i1] = this.cacheChunk[i1 - 1]; + } + + this.cachePos[0] = k; + this.cacheStatus[0] = ChunkStatus.FULL; + this.cacheChunk[0] = ichunkaccess; + + onComplete.accept((Chunk)ichunkaccess); + + return; + } + } + } + + if (gen) { + this.bringToFullStatusAsync(x, z, chunkPos, onComplete); + return; + } + + IChunkAccess current = this.getChunkAtImmediately(x, z); // we want to bypass ticket restrictions + if (current != null) { + if (!(current instanceof ProtoChunkExtension) && !(current instanceof net.minecraft.server.Chunk)) { + onComplete.accept(null); // the chunk is not gen'd + return; + } + // we know the chunk is at full status here (either in read-only mode or the real thing) + this.bringToFullStatusAsync(x, z, chunkPos, onComplete); + return; + } else { + // Paper start - async io + ChunkStatus status = world.getChunkProvider().playerChunkMap.getStatusOnDiskNoLoad(x, z); // Paper - async io - move to own method + + if (status == ChunkStatus.EMPTY) { + // does not exist on disk + onComplete.accept(null); + return; + } + + if (status == ChunkStatus.FULL) { + this.bringToFullStatusAsync(x, z, chunkPos, onComplete); + return; + } else if (status != null) { + onComplete.accept(null); + return; // not full status on disk + } + // status is null here + // Paper end + + // at this stage we don't know what status the chunk is in + } + + // here we don't know what status it is and we're not supposed to generate + // so we asynchronously load empty status + + this.bringToStatusAsync(x, z, chunkPos, ChunkStatus.EMPTY, (IChunkAccess chunk) -> { + if (!(chunk instanceof ProtoChunkExtension) && !(chunk instanceof net.minecraft.server.Chunk)) { + // the chunk on disk was not a full status chunk + onComplete.accept(null); + return; + } 
+ this.bringToFullStatusAsync(x, z, chunkPos, onComplete); // bring to full status if required + }); + } + + private void bringToFullStatusAsync(int x, int z, ChunkCoordIntPair chunkPos, java.util.function.Consumer onComplete) { + this.bringToStatusAsync(x, z, chunkPos, ChunkStatus.FULL, (java.util.function.Consumer)onComplete); + } + + private void bringToStatusAsync(int x, int z, ChunkCoordIntPair chunkPos, ChunkStatus status, java.util.function.Consumer onComplete) { + CompletableFuture> future = this.getChunkFutureMainThread(x, z, status, true); + long identifier = this.asyncLoadSeqCounter++; + int ticketLevel = MCUtil.getTicketLevelFor(status); + this.addTicketAtLevel(TicketType.ASYNC_LOAD, chunkPos, ticketLevel, identifier); + + future.whenCompleteAsync((Either either, Throwable throwable) -> { + // either left -> success + // either right -> failure + + if (throwable != null) { + throw new RuntimeException(throwable); + } + + this.removeTicketAtLevel(TicketType.ASYNC_LOAD, chunkPos, ticketLevel, identifier); + this.addTicketAtLevel(TicketType.UNKNOWN, chunkPos, ticketLevel, chunkPos); // allow unloading + + Optional failure = either.right(); + + if (failure.isPresent()) { + // failure + throw new IllegalStateException("Chunk failed to load: " + failure.get().toString()); + } + + onComplete.accept(either.left().get()); + + }, this.serverThreadQueue); + } + + public void addTicketAtLevel(TicketType ticketType, ChunkCoordIntPair chunkPos, int ticketLevel, T identifier) { + this.chunkMapDistance.addTicketAtLevel(ticketType, chunkPos, ticketLevel, identifier); + } + + public void removeTicketAtLevel(TicketType ticketType, ChunkCoordIntPair chunkPos, int ticketLevel, T identifier) { + this.chunkMapDistance.removeTicketAtLevel(ticketType, chunkPos, ticketLevel, identifier); + } // Paper end @Nullable @Override public IChunkAccess getChunkAt(int i, int j, ChunkStatus chunkstatus, boolean flag) { + final int x = i; final int z = j; // Paper - conflict on variable 
change if (Thread.currentThread() != this.serverThread) { return (IChunkAccess) CompletableFuture.supplyAsync(() -> { return this.getChunkAt(i, j, chunkstatus, flag); @@ -173,6 +305,9 @@ public class ChunkProviderServer extends IChunkProvider { CompletableFuture> completablefuture = this.getChunkFutureMainThread(i, j, chunkstatus, flag); if (!completablefuture.isDone()) { // Paper + // Paper start - async chunk io // Paper start - async chunk loading + this.world.asyncLoadManager.raisePriority(x, z, com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGHEST_PRIORITY); + // Paper end this.world.timings.chunkAwait.startTiming(); // Paper this.serverThreadQueue.awaitTasks(completablefuture::isDone); this.world.timings.chunkAwait.stopTiming(); // Paper diff --git a/src/main/java/net/minecraft/server/ChunkRegionLoader.java b/src/main/java/net/minecraft/server/ChunkRegionLoader.java index f88e3d957f..bee1330763 100644 --- a/src/main/java/net/minecraft/server/ChunkRegionLoader.java +++ b/src/main/java/net/minecraft/server/ChunkRegionLoader.java @@ -6,6 +6,7 @@ import it.unimi.dsi.fastutil.longs.LongOpenHashSet; import it.unimi.dsi.fastutil.longs.LongSet; import it.unimi.dsi.fastutil.shorts.ShortList; import it.unimi.dsi.fastutil.shorts.ShortListIterator; +import java.util.ArrayDeque; // Paper import java.util.Arrays; import java.util.BitSet; import java.util.EnumSet; @@ -22,7 +23,29 @@ public class ChunkRegionLoader { private static final Logger LOGGER = LogManager.getLogger(); + // Paper start + public static final class InProgressChunkHolder { + + public final ProtoChunk protoChunk; + public final ArrayDeque tasks; + + public NBTTagCompound poiData; + + public InProgressChunkHolder(final ProtoChunk protoChunk, final ArrayDeque tasks) { + this.protoChunk = protoChunk; + this.tasks = tasks; + } + } + public static ProtoChunk loadChunk(WorldServer worldserver, DefinedStructureManager definedstructuremanager, VillagePlace villageplace, ChunkCoordIntPair chunkcoordintpair, 
NBTTagCompound nbttagcompound) { + InProgressChunkHolder holder = loadChunk(worldserver, definedstructuremanager, villageplace, chunkcoordintpair, nbttagcompound, true); + holder.tasks.forEach(Runnable::run); + return holder.protoChunk; + } + + public static InProgressChunkHolder loadChunk(WorldServer worldserver, DefinedStructureManager definedstructuremanager, VillagePlace villageplace, ChunkCoordIntPair chunkcoordintpair, NBTTagCompound nbttagcompound, boolean distinguish) { + ArrayDeque tasksToExecuteOnMain = new ArrayDeque<>(); + // Paper end ChunkGenerator chunkgenerator = worldserver.getChunkProvider().getChunkGenerator(); WorldChunkManager worldchunkmanager = chunkgenerator.getWorldChunkManager(); NBTTagCompound nbttagcompound1 = nbttagcompound.getCompound("Level"); @@ -66,7 +89,9 @@ public class ChunkRegionLoader { LightEngine lightengine = chunkproviderserver.getLightEngine(); if (flag) { - lightengine.b(chunkcoordintpair, true); + tasksToExecuteOnMain.add(() -> { // Paper - delay this task since we're executing off-main + lightengine.b(chunkcoordintpair, true); + }); // Paper - delay this task since we're executing off-main } for (int k = 0; k < nbttaglist.size(); ++k) { @@ -82,16 +107,30 @@ public class ChunkRegionLoader { achunksection[b0] = chunksection; } - villageplace.a(chunkcoordintpair, chunksection); + tasksToExecuteOnMain.add(() -> { // Paper - delay this task since we're executing off-main + villageplace.a(chunkcoordintpair, chunksection); + }); // Paper - delay this task since we're executing off-main } if (flag) { if (nbttagcompound2.hasKeyOfType("BlockLight", 7)) { - lightengine.a(EnumSkyBlock.BLOCK, SectionPosition.a(chunkcoordintpair, b0), new NibbleArray(nbttagcompound2.getByteArray("BlockLight"))); + // Paper start - delay this task since we're executing off-main + NibbleArray blockLight = new NibbleArray(nbttagcompound2.getByteArray("BlockLight")); + // Note: We move the block light nibble array creation here for perf & in case the 
compound is modified + tasksToExecuteOnMain.add(() -> { + lightengine.a(EnumSkyBlock.BLOCK, SectionPosition.a(chunkcoordintpair, b0), blockLight); + }); + // Paper end } if (flag2 && nbttagcompound2.hasKeyOfType("SkyLight", 7)) { - lightengine.a(EnumSkyBlock.SKY, SectionPosition.a(chunkcoordintpair, b0), new NibbleArray(nbttagcompound2.getByteArray("SkyLight"))); + // Paper start - delay this task since we're executing off-main + NibbleArray skyLight = new NibbleArray(nbttagcompound2.getByteArray("SkyLight")); + // Note: We move the sky light nibble array creation here for perf & in case the compound is modified + tasksToExecuteOnMain.add(() -> { + lightengine.a(EnumSkyBlock.SKY, SectionPosition.a(chunkcoordintpair, b0), skyLight); + }); + // Paper end } } } @@ -194,7 +233,7 @@ public class ChunkRegionLoader { } if (chunkstatus_type == ChunkStatus.Type.LEVELCHUNK) { - return new ProtoChunkExtension((Chunk) object); + return new InProgressChunkHolder(new ProtoChunkExtension((Chunk) object), tasksToExecuteOnMain); // Paper - Async chunk loading } else { ProtoChunk protochunk1 = (ProtoChunk) object; @@ -233,7 +272,7 @@ public class ChunkRegionLoader { protochunk1.a(worldgenstage_features, BitSet.valueOf(nbttagcompound5.getByteArray(s1))); } - return protochunk1; + return new InProgressChunkHolder(protochunk1, tasksToExecuteOnMain); // Paper - Async chunk loading } } diff --git a/src/main/java/net/minecraft/server/ChunkStatus.java b/src/main/java/net/minecraft/server/ChunkStatus.java index e324989b46..abb0d69d2f 100644 --- a/src/main/java/net/minecraft/server/ChunkStatus.java +++ b/src/main/java/net/minecraft/server/ChunkStatus.java @@ -153,6 +153,7 @@ public class ChunkStatus { return ChunkStatus.q.size(); } + public static int getTicketLevelOffset(ChunkStatus status) { return ChunkStatus.a(status); } // Paper - OBFHELPER public static int a(ChunkStatus chunkstatus) { return ChunkStatus.r.getInt(chunkstatus.c()); } diff --git
a/src/main/java/net/minecraft/server/IChunkLoader.java b/src/main/java/net/minecraft/server/IChunkLoader.java index 3f14392e6e..cc933ec067 100644 --- a/src/main/java/net/minecraft/server/IChunkLoader.java +++ b/src/main/java/net/minecraft/server/IChunkLoader.java @@ -3,6 +3,10 @@ package net.minecraft.server; import com.mojang.datafixers.DataFixer; import java.io.File; import java.io.IOException; +// Paper start +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +// Paper end import java.util.function.Supplier; import javax.annotation.Nullable; @@ -10,7 +14,9 @@ public class IChunkLoader extends RegionFileCache { protected final DataFixer b; @Nullable - private PersistentStructureLegacy a; + private volatile PersistentStructureLegacy a; // Paper - async chunk loading + + private final Object persistentDataLock = new Object(); // Paper public IChunkLoader(File file, DataFixer datafixer) { super(file); @@ -55,9 +61,26 @@ public class IChunkLoader extends RegionFileCache { NBTTagCompound level = nbttagcompound.getCompound("Level"); if (level.getBoolean("TerrainPopulated") && !level.getBoolean("LightPopulated")) { ChunkProviderServer cps = (generatoraccess == null) ? 
null : ((WorldServer) generatoraccess).getChunkProvider(); + // Paper start - Async chunk loading + CompletableFuture future = new CompletableFuture<>(); + MCUtil.ensureMain((Runnable)() -> { + try { + // Paper end if (check(cps, pos.x - 1, pos.z) && check(cps, pos.x - 1, pos.z - 1) && check(cps, pos.x, pos.z - 1)) { level.setBoolean("LightPopulated", true); } + // Paper start - Async chunk loading + future.complete(null); + } catch (IOException ex) { + future.completeExceptionally(ex); + } + }); + try { + future.join(); + } catch (CompletionException ex) { + com.destroystokyo.paper.util.SneakyThrow.sneaky(ex.getCause()); + } + // Paper end } } // CraftBukkit end @@ -65,11 +88,13 @@ public class IChunkLoader extends RegionFileCache { if (i < 1493) { nbttagcompound = GameProfileSerializer.a(this.b, DataFixTypes.CHUNK, nbttagcompound, i, 1493); if (nbttagcompound.getCompound("Level").getBoolean("hasLegacyStructureData")) { + synchronized (this.persistentDataLock) { // Paper - Async chunk loading if (this.a == null) { this.a = PersistentStructureLegacy.a(dimensionmanager.getType(), (WorldPersistentData) supplier.get()); // CraftBukkit - getType } nbttagcompound = this.a.a(nbttagcompound); + } // Paper - Async chunk loading } } @@ -89,7 +114,9 @@ public class IChunkLoader extends RegionFileCache { public void write(ChunkCoordIntPair chunkcoordintpair, NBTTagCompound nbttagcompound) throws IOException { super.write(chunkcoordintpair, nbttagcompound); if (this.a != null) { + synchronized (this.persistentDataLock) { // Paper - Async chunk loading this.a.a(chunkcoordintpair.pair()); + } // Paper - Async chunk loading } } diff --git a/src/main/java/net/minecraft/server/MCUtil.java b/src/main/java/net/minecraft/server/MCUtil.java index 23d1935dd5..14f8b61042 100644 --- a/src/main/java/net/minecraft/server/MCUtil.java +++ b/src/main/java/net/minecraft/server/MCUtil.java @@ -530,4 +530,9 @@ public final class MCUtil { out.print(fileData); } } + + public static int 
getTicketLevelFor(ChunkStatus status) { + // TODO make sure the constant `33` is correct on future updates. See getChunkAt(int, int, ChunkStatus, boolean) + return 33 + ChunkStatus.getTicketLevelOffset(status); + } } diff --git a/src/main/java/net/minecraft/server/MinecraftServer.java b/src/main/java/net/minecraft/server/MinecraftServer.java index 0324a90ca5..430cd70cf5 100644 --- a/src/main/java/net/minecraft/server/MinecraftServer.java +++ b/src/main/java/net/minecraft/server/MinecraftServer.java @@ -764,6 +764,7 @@ public abstract class MinecraftServer extends IAsyncTaskHandlerReentrant executor; public final ChunkGenerator chunkGenerator; - private final Supplier m; + private final Supplier m; public final Supplier getWorldPersistentDataSupplier() { return this.m; } // Paper - OBFHELPER private final VillagePlace n; public final LongSet unloadQueue; private boolean updatingChunksModified; @@ -67,7 +67,7 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { public final WorldLoadListener worldLoadListener; public final PlayerChunkMap.a u; public final PlayerChunkMap.a getChunkMapDistanceManager() { return this.u; } // Paper - OBFHELPER // CraftBukkit - private -> public // PAIL chunkDistanceManager private final AtomicInteger v; - private final DefinedStructureManager definedStructureManager; + public final DefinedStructureManager definedStructureManager; // Paper - private -> public private final File x; private final PlayerMap playerMap; public final Int2ObjectMap trackedEntities; @@ -101,7 +101,7 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { this.lightEngine = new LightEngineThreaded(ilightaccess, this, this.world.getWorldProvider().g(), threadedmailbox2, this.q.a(threadedmailbox2, false)); this.u = new PlayerChunkMap.a(executor, iasynctaskhandler); this.m = supplier; - this.n = new VillagePlace(new File(this.x, "poi"), datafixer); + this.n = new VillagePlace(new File(this.x, "poi"), datafixer, 
this.world); // Paper this.setViewDistance(i); } @@ -261,6 +261,7 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { @Override public void close() throws IOException { this.q.close(); + this.world.asyncLoadManager.shutdown(true); // Paper - Required since we're closing regionfiles in the next line this.n.close(); super.close(); } @@ -308,7 +309,7 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { shouldSave = ((Chunk) ichunkaccess).lastSaved + world.paperConfig.autoSavePeriod <= world.getTime(); } - if (shouldSave && this.saveChunk(ichunkaccess)) { + if (shouldSave && this.saveChunk(ichunkaccess, true)) { // Paper - async chunk io ++savedThisTick; playerchunk.m(); } @@ -328,11 +329,15 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { protected void unloadChunks(BooleanSupplier booleansupplier) { GameProfilerFiller gameprofilerfiller = this.world.getMethodProfiler(); + try (Timing ignored = this.world.timings.poiUnload.startTiming()) { // Paper gameprofilerfiller.enter("poi"); this.n.a(booleansupplier); + } gameprofilerfiller.exitEnter("chunk_unload"); if (!this.world.isSavingDisabled()) { + try (Timing ignored = this.world.timings.chunkUnload.startTiming()) { // Paper this.b(booleansupplier); + } } gameprofilerfiller.exit(); @@ -386,7 +391,7 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { ((Chunk) ichunkaccess).setLoaded(false); } - this.saveChunk(ichunkaccess); + this.saveChunk(ichunkaccess, true); // Paper - async chunk io if (this.loadedChunks.remove(i) && ichunkaccess instanceof Chunk) { Chunk chunk = (Chunk) ichunkaccess; @@ -462,26 +467,30 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { } } + // Paper start - Async chunk io + public NBTTagCompound completeChunkData(NBTTagCompound compound, ChunkCoordIntPair chunkcoordintpair) throws IOException { + return compound == null ? 
null : this.getChunkData(this.world.getWorldProvider().getDimensionManager(), this.getWorldPersistentDataSupplier(), compound, chunkcoordintpair, this.world); + } + // Paper end + private CompletableFuture> f(ChunkCoordIntPair chunkcoordintpair) { - return CompletableFuture.supplyAsync(() -> { + // Paper start - Async chunk io + final java.util.function.BiFunction> callable = (chunkHolder, ioThrowable) -> { try (Timing ignored = this.world.timings.syncChunkLoadTimer.startTimingIfSync()) { // Paper - NBTTagCompound nbttagcompound; // Paper - try (Timing ignored2 = this.world.timings.chunkIOStage1.startTimingIfSync()) { // Paper - nbttagcompound = this.readChunkData(chunkcoordintpair); + if (ioThrowable != null) { + com.destroystokyo.paper.io.IOUtil.rethrow(ioThrowable); } + this.getVillagePlace().loadInData(chunkcoordintpair, chunkHolder.poiData); + chunkHolder.tasks.forEach(Runnable::run); + // Paper - async load completes this + // Paper end - if (nbttagcompound != null) { - boolean flag = nbttagcompound.hasKeyOfType("Level", 10) && nbttagcompound.getCompound("Level").hasKeyOfType("Status", 8); - - if (flag) { - ProtoChunk protochunk = ChunkRegionLoader.loadChunk(this.world, this.definedStructureManager, this.n, chunkcoordintpair, nbttagcompound); - - protochunk.setLastSaved(this.world.getTime()); - return Either.left(protochunk); - } - - PlayerChunkMap.LOGGER.error("Chunk file at {} is missing level data, skipping", chunkcoordintpair); + // Paper start - This is done async + if (chunkHolder.protoChunk != null) { + chunkHolder.protoChunk.setLastSaved(this.world.getTime()); + return Either.left(chunkHolder.protoChunk); } + // Paper end } catch (ReportedException reportedexception) { Throwable throwable = reportedexception.getCause(); @@ -495,7 +504,17 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { } return Either.left(new ProtoChunk(chunkcoordintpair, ChunkConverter.a, this.world)); // Paper - Anti-Xray - }, this.executor); + // 
Paper start - Async chunk io + }; + CompletableFuture> ret = new CompletableFuture<>(); + this.world.asyncLoadManager.scheduleChunkLoad(chunkcoordintpair.x, chunkcoordintpair.z, + com.destroystokyo.paper.io.PrioritizedTaskQueue.NORMAL_PRIORITY, (ChunkRegionLoader.InProgressChunkHolder holder) -> { + PlayerChunkMap.this.executor.addTask(() -> { + ret.complete(callable.apply(holder, null)); + }); + }, false); + return ret; + // Paper end } private CompletableFuture> b(PlayerChunk playerchunk, ChunkStatus chunkstatus) { @@ -701,18 +720,42 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { return this.v.get(); } + // Paper start - async chunk io + private boolean writeDataAsync(ChunkCoordIntPair chunkPos, NBTTagCompound poiData, NBTTagCompound chunkData, boolean async) { + com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.scheduleSave(this.world, chunkPos.x, chunkPos.z, + poiData, chunkData, !async ? com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGHEST_PRIORITY : com.destroystokyo.paper.io.PrioritizedTaskQueue.LOW_PRIORITY); + + if (async) { + return true; + } + try (co.aikar.timings.Timing ignored = this.world.timings.chunkSaveIOWait.startTiming()) { // Paper + Boolean successPoi = com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.waitForIOToComplete(this.world, chunkPos.x, chunkPos.z, true, true); + Boolean successChunk = com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.waitForIOToComplete(this.world, chunkPos.x, chunkPos.z, true, false); + + if (successPoi == Boolean.FALSE || successChunk == Boolean.FALSE) { + return false; + } + + // null indicates no task existed, which means our write completed before we waited on it + + return true; + } // Paper + } + // Paper end + public boolean saveChunk(IChunkAccess ichunkaccess) { - this.n.a(ichunkaccess.getPos()); + // Paper start - async param + return this.saveChunk(ichunkaccess, false); + } + public boolean saveChunk(IChunkAccess ichunkaccess, 
boolean async) { + try (co.aikar.timings.Timing ignored = this.world.timings.chunkSave.startTiming()) { + NBTTagCompound poiData = this.getVillagePlace().getData(ichunkaccess.getPos()); // Paper + //this.n.a(ichunkaccess.getPos()); // Delay + // Paper end if (!ichunkaccess.isNeedsSaving()) { return false; } else { - try { - this.world.checkSession(); - } catch (ExceptionWorldConflict exceptionworldconflict) { - PlayerChunkMap.LOGGER.error("Couldn't save chunk; already in use by another instance of Minecraft?", exceptionworldconflict); - com.destroystokyo.paper.exception.ServerInternalException.reportInternalException(exceptionworldconflict); // Paper - return false; - } + // Paper - The save session check is performed on the IO thread ichunkaccess.setLastSaved(this.world.getTime()); ichunkaccess.setNeedsSaving(false); @@ -723,27 +766,33 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { NBTTagCompound nbttagcompound; if (chunkstatus.getType() != ChunkStatus.Type.LEVELCHUNK) { + try (co.aikar.timings.Timing ignored1 = this.world.timings.chunkSaveOverwriteCheck.startTiming()) { // Paper // Paper start - Optimize save by using status cache ChunkStatus statusOnDisk = this.getChunkStatusOnDisk(chunkcoordintpair); if (statusOnDisk != null && statusOnDisk.getType() == ChunkStatus.Type.LEVELCHUNK) { // Paper end + this.writeDataAsync(ichunkaccess.getPos(), poiData, null, async); // Paper - Async chunk io return false; } if (chunkstatus == ChunkStatus.EMPTY && ichunkaccess.h().values().stream().noneMatch(StructureStart::e)) { + this.writeDataAsync(ichunkaccess.getPos(), poiData, null, async); // Paper - Async chunk io return false; } } - + } // Paper + try (co.aikar.timings.Timing ignored1 = this.world.timings.chunkSaveDataSerialization.startTiming()) { // Paper nbttagcompound = ChunkRegionLoader.saveChunk(this.world, ichunkaccess); - this.write(chunkcoordintpair, nbttagcompound); - return true; + } // Paper + return 
this.writeDataAsync(ichunkaccess.getPos(), poiData, nbttagcompound, async); // Paper - Async chunk io + //return true; // Paper } catch (Exception exception) { PlayerChunkMap.LOGGER.error("Failed to save chunk {},{}", chunkcoordintpair.x, chunkcoordintpair.z, exception); com.destroystokyo.paper.exception.ServerInternalException.reportInternalException(exception); // Paper return false; } } + } // Paper } protected void setViewDistance(int i) { @@ -808,6 +857,42 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { return Iterables.unmodifiableIterable(this.visibleChunks.values()); } + // Paper start - Asynchronous chunk io + @Nullable + @Override + public NBTTagCompound read(ChunkCoordIntPair chunkcoordintpair) throws IOException { + if (Thread.currentThread() != com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE) { + NBTTagCompound ret = com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE + .loadChunkDataAsyncFuture(this.world, chunkcoordintpair.x, chunkcoordintpair.z, com.destroystokyo.paper.io.IOUtil.getPriorityForCurrentThread(), + false, true, true).join().chunkData; + + if (ret == com.destroystokyo.paper.io.ConcreteFileIOThread.FAILURE_VALUE) { + throw new IOException("See logs for further detail"); + } + return ret; + } + return super.read(chunkcoordintpair); + } + + @Override + public void write(ChunkCoordIntPair chunkcoordintpair, NBTTagCompound nbttagcompound) throws IOException { + if (Thread.currentThread() != com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE) { + com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.scheduleSave( + this.world, chunkcoordintpair.x, chunkcoordintpair.z, null, nbttagcompound, + com.destroystokyo.paper.io.IOUtil.getPriorityForCurrentThread()); + + Boolean ret = com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.waitForIOToComplete(this.world, + chunkcoordintpair.x, chunkcoordintpair.z, true, false); + + if (ret == Boolean.FALSE) { + 
throw new IOException("See logs for further detail"); + } + return; + } + super.write(chunkcoordintpair, nbttagcompound); + } + // Paper end + @Nullable public NBTTagCompound readChunkData(ChunkCoordIntPair chunkcoordintpair) throws IOException { // Paper - private -> public NBTTagCompound nbttagcompound = this.read(chunkcoordintpair); @@ -830,12 +915,30 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { // Paper start - chunk status cache "api" public ChunkStatus getChunkStatusOnDiskIfCached(ChunkCoordIntPair chunkPos) { + // Paper start - async io + NBTTagCompound inProgressWrite = com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE + .getPendingWrite(this.world, chunkPos.x, chunkPos.z, false); + + if (inProgressWrite != null) { + return ChunkRegionLoader.getStatus(inProgressWrite); + } + // Paper end + RegionFile regionFile = this.getRegionFileIfLoaded(chunkPos); return regionFile == null ? null : regionFile.getStatusIfCached(chunkPos.x, chunkPos.z); } public ChunkStatus getChunkStatusOnDisk(ChunkCoordIntPair chunkPos) throws IOException { + // Paper start - async io + NBTTagCompound inProgressWrite = com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE + .getPendingWrite(this.world, chunkPos.x, chunkPos.z, false); + + if (inProgressWrite != null) { + return ChunkRegionLoader.getStatus(inProgressWrite); + } + // Paper end + synchronized (this) { // Paper - async io RegionFile regionFile = this.getRegionFile(chunkPos, false); if (!regionFile.chunkExists(chunkPos)) { @@ -847,17 +950,49 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { if (status != null) { return status; } + // Paper start - async io + } - this.readChunkData(chunkPos); + NBTTagCompound compound = this.readChunkData(chunkPos); - return regionFile.getStatusIfCached(chunkPos.x, chunkPos.z); + return ChunkRegionLoader.getStatus(compound); + // Paper end } public void updateChunkStatusOnDisk(ChunkCoordIntPair chunkPos, 
@Nullable NBTTagCompound compound) throws IOException { + synchronized (this) { // Paper - async io RegionFile regionFile = this.getRegionFile(chunkPos, false); regionFile.setStatus(chunkPos.x, chunkPos.z, ChunkRegionLoader.getStatus(compound)); + } // Paper - async io } + + // Paper start - async io + // this function will not load chunk data off disk to check for status + // ret null for unknown, empty for empty status on disk or absent from disk + public ChunkStatus getStatusOnDiskNoLoad(int x, int z) { + // Paper start - async io + net.minecraft.server.NBTTagCompound inProgressWrite = com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE + .getPendingWrite(this.world, x, z, false); + + if (inProgressWrite != null) { + return net.minecraft.server.ChunkRegionLoader.getStatus(inProgressWrite); + } + // Paper end + // variant of PlayerChunkMap#getChunkStatusOnDisk that does not load data off disk, but loads the region file + ChunkCoordIntPair chunkPos = new ChunkCoordIntPair(x, z); + synchronized (world.getChunkProvider().playerChunkMap) { + net.minecraft.server.RegionFile file; + try { + file = world.getChunkProvider().playerChunkMap.getRegionFile(chunkPos, false); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + + return !file.chunkExists(chunkPos) ? 
ChunkStatus.EMPTY : file.getStatusIfCached(x, z); + } + } + // Paper end // Paper end // Spigot Start @@ -1197,6 +1332,7 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { } + public VillagePlace getVillagePlace() { return this.h(); } // Paper - OBFHELPER protected VillagePlace h() { return this.n; } diff --git a/src/main/java/net/minecraft/server/RegionFile.java b/src/main/java/net/minecraft/server/RegionFile.java index 66c8b0307f..2ee4b88f09 100644 --- a/src/main/java/net/minecraft/server/RegionFile.java +++ b/src/main/java/net/minecraft/server/RegionFile.java @@ -337,7 +337,7 @@ public class RegionFile implements AutoCloseable { this.writeInt(i); // Paper - Avoid 3 io write calls } - public void close() throws IOException { + public synchronized void close() throws IOException { // Paper - synchronize this.closed = true; // Paper this.b.close(); } diff --git a/src/main/java/net/minecraft/server/RegionFileCache.java b/src/main/java/net/minecraft/server/RegionFileCache.java index e0fdf5f90f..d9283b36b6 100644 --- a/src/main/java/net/minecraft/server/RegionFileCache.java +++ b/src/main/java/net/minecraft/server/RegionFileCache.java @@ -51,13 +51,13 @@ public abstract class RegionFileCache implements AutoCloseable { } // Paper start - public RegionFile getRegionFileIfLoaded(ChunkCoordIntPair chunkcoordintpair) { + public synchronized RegionFile getRegionFileIfLoaded(ChunkCoordIntPair chunkcoordintpair) { // Paper - synchronize for async io return this.cache.getAndMoveToFirst(ChunkCoordIntPair.pair(chunkcoordintpair.getRegionX(), chunkcoordintpair.getRegionZ())); } // Paper end public RegionFile getRegionFile(ChunkCoordIntPair chunkcoordintpair, boolean existingOnly) throws IOException { return this.a(chunkcoordintpair, existingOnly); } // Paper - OBFHELPER - private RegionFile a(ChunkCoordIntPair chunkcoordintpair, boolean existingOnly) throws IOException { // CraftBukkit + private synchronized RegionFile a(ChunkCoordIntPair 
chunkcoordintpair, boolean existingOnly) throws IOException { // CraftBukkit // Paper - synchronize for async io long i = ChunkCoordIntPair.pair(chunkcoordintpair.getRegionX(), chunkcoordintpair.getRegionZ()); RegionFile regionfile = (RegionFile) this.cache.getAndMoveToFirst(i); @@ -344,7 +344,7 @@ public abstract class RegionFileCache implements AutoCloseable { } // CraftBukkit start - public boolean chunkExists(ChunkCoordIntPair pos) throws IOException { + public synchronized boolean chunkExists(ChunkCoordIntPair pos) throws IOException { // Paper - synchronize copyIfNeeded(pos.x, pos.z); // Paper RegionFile regionfile = a(pos, true); diff --git a/src/main/java/net/minecraft/server/RegionFileSection.java b/src/main/java/net/minecraft/server/RegionFileSection.java index a343a7b31d..7584174eb7 100644 --- a/src/main/java/net/minecraft/server/RegionFileSection.java +++ b/src/main/java/net/minecraft/server/RegionFileSection.java @@ -24,7 +24,7 @@ public class RegionFileSection extends RegionFi private static final Logger LOGGER = LogManager.getLogger(); private final Long2ObjectMap> b = new Long2ObjectOpenHashMap(); - private final LongLinkedOpenHashSet d = new LongLinkedOpenHashSet(); + protected final LongLinkedOpenHashSet d = new LongLinkedOpenHashSet(); // Paper private final BiFunction, R> e; private final Function f; private final DataFixer g; @@ -39,8 +39,8 @@ public class RegionFileSection extends RegionFi } protected void a(BooleanSupplier booleansupplier) { - while (!this.d.isEmpty() && booleansupplier.getAsBoolean()) { - ChunkCoordIntPair chunkcoordintpair = SectionPosition.a(this.d.firstLong()).u(); + while (!this.d.isEmpty() && booleansupplier.getAsBoolean()) { // Paper - conflict here to avoid obfhelpers + ChunkCoordIntPair chunkcoordintpair = SectionPosition.a(this.d.firstLong()).u(); // Paper - conflict here to avoid obfhelpers this.d(chunkcoordintpair); } @@ -82,9 +82,9 @@ public class RegionFileSection extends RegionFi Optional optional = this.d(i); 
if (optional.isPresent()) { - return (MinecraftSerializable) optional.get(); + return optional.get(); // Paper - decompile fix } else { - R r0 = (MinecraftSerializable) this.f.apply(() -> { + R r0 = this.f.apply(() -> { // Paper - decompile fix this.a(i); }); @@ -94,7 +94,12 @@ public class RegionFileSection extends RegionFi } private void b(ChunkCoordIntPair chunkcoordintpair) { - this.a(chunkcoordintpair, DynamicOpsNBT.a, this.c(chunkcoordintpair)); + // Paper start - load data in function + this.loadInData(chunkcoordintpair, this.c(chunkcoordintpair)); + } + public void loadInData(ChunkCoordIntPair chunkPos, NBTTagCompound compound) { + this.a(chunkPos, DynamicOpsNBT.a, compound); + // Paper end } @Nullable @@ -123,7 +128,7 @@ public class RegionFileSection extends RegionFi for (int l = 0; l < 16; ++l) { long i1 = SectionPosition.a(chunkcoordintpair, l).v(); Optional optional = optionaldynamic.get(Integer.toString(l)).get().map((dynamic2) -> { - return (MinecraftSerializable) this.e.apply(() -> { + return this.e.apply(() -> { // Paper - decompile fix this.a(i1); }, dynamic2); }); @@ -142,7 +147,7 @@ public class RegionFileSection extends RegionFi } private void d(ChunkCoordIntPair chunkcoordintpair) { - Dynamic dynamic = this.a(chunkcoordintpair, DynamicOpsNBT.a); + Dynamic dynamic = this.a(chunkcoordintpair, DynamicOpsNBT.a); // Paper - conflict here to avoid adding obfhelpers :) NBTBase nbtbase = (NBTBase) dynamic.getValue(); if (nbtbase instanceof NBTTagCompound) { @@ -157,6 +162,20 @@ public class RegionFileSection extends RegionFi } + // Paper start - internal get data function, copied from above + private NBTTagCompound getDataInternal(ChunkCoordIntPair chunkcoordintpair) { + Dynamic dynamic = this.a(chunkcoordintpair, DynamicOpsNBT.a); + NBTBase nbtbase = (NBTBase) dynamic.getValue(); + + if (nbtbase instanceof NBTTagCompound) { + return (NBTTagCompound)nbtbase; + } else { + RegionFileSection.LOGGER.error("Expected compound tag, got {}", nbtbase); + } + 
return null; + } + // Paper end + private Dynamic a(ChunkCoordIntPair chunkcoordintpair, DynamicOps dynamicops) { Map map = Maps.newHashMap(); @@ -193,9 +212,9 @@ public class RegionFileSection extends RegionFi public void a(ChunkCoordIntPair chunkcoordintpair) { if (!this.d.isEmpty()) { for (int i = 0; i < 16; ++i) { - long j = SectionPosition.a(chunkcoordintpair, i).v(); + long j = SectionPosition.a(chunkcoordintpair, i).v(); // Paper - conflict here to avoid obfhelpers - if (this.d.contains(j)) { + if (this.d.contains(j)) { // Paper - conflict here to avoid obfhelpers this.d(chunkcoordintpair); return; } @@ -203,4 +222,21 @@ public class RegionFileSection extends RegionFi } } + + // Paper start - get data function + public NBTTagCompound getData(ChunkCoordIntPair chunkcoordintpair) { + // Note: Copied from above + // This is checking if the data exists, then it builds it later in getDataInternal(ChunkCoordIntPair) + if (!this.d.isEmpty()) { + for (int i = 0; i < 16; ++i) { + long j = SectionPosition.a(chunkcoordintpair, i).v(); + + if (this.d.contains(j)) { + return this.getDataInternal(chunkcoordintpair); + } + } + } + return null; + } + // Paper end } diff --git a/src/main/java/net/minecraft/server/TicketType.java b/src/main/java/net/minecraft/server/TicketType.java index 5acb0732c3..0ed2d2fbf9 100644 --- a/src/main/java/net/minecraft/server/TicketType.java +++ b/src/main/java/net/minecraft/server/TicketType.java @@ -22,6 +22,7 @@ public class TicketType { public static final TicketType PLUGIN = a("plugin", (a, b) -> 0); // CraftBukkit public static final TicketType PLUGIN_TICKET = a("plugin_ticket", (plugin1, plugin2) -> plugin1.getClass().getName().compareTo(plugin2.getClass().getName())); // Craftbukkit public static final TicketType ANTIXRAY = a("antixray", Integer::compareTo); // Paper - Anti-Xray + public static final TicketType ASYNC_LOAD = a("async_load", Long::compareTo); // Paper public static TicketType a(String s, Comparator comparator) { return 
new TicketType<>(s, comparator, 0L); diff --git a/src/main/java/net/minecraft/server/VillagePlace.java b/src/main/java/net/minecraft/server/VillagePlace.java index 7bc473e1ef..8e82326c47 100644 --- a/src/main/java/net/minecraft/server/VillagePlace.java +++ b/src/main/java/net/minecraft/server/VillagePlace.java @@ -20,8 +20,16 @@ public class VillagePlace extends RegionFileSection { private final VillagePlace.a a = new VillagePlace.a(); + private final WorldServer world; // Paper + public VillagePlace(File file, DataFixer datafixer) { + // Paper start + this(file, datafixer, null); + } + public VillagePlace(File file, DataFixer datafixer, WorldServer world) { + // Paper end super(file, VillagePlaceSection::new, VillagePlaceSection::new, datafixer, DataFixTypes.POI_CHUNK); + this.world = world; // Paper } public void a(BlockPosition blockposition, VillagePlaceType villageplacetype) { @@ -128,7 +136,23 @@ public class VillagePlace extends RegionFileSection { @Override public void a(BooleanSupplier booleansupplier) { - super.a(booleansupplier); + // Paper start - async chunk io + if (this.world == null) { + super.a(booleansupplier); + } else { + //super.a(booleansupplier); // re-implement below + while (!((RegionFileSection)this).d.isEmpty() && booleansupplier.getAsBoolean()) { + ChunkCoordIntPair chunkcoordintpair = SectionPosition.a(((RegionFileSection)this).d.firstLong()).u(); + + NBTTagCompound data; + try (co.aikar.timings.Timing ignored1 = this.world.timings.poiSaveDataSerialization.startTiming()) { + data = this.getData(chunkcoordintpair); + } + com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.scheduleSave(this.world, + chunkcoordintpair.x, chunkcoordintpair.z, data, null, com.destroystokyo.paper.io.PrioritizedTaskQueue.LOW_PRIORITY); + } + } + // Paper end this.a.a(); } @@ -164,7 +188,7 @@ public class VillagePlace extends RegionFileSection { } private static boolean a(ChunkSection chunksection) { - Stream stream = VillagePlaceType.e(); + Stream 
stream = VillagePlaceType.e(); // Paper - decompile fix chunksection.getClass(); return stream.anyMatch(chunksection::a); @@ -214,6 +238,42 @@ public class VillagePlace extends RegionFileSection { } } + // Paper start - Asynchronous chunk io + @javax.annotation.Nullable + @Override + public NBTTagCompound read(ChunkCoordIntPair chunkcoordintpair) throws java.io.IOException { + if (this.world != null && Thread.currentThread() != com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE) { + NBTTagCompound ret = com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE + .loadChunkDataAsyncFuture(this.world, chunkcoordintpair.x, chunkcoordintpair.z, com.destroystokyo.paper.io.IOUtil.getPriorityForCurrentThread(), + true, false, true).join().poiData; + + if (ret == com.destroystokyo.paper.io.ConcreteFileIOThread.FAILURE_VALUE) { + throw new java.io.IOException("See logs for further detail"); + } + return ret; + } + return super.read(chunkcoordintpair); + } + + @Override + public void write(ChunkCoordIntPair chunkcoordintpair, NBTTagCompound nbttagcompound) throws java.io.IOException { + if (this.world != null && Thread.currentThread() != com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE) { + com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.scheduleSave( + this.world, chunkcoordintpair.x, chunkcoordintpair.z, nbttagcompound, null, + com.destroystokyo.paper.io.IOUtil.getPriorityForCurrentThread()); + + Boolean ret = com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.waitForIOToComplete(this.world, + chunkcoordintpair.x, chunkcoordintpair.z, true, true); + + if (ret == Boolean.FALSE) { + throw new java.io.IOException("See logs for further detail"); + } + return; + } + super.write(chunkcoordintpair, nbttagcompound); + } + // Paper end + public static enum Occupancy { HAS_SPACE(VillagePlaceRecord::d), IS_OCCUPIED(VillagePlaceRecord::e), ANY((villageplacerecord) -> { @@ -222,7 +282,7 @@ public class VillagePlace extends 
RegionFileSection { private final Predicate d; - private Occupancy(Predicate predicate) { + private Occupancy(Predicate predicate) { // Paper - decompile fix this.d = predicate; } diff --git a/src/main/java/net/minecraft/server/WorldServer.java b/src/main/java/net/minecraft/server/WorldServer.java index 47005dcfdc..f7597d499f 100644 --- a/src/main/java/net/minecraft/server/WorldServer.java +++ b/src/main/java/net/minecraft/server/WorldServer.java @@ -75,6 +75,79 @@ public class WorldServer extends World { return new Throwable(entity + " Added to world at " + new java.util.Date()); } + // Paper start - Asynchronous IO + public final com.destroystokyo.paper.io.ConcreteFileIOThread.ChunkDataController poiDataController = new com.destroystokyo.paper.io.ConcreteFileIOThread.ChunkDataController() { + @Override + public void writeData(int x, int z, NBTTagCompound compound) throws java.io.IOException { + WorldServer.this.getChunkProvider().playerChunkMap.getVillagePlace().write(new ChunkCoordIntPair(x, z), compound); + } + + @Override + public NBTTagCompound readData(int x, int z) throws java.io.IOException { + return WorldServer.this.getChunkProvider().playerChunkMap.getVillagePlace().read(new ChunkCoordIntPair(x, z)); + } + + @Override + public T computeForRegionFile(int chunkX, int chunkZ, java.util.function.Function function) { + synchronized (WorldServer.this.getChunkProvider().playerChunkMap.getVillagePlace()) { + RegionFile file; + + try { + file = WorldServer.this.getChunkProvider().playerChunkMap.getVillagePlace().getRegionFile(new ChunkCoordIntPair(chunkX, chunkZ), false); + } catch (java.io.IOException ex) { + throw new RuntimeException(ex); + } + + return function.apply(file); + } + } + + @Override + public T computeForRegionFileIfLoaded(int chunkX, int chunkZ, java.util.function.Function function) { + synchronized (WorldServer.this.getChunkProvider().playerChunkMap.getVillagePlace()) { + RegionFile file = 
WorldServer.this.getChunkProvider().playerChunkMap.getVillagePlace().getRegionFileIfLoaded(new ChunkCoordIntPair(chunkX, chunkZ)); + return function.apply(file); + } + } + }; + + public final com.destroystokyo.paper.io.ConcreteFileIOThread.ChunkDataController chunkDataController = new com.destroystokyo.paper.io.ConcreteFileIOThread.ChunkDataController() { + @Override + public void writeData(int x, int z, NBTTagCompound compound) throws java.io.IOException { + WorldServer.this.getChunkProvider().playerChunkMap.write(new ChunkCoordIntPair(x, z), compound); + } + + @Override + public NBTTagCompound readData(int x, int z) throws java.io.IOException { + return WorldServer.this.getChunkProvider().playerChunkMap.read(new ChunkCoordIntPair(x, z)); + } + + @Override + public T computeForRegionFile(int chunkX, int chunkZ, java.util.function.Function function) { + synchronized (WorldServer.this.getChunkProvider().playerChunkMap) { + RegionFile file; + + try { + file = WorldServer.this.getChunkProvider().playerChunkMap.getRegionFile(new ChunkCoordIntPair(chunkX, chunkZ), false); + } catch (java.io.IOException ex) { + throw new RuntimeException(ex); + } + + return function.apply(file); + } + } + + @Override + public T computeForRegionFileIfLoaded(int chunkX, int chunkZ, java.util.function.Function function) { + synchronized (WorldServer.this.getChunkProvider().playerChunkMap) { + RegionFile file = WorldServer.this.getChunkProvider().playerChunkMap.getRegionFileIfLoaded(new ChunkCoordIntPair(chunkX, chunkZ)); + return function.apply(file); + } + } + }; + public final com.destroystokyo.paper.io.chunk.ChunkLoadTaskManager asyncLoadManager; + // Paper end + // Add env and gen to constructor public WorldServer(MinecraftServer minecraftserver, Executor executor, WorldNBTStorage worldnbtstorage, WorldData worlddata, DimensionManager dimensionmanager, GameProfilerFiller gameprofilerfiller, WorldLoadListener worldloadlistener, org.bukkit.World.Environment env, 
org.bukkit.generator.ChunkGenerator gen) { super(worlddata, dimensionmanager, (world, worldprovider) -> { @@ -119,6 +192,7 @@ public class WorldServer extends World { this.mobSpawnerTrader = this.worldProvider.getDimensionManager().getType() == DimensionManager.OVERWORLD ? new MobSpawnerTrader(this) : null; // CraftBukkit - getType() this.getServer().addWorld(this.getWorld()); // CraftBukkit + this.asyncLoadManager = new com.destroystokyo.paper.io.chunk.ChunkLoadTaskManager(this, 4); // todo CONFIGURABLE // Paper } public void doTick(BooleanSupplier booleansupplier) { diff --git a/src/main/java/org/bukkit/craftbukkit/CraftWorld.java b/src/main/java/org/bukkit/craftbukkit/CraftWorld.java index c5321c5076..d3d43f3b77 100644 --- a/src/main/java/org/bukkit/craftbukkit/CraftWorld.java +++ b/src/main/java/org/bukkit/craftbukkit/CraftWorld.java @@ -531,22 +531,23 @@ public class CraftWorld implements World { } if (!generate) { - net.minecraft.server.RegionFile file; - try { - file = world.getChunkProvider().playerChunkMap.getRegionFile(chunkPos, false); - } catch (IOException ex) { - throw new RuntimeException(ex); - } + ChunkStatus status = world.getChunkProvider().playerChunkMap.getStatusOnDiskNoLoad(x, z); // Paper - async io - move to own method - ChunkStatus status = file.getStatusIfCached(x, z); - if (!file.chunkExists(chunkPos) || (status != null && status != ChunkStatus.FULL)) { + // Paper start - async io + if (status == ChunkStatus.EMPTY) { + // does not exist on disk return false; } + if (status == null) { // at this stage we don't know what it is on disk IChunkAccess chunk = world.getChunkProvider().getChunkAt(x, z, ChunkStatus.EMPTY, true); if (!(chunk instanceof ProtoChunkExtension) && !(chunk instanceof net.minecraft.server.Chunk)) { return false; } + } else if (status != ChunkStatus.FULL) { + return false; // not full status on disk + } + // Paper end // fall through to load // we do this so we do not re-read the chunk data on disk @@ -2323,16 +2324,17 @@ 
public class CraftWorld implements World { @Override public CompletableFuture getChunkAtAsync(int x, int z, boolean gen) { - // TODO placeholder - if (Bukkit.isPrimaryThread()) { - return CompletableFuture.completedFuture(getChunkAtGen(x, z, gen)); - } else { - CompletableFuture ret = new CompletableFuture<>(); - net.minecraft.server.MinecraftServer.getServer().scheduleOnMain(() -> { - ret.complete(getChunkAtGen(x, z, gen)); - }); - return ret; + net.minecraft.server.Chunk immediate = this.world.getChunkProvider().getChunkAtIfLoadedImmediately(x, z); + if (immediate != null) { + return CompletableFuture.completedFuture(immediate.bukkitChunk); } + + CompletableFuture ret = new CompletableFuture<>(); + this.world.getChunkProvider().getChunkAtAsynchronously(x, z, gen, (net.minecraft.server.Chunk chunk) -> { + ret.complete(chunk == null ? null : chunk.bukkitChunk); + }); + + return ret; } // Paper end -- 2.20.1